diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md
index ecdb87cac43..43e9748deae 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -313,12 +313,12 @@ Whether to use the header line to parse the file, only used when the file_format
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
 
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -330,7 +330,7 @@ Matching Rules Example:
 
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -338,14 +338,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -355,7 +355,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index cd1807043bf..d74db7cce84 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -106,12 +106,12 @@ Whether to enable remote host verification for FTP data channels, default is `tr
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
 
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
@@ -126,7 +126,7 @@ Matching Rules Example:
 
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -138,7 +138,7 @@ The result of this example matching is:
 
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
@@ -147,8 +147,7 @@ The result of this example matching is:
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
@@ -163,7 +162,7 @@ The result of this example matching is:
 
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 1275744659b..c6f9f510bf9 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -118,12 +118,12 @@ default `\n`
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
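+
+Since the connector compiles this option with Java's `java.util.regex.Pattern` (see `AbstractReadStrategy`), a pattern can be sanity-checked locally before submitting a job. A minimal sketch, assuming a made-up class name and sample paths (illustration only, not connector code):
+
+```java
+import java.util.regex.Pattern;
+
+public class FilterPatternCheck {
+    public static void main(String[] args) {
+        // Name-only filter: matched against the bare file name
+        Pattern byName = Pattern.compile("abc.*");
+        System.out.println(byName.matcher("abch202410.csv").matches()); // true
+        System.out.println(byName.matcher("report.txt").matches());    // false
+
+        // Directory-aware filter: the expression starts with the configured `path`
+        Pattern byPath = Pattern.compile("/data/seatunnel/202410\\d*/.*\\.csv");
+        System.out.println(byPath.matcher("/data/seatunnel/20241007/abch202410.csv").matches()); // true
+    }
+}
+```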
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -135,7 +135,7 @@ Matching Rules Example:
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -143,14 +143,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -160,7 +160,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index c9ea01c757b..e030c914c9f 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -295,12 +295,12 @@ Whether to use the header line to parse the file, only used when the file_format
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
 
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -312,7 +312,7 @@ Matching Rules Example:
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -320,14 +320,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -337,7 +337,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
@@ -404,7 +404,7 @@ File modification time filter. The connector will filter some files base on the
 
 ### file_filter_modified_end [string]
 
-File modification time filter. 
-The connector will filter some files base on the last modification end time (not include end time). The default data format is `yyyy-MM-dd HH:mm:ss`.
+File modification time filter. The connector will filter some files based on the last modification end time (not include end time). The default data format is `yyyy-MM-dd HH:mm:ss`.
 
 ### common options
 
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index d3ed99e2352..525ebed5f32 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -265,12 +265,12 @@ Whether to read the complete file as a single chunk instead of splitting into ch
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
 
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -282,7 +282,7 @@ Matching Rules Example:
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -290,14 +290,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -307,7 +307,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index 8d7634cb916..221a22fa0f1 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -299,7 +299,7 @@ Reader the sheet of the workbook.
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
@@ -316,7 +316,7 @@ Matching Rules Example:
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -324,14 +324,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -341,7 +341,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 5ef70e33298..2a7aff8667e 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -257,12 +257,12 @@ default `\n`
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
 
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -274,7 +274,7 @@ Matching Rules Example:
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -282,14 +282,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -299,7 +299,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 14f4315e671..9eed5c246e2 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -113,12 +113,12 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
 ### file_filter_pattern [string]
 
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
 
 The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
 
 There are some examples.
 
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -130,7 +130,7 @@ Matching Rules Example:
 **Example 1**: *Match all .txt files*,Regular Expression:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 The result of this example matching is:
 ```
@@ -138,14 +138,14 @@ The result of this example matching is:
 ```
 **Example 2**: *Match all file starting with abc*,Regular Expression:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 The result of this example matching is:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**Example 3**: *Match all file starting with abc,And the fourth character is either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 The result of this example matching is:
 ```
@@ -155,7 +155,7 @@ The result of this example matching is:
 ```
 **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 The result of this example matching is:
 ```
diff --git a/docs/zh/connector-v2/source/CosFile.md b/docs/zh/connector-v2/source/CosFile.md
index 5f26a7f70a2..e45cdcffa41 100644
--- a/docs/zh/connector-v2/source/CosFile.md
+++ b/docs/zh/connector-v2/source/CosFile.md
@@ -311,12 +311,12 @@ default `HH:mm:ss`
 ### file_filter_pattern [string]
 
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参阅https://en.wikipedia.org/wiki/Regular_expression.
 
 有一些例子。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -328,7 +328,7 @@ default `HH:mm:ss`
 **示例1**:*匹配所有.txt文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 此示例匹配的结果为:
 ```
@@ -336,14 +336,14 @@ default `HH:mm:ss`
 ```
 **示例2**:*匹配所有以abc*开头的文件,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 此示例匹配的结果为:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例3**:*匹配所有以abc开头的文件,第四个字符是h或g*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 此示例匹配的结果为:
 ```
@@ -353,7 +353,7 @@ default `HH:mm:ss`
 ```
 **示例4**:*匹配以202410开头的三级文件夹和以.csv*结尾的文件,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 此示例匹配的结果为:
 ```
diff --git a/docs/zh/connector-v2/source/FtpFile.md b/docs/zh/connector-v2/source/FtpFile.md
index f7272f45a25..54a48370cbc 100644
--- a/docs/zh/connector-v2/source/FtpFile.md
+++ b/docs/zh/connector-v2/source/FtpFile.md
@@ -102,12 +102,12 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 ### file_filter_pattern [string]
 
-文件过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参考:https://en.wikipedia.org/wiki/Regular_expression.
 以下是一些示例。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
@@ -121,7 +121,7 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 
 **示例 1**:*匹配所有 .txt 文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 该示例匹配结果为:
 ```
@@ -129,14 +129,14 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 ```
 **示例 2**:*匹配所有以 abc 开头的文件*,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 该示例匹配结果为:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例 3**:*匹配所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
+**示例 3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
@@ -146,7 +146,7 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 ```
 **示例 4**:*匹配第三级文件夹以 202410 开头且文件以 .csv 结尾的文件*,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 该示例匹配结果为:
 ```
@@ -352,7 +352,7 @@ SeaTunnel 将从源文件中跳过前 2 行。
 ### csv_use_header_line [boolean]
 
 仅在文件格式为 csv 时可以选择配置。
-是否使用标题行来解析文件, 标题行 与 RFC 4180 匹配 
+是否使用标题行来解析文件, 标题行 与 RFC 4180 匹配
 
 ### compress_codec [string]
 
diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md
index 0c9bc291531..655f25192b6 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -119,12 +119,12 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ### file_filter_pattern [string]
 
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
 
 以下是一些示例。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -136,7 +136,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 **示例 1**:*匹配所有 .txt 文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 此示例匹配的结果是:
 ```
@@ -144,14 +144,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ```
 **示例 2**:*匹配所有以 abc 开头的文件*,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 此示例匹配的结果是:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例 3**:*匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件*,正则表达式:
+**示例 3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 此示例匹配的结果是:
 ```
@@ -161,7 +161,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ```
 **示例 4**:*匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件*,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 此示例匹配的结果是:
 ```
diff --git a/docs/zh/connector-v2/source/LocalFile.md b/docs/zh/connector-v2/source/LocalFile.md
index 752ea87de15..b2eb9b86f5c 100644
--- a/docs/zh/connector-v2/source/LocalFile.md
+++ b/docs/zh/connector-v2/source/LocalFile.md
@@ -304,12 +304,12 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ### file_filter_pattern [string]
 
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
 
 以下是一些示例。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -321,7 +321,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 **示例 1**:*匹配所有 .txt 文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 此示例匹配的结果是:
 ```
@@ -329,14 +329,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ```
 **示例 2**:*匹配所有以 abc 开头的文件*,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 此示例匹配的结果是:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例 3**:*匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件*,正则表达式:
+**示例 3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 此示例匹配的结果是:
 ```
@@ -346,7 +346,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ```
 **示例 4**:*匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件*,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 此示例匹配的结果是:
 ```
diff --git a/docs/zh/connector-v2/source/OssFile.md b/docs/zh/connector-v2/source/OssFile.md
index a7fa4055860..25bb1bce9cb 100644
--- a/docs/zh/connector-v2/source/OssFile.md
+++ b/docs/zh/connector-v2/source/OssFile.md
@@ -264,12 +264,12 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ### file_filter_pattern [string]
 
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
 
 以下是一些示例。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -281,7 +281,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 **示例1**:*匹配所有.txt文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 此示例匹配的结果是:
 ```
@@ -289,14 +289,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ```
 **示例2**:*匹配所有以abc开头的文件*,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 此示例匹配的结果是:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 此示例匹配的结果是:
 ```
@@ -306,7 +306,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 ```
 **示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 此示例匹配的结果是:
 ```
diff --git a/docs/zh/connector-v2/source/S3File.md b/docs/zh/connector-v2/source/S3File.md
index 1d7b90ed1c3..9e4f0b9fca0 100644
--- a/docs/zh/connector-v2/source/S3File.md
+++ b/docs/zh/connector-v2/source/S3File.md
@@ -236,12 +236,12 @@ schema {
 ### file_filter_pattern [string]
 
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
 
 以下是一些示例。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -253,7 +253,7 @@ schema {
 **示例1**:*匹配所有.txt文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 此示例匹配的结果是:
 ```
@@ -261,14 +261,14 @@ schema {
 ```
 **示例2**:*匹配所有以abc开头的文件*,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 此示例匹配的结果是:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
 此示例匹配的结果是:
 ```
@@ -278,7 +278,7 @@ schema {
 ```
 **示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 此示例匹配的结果是:
 ```
diff --git a/docs/zh/connector-v2/source/SftpFile.md b/docs/zh/connector-v2/source/SftpFile.md
index e48f4e81c5f..0182f53ba8a 100644
--- a/docs/zh/connector-v2/source/SftpFile.md
+++ b/docs/zh/connector-v2/source/SftpFile.md
@@ -113,12 +113,12 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
 ### file_filter_pattern [string]
 
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
 
 该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
 
 以下是一些示例。
 
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
 ```
 /data/seatunnel/20241001/report.txt
 /data/seatunnel/20241007/abch202410.csv
@@ -130,7 +130,7 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
 **示例1**:*匹配所有.txt文件*,正则表达式:
 ```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
 ```
 此示例匹配的结果是:
 ```
@@ -138,14 +138,14 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
 ```
 **示例2**:*匹配所有以abc开头的文件*,正则表达式:
 ```
-/data/seatunnel/20241002/abc.*
+abc.*
 ```
 此示例匹配的结果是:
 ```
 /data/seatunnel/20241007/abch202410.csv
 /data/seatunnel/20241002/abcg202410.csv
 ```
-**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
 ```
 /data/seatunnel/20241007/abc[h,g].*
 ```
@@ -155,7 +155,7 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
 ```
 **示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
 ```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
 ```
 此示例匹配的结果是:
 ```
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index a8c302ab741..3bcda9bd451 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -92,6 +92,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     protected Pattern pattern;
     protected Date fileModifiedStartDate;
     protected Date fileModifiedEndDate;
+    protected String fileBasePath;
 
     @Override
     public void init(HadoopConf conf) {
@@ -223,6 +224,11 @@ public void setPluginConfig(Config pluginConfig) {
             String filterPattern =
                     pluginConfig.getString(FileBaseSourceOptions.FILE_FILTER_PATTERN.key());
             this.pattern = Pattern.compile(filterPattern);
+            // because 'ConfigFactory.systemProperties()' also defines a 'path' entry, 'path' is
+            // only read here, under the premise that 'FILE_FILTER_PATTERN' is configured
+            if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())) {
+                fileBasePath = pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
+            }
         }
         if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_FILTER_MODIFIED_START.key())) {
             fileModifiedStartDate =
@@ -406,7 +412,15 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
     }
 
     protected boolean filterFileByPattern(FileStatus fileStatus) {
-        if (Objects.nonNull(pattern)) {
+        if (Objects.nonNull(pattern) && Objects.nonNull(fileBasePath)) {
+            if (pattern.pattern().startsWith(fileBasePath)) {
+                // filter based on the file directory at the same time
+                String absPath = fileStatus.getPath().toUri().getPath();
+                // absPath.substring(absPath.indexOf(fileBasePath)) keeps this compatible with
+                // scenarios where fileBasePath is a relative path
+                return pattern.matcher(absPath.substring(absPath.indexOf(fileBasePath))).matches();
+            }
+            // filter based on file names
             return pattern.matcher(fileStatus.getPath().getName()).matches();
         }
         return true;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/FileFilterPatternTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/FileFilterPatternTest.java
new file mode 100644
index 00000000000..3691978dd25
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/FileFilterPatternTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+public class FileFilterPatternTest {
+    /**
+     * Filter based on the file directory as well; the expression needs to start with `path`.
+     *
+     * @throws URISyntaxException
+     * @throws IOException
+     */
+    @Test
+    @DisabledOnOs(OS.WINDOWS)
+    public void testJsonFilterPatternWithFilePath() throws URISyntaxException, IOException {
+        URL filterPattern = FileFilterPatternTest.class.getResource("/filter-pattern/json");
+        URL conf =
+                ExcelReadStrategyTest.class.getResource(
+                        "/filter-pattern/json/json2025/test_read_json.conf");
+        Assertions.assertNotNull(filterPattern);
+        Assertions.assertNotNull(conf);
+        // path
+        String jsonPathDir = filterPattern.toURI().getPath();
+        // the expression needs to start with `path`
+        String fileFilterPattern = jsonPathDir + "/json202[^/]*/.*.json";
+
+        String confPath = Paths.get(conf.toURI()).toString();
+        Config pluginConfig =
+                ConfigFactory.parseFile(new File(confPath))
+                        .withValue(
+                                FileBaseSourceOptions.FILE_FILTER_PATTERN.key(),
+                                ConfigValueFactory.fromAnyRef(fileFilterPattern))
+                        .withValue(
+                                FileBaseSourceOptions.FILE_PATH.key(),
+                                ConfigValueFactory.fromAnyRef(jsonPathDir));
+
+        JsonReadStrategy jsonReadStrategy = new JsonReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        jsonReadStrategy.setPluginConfig(pluginConfig);
+        jsonReadStrategy.init(localConf);
+
+        List<String> filterFileNames = jsonReadStrategy.getFileNamesByPath(jsonPathDir);
+        Assertions.assertEquals(2, filterFileNames.size());
+        String fileName = filterFileNames.get(0);
+        Assertions.assertTrue(fileName.endsWith(".json"));
+    }
+
+    /**
+     * Filter based on file names; simply write a regular expression for the file name.
+     *
+     * @throws URISyntaxException
+     * @throws IOException
+     */
+    @Test
+    @DisabledOnOs(OS.WINDOWS)
+    public void testJsonFilterPatternWithFileName() throws URISyntaxException, IOException {
+        URL filterPattern = FileFilterPatternTest.class.getResource("/filter-pattern/json");
+        URL conf =
+                ExcelReadStrategyTest.class.getResource(
+                        "/filter-pattern/json/json2025/test_read_json.conf");
+        Assertions.assertNotNull(filterPattern);
+        Assertions.assertNotNull(conf);
+        // path
+        String jsonPathDir = filterPattern.toURI().getPath();
+        // simply write a regular expression for the file name
+        String fileFilterPattern = ".*.json";
+        String confPath = Paths.get(conf.toURI()).toString();
+        Config pluginConfig =
+                ConfigFactory.parseFile(new File(confPath))
+                        .withValue(
+                                FileBaseSourceOptions.FILE_FILTER_PATTERN.key(),
+                                ConfigValueFactory.fromAnyRef(fileFilterPattern))
+                        .withValue(
+                                FileBaseSourceOptions.FILE_PATH.key(),
+                                ConfigValueFactory.fromAnyRef(jsonPathDir));
+        JsonReadStrategy jsonReadStrategy = new JsonReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        jsonReadStrategy.setPluginConfig(pluginConfig);
+        jsonReadStrategy.init(localConf);
+
+        List<String> filterFileNames = jsonReadStrategy.getFileNamesByPath(jsonPathDir);
+        Assertions.assertEquals(3, filterFileNames.size());
+        for (String fileName : filterFileNames) {
+            Assertions.assertTrue(fileName.endsWith(".json"));
+        }
+    }
+
+    public static class LocalConf extends HadoopConf {
+        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+        private static final String SCHEMA = "file";
+
+        public LocalConf(String hdfsNameKey) {
+            super(hdfsNameKey);
+        }
+
+        @Override
+        public String getFsHdfsImpl() {
+            return HDFS_IMPL;
+        }
+
+        @Override
+        public String getSchema() {
+            return SCHEMA;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2024/202401.json b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2024/202401.json
new file mode 100644
index 00000000000..d1d8fabdbf8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2024/202401.json
@@ -0,0 +1 @@
+{"name": "202401"}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/202501.json b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/202501.json
new file mode 100644
index 00000000000..014000891bd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/202501.json
@@ -0,0 +1 @@
+{"name": "202501"}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/test_read_json.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/test_read_json.conf
new file mode 100644
index 00000000000..295640031cc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/test_read_json.conf
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+  sheet_name = "Sheet1"
+  skip_header_row_number = 1
+  schema = {
+    fields {
+      c_bytes = "tinyint"
+      c_short = "smallint"
+      c_int = "int"
+      c_bigint = "bigint"
+      c_string = "string"
+      c_double = "double"
+      c_float = "float"
+      c_decimal = "decimal(10, 2)"
+      c_boolean = "boolean"
+      c_map = "map<string, string>"
+      c_array = "array<int>"
+      c_date = "date"
+      c_datetime = "timestamp"
+      c_time = "time"
+    }
+  }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/people.json b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/people.json
new file mode 100644
index 00000000000..76b148cddce
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/people.json
@@ -0,0 +1 @@
+{"name": "people"}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index c6bb9c8aed1..dc312d8ff7b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -271,7 +271,15 @@ private void disconnect(FTPClient client) throws IOException {
      */
     private Path makeAbsolute(Path workDir, Path path) {
         if (path.isAbsolute()) {
-            return path;
+            String filePath = path.toUri().getPath();
+            if (filePath.equals("/")) {
+                return workDir;
+            }
+            if (filePath.startsWith(workDir.toUri().getPath())) {
+                return path;
+            }
+            // strip the leading '/' so the path is resolved under the working directory
+            return new Path(workDir, filePath.substring(1));
         }
         return new Path(workDir, path);
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 95b3a408e48..575fe7f9ade 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -206,6 +206,48 @@ public void testFtpToFtpForBinary(TestContainer container)
         deleteFileFromContainer(homePath);
     }
 
+    @TestTemplate
+    public void testFtpToAssertForJsonFilter(TestContainer container)
+            throws IOException, InterruptedException {
+
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                ftpContainer);
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+                ftpContainer);
+        ContainerUtil.copyFileIntoContainers(
+                "/text/e2e.txt",
+                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+                ftpContainer);
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+                ftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/text/e2e.txt",
+                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                ftpContainer);
+
+        ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/");
+        ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
+
+        TestHelper helper = new TestHelper(container);
+        // -----filter based on the file directory at the same time, the expression needs to start
+        // with `path`--------
+        helper.execute("/json/ftp_to_access_for_json_path_filter.conf");
+
+        // -------filter based on file names, just simply write the regular file names--------
+        helper.execute("/json/ftp_to_access_for_json_name_filter.conf");
+
+        // delete path
+        String filterPath = "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter";
+        deleteFileFromContainer(filterPath);
+    }
+
     private void assertJobExecution(TestContainer container, String configPath, List<String> params)
             throws IOException, InterruptedException {
         Container.ExecResult execResult = container.executeJob(configPath, params);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_name_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_name_filter.conf
new file mode 100644
index 00000000000..615a705c677
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_name_filter.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/filter"
+    file_filter_pattern = ".*.json"
+    file_format_type = "json"
+    encoding = "UTF-8"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 15
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 15
+          }
+        ],
+        field_rules = [{
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN_LENGTH
+              rule_value = 5
+            },
+            {
+              rule_type = MAX_LENGTH
+              rule_value = 5
+            }
+          ]
+        }]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_path_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_path_filter.conf
new file mode 100644
index 00000000000..49b0e95ad30
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_path_filter.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/filter"
+    file_filter_pattern = "tmp/seatunnel/read/filter/json202[^/]*/.*.json"
+    file_format_type = "json"
+    encoding = "UTF-8"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 10
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 10
+          }
+        ],
+        field_rules = [{
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN_LENGTH
+              rule_value = 5
+            },
+            {
+              rule_type = MAX_LENGTH
+              rule_value = 5
+            }
+          ]
+        }]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
index 3750d82d9b5..09dbab45a4c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
@@ -94,6 +94,46 @@ public class OssFileIT extends TestSuiteBase {
                 Assertions.assertEquals(0, extraCommands.getExitCode());
             };
 
+    @TestTemplate
+    public void testOssToAccessForJsonFilter(TestContainer container)
+            throws IOException, InterruptedException {
+        // Copy test files to OSS
+        OssUtils ossUtils = new OssUtils();
+        try {
+            ossUtils.uploadTestFiles(
+                    "/json/e2e.json",
+                    "test/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                    true);
+
+            ossUtils.uploadTestFiles(
+                    "/json/e2e.json",
+                    "test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+                    true);
+            ossUtils.uploadTestFiles(
+                    "/text/e2e.txt",
+                    "test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+                    true);
+            ossUtils.uploadTestFiles(
+                    "/json/e2e.json",
+                    "test/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+                    true);
+
+            ossUtils.uploadTestFiles(
+                    "/text/e2e.txt",
+                    "test/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                    true);
+        } finally {
+            ossUtils.close();
+        }
+
+        TestHelper helper = new TestHelper(container);
+        // -----filter based on the file directory at the same time, the expression needs to start
+        // with `path`--------
+        helper.execute("oss_to_access_for_json_path_filter.conf");
+        // -------filter based on file names, just simply write the regular file names--------
+        helper.execute("oss_to_access_for_json_name_filter.conf");
+    }
+
     /** Copy data files to oss */
     @TestTemplate
     public void testOssFileReadAndWrite(TestContainer container)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_name_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_name_filter.conf
new file mode 100644
index 00000000000..f1fc9a98328
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_name_filter.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  OssFile {
+    bucket = "oss://whale-ops"
+    access_key = "xxxxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxx"
+    endpoint = "https://oss-accelerate.aliyuncs.com"
+    path = "/test/seatunnel/read/filter"
+    file_filter_pattern = ".*.json"
+    file_format_type = "json"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 15
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 15
+          }
+        ],
+        field_rules = [{
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN_LENGTH
+              rule_value = 5
+            },
+            {
+              rule_type = MAX_LENGTH
+              rule_value = 5
+            }
+          ]
+        }]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_path_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_path_filter.conf
new file mode 100644
index 00000000000..f689f8cc546
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_path_filter.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  OssFile {
+    bucket = "oss://whale-ops"
+    access_key = "xxxxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxx"
+    endpoint = "https://oss-accelerate.aliyuncs.com"
+    path = "/test/seatunnel/read/filter"
+    file_filter_pattern = "/test/seatunnel/read/filter/json202[^/]*/.*.json"
+    file_format_type = "json"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 10
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 10
+          }
+        ],
+        field_rules = [{
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN_LENGTH
+              rule_value = 5
+            },
+            {
+              rule_type = MAX_LENGTH
+              rule_value = 5
+            }
+          ]
+        }]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
index 3ef03aad700..cb4d82e2745 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
@@ -17,19 +17,31 @@
 
 package org.apache.seatunnel.e2e.connector.file.s3;
 
+import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestHelper;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
 
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
 import io.airlift.compress.lzo.LzopCodec;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,14 +50,55 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
-@Disabled("have no s3 environment to run this test")
-public class S3FileIT extends TestSuiteBase {
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason =
+                "1.The apache-compress version is not compatible with apache-poi. 2.Spark Engine is not compatible with commons-net")
+@Slf4j
+public class S3FileIT extends TestSuiteBase implements TestResource {
+
+    private GenericContainer<?> s3Container;
+
+    private static final String MINIO_IMAGE = "minio/minio:latest";
+
+    private static final int S3_PORT = 9000;
+
+    private static final String S3_CONTAINER_HOST = "s3";
 
     public static final String S3_SDK_DOWNLOAD =
             "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar";
     public static final String HADOOP_S3_DOWNLOAD =
             "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
 
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        s3Container =
+                new GenericContainer<>(DockerImageName.parse(MINIO_IMAGE))
+                        .withNetwork(NETWORK)
+                        .withExposedPorts(S3_PORT)
+                        .withNetworkAliases(S3_CONTAINER_HOST)
+                        .withCreateContainerCmdModifier(
+                                cmd ->
+                                        cmd.withPortBindings(
+                                                new PortBinding(
+                                                        Ports.Binding.bindPort(S3_PORT),
+                                                        new ExposedPort(S3_PORT))))
+                        .withLogConsumer(new Slf4jLogConsumer(log))
+                        .withEnv("MINIO_ROOT_USER", "minioadmin")
+                        .withEnv("MINIO_ROOT_PASSWORD", "minioadmin")
+                        .withCommand("server", "/data")
+                        .waitingFor(Wait.forLogMessage(".*", 1));
+        s3Container.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (s3Container != null) {
+            s3Container.close();
+        }
+    }
+
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory =
             container -> {
@@ -66,55 +119,87 @@ public class S3FileIT extends TestSuiteBase {
                 Assertions.assertEquals(0, extraCommands.getExitCode());
             };
 
+    @TestTemplate
+    public void testS3ToAssertForJsonFilter(TestContainer container)
+            throws IOException, InterruptedException {
+
+        // Copy test files to s3
+        S3Utils s3Utils = new S3Utils();
+
+        S3Utils.uploadTestFiles(
+                "/json/e2e.json",
+                "test/seatunnel/read/filter/json/name=tyrantlucifer/hobby=codin/e2e.json",
+                true);
+
+        S3Utils.uploadTestFiles(
+                "/json/e2e.json",
+                "test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=codin/e2e.json",
+                true);
+
+        S3Utils.uploadTestFiles(
+                "/text/e2e.txt",
+                "test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=codin/e2e_2025.txt",
+                true);
+
+        S3Utils.uploadTestFiles(
+                "/json/e2e.json",
+                "test/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=codin/e2e_2024.json",
+                true);
+
+        S3Utils.uploadTestFiles(
+                "/text/e2e.txt",
+                "test/seatunnel/read/filter/text/name=tyrantlucifer/hobby=codin/e2e.txt",
+                true);
+        TestHelper helper = new TestHelper(container);
+        // -----filter based on the file directory at the same time, the expression needs to start
+        // with `path`--------
+        helper.execute("/json/s3_to_access_for_json_path_filter.conf");
+
+        // -------filter based on file names, just simply write the regular file names--------
+        helper.execute("/json/s3_to_access_for_json_name_filter.conf");
+    }
+
     /** Copy data files to s3 */
     @TestTemplate
+    @Disabled
     public void testS3FileReadAndWrite(TestContainer container)
             throws IOException, InterruptedException {
         // Copy test files to s3
-        S3Utils s3Utils = new S3Utils();
-        try {
-            s3Utils.uploadTestFiles(
-                    "/json/e2e.json",
-                    "test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
-                    true);
-            Path jsonLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json"));
-            s3Utils.uploadTestFiles(
-                    jsonLzo.toString(), "test/seatunnel/read/lzo_json/e2e.json", false);
-            s3Utils.uploadTestFiles(
-                    "/text/e2e.txt",
-                    "test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
-                    true);
- s3Utils.uploadTestFiles( - "/text/e2e_delimiter.txt", "test/seatunnel/read/text_delimiter/e2e.txt", true); - s3Utils.uploadTestFiles( - "/text/e2e_time_format.txt", - "test/seatunnel/read/text_time_format/e2e.txt", - true); - Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt")); - s3Utils.uploadTestFiles( - txtLzo.toString(), "test/seatunnel/read/lzo_text/e2e.txt", false); - s3Utils.uploadTestFiles( - "/excel/e2e.xlsx", - "test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", - true); - s3Utils.uploadTestFiles( - "/orc/e2e.orc", - "test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc", - true); - s3Utils.uploadTestFiles( - "/parquet/e2e.parquet", - "test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet", - true); - s3Utils.uploadTestFiles( - "/excel/e2e.xlsx", - "test/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", - true); - s3Utils.uploadTestFiles( - "/text/e2e-text.zip", "test/seatunnel/read/text_zip/e2e-text.zip", true); - s3Utils.createDir("tmp/fake_empty"); - } finally { - s3Utils.close(); - } + S3Utils.uploadTestFiles( + "/json/e2e.json", + "test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", + true); + Path jsonLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json")); + S3Utils.uploadTestFiles(jsonLzo.toString(), "test/seatunnel/read/lzo_json/e2e.json", false); + S3Utils.uploadTestFiles( + "/text/e2e.txt", + "test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", + true); + S3Utils.uploadTestFiles( + "/text/e2e_delimiter.txt", "test/seatunnel/read/text_delimiter/e2e.txt", true); + S3Utils.uploadTestFiles( + "/text/e2e_time_format.txt", "test/seatunnel/read/text_time_format/e2e.txt", true); + Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt")); + S3Utils.uploadTestFiles(txtLzo.toString(), "test/seatunnel/read/lzo_text/e2e.txt", false); + S3Utils.uploadTestFiles( + "/excel/e2e.xlsx", + "test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", + true); + S3Utils.uploadTestFiles( + "/orc/e2e.orc", + "test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc", + true); + S3Utils.uploadTestFiles( + "/parquet/e2e.parquet", + "test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet", + true); + S3Utils.uploadTestFiles( + "/excel/e2e.xlsx", + "test/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", + true); + S3Utils.uploadTestFiles( + "/text/e2e-text.zip", "test/seatunnel/read/text_zip/e2e-text.zip", true); + S3Utils.createDir("tmp/fake_empty"); TestHelper helper = new TestHelper(container); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java index 34fd443146c..bc45fdeece9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java @@ -76,32 +76,27 @@ public class S3FileWithMultipleTableIT extends TestSuiteBase { @TestTemplate public void addTestFiles(TestContainer container) throws IOException, InterruptedException { // Copy test files to s3 
- S3Utils s3Utils = new S3Utils(); - try { - s3Utils.uploadTestFiles( - "/json/e2e.json", - "test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", - true); - s3Utils.uploadTestFiles( - "/text/e2e.txt", - "test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", - true); - s3Utils.uploadTestFiles( - "/excel/e2e.xlsx", - "test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", - true); - s3Utils.uploadTestFiles( - "/orc/e2e.orc", - "test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc", - true); - s3Utils.uploadTestFiles( - "/parquet/e2e.parquet", - "test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet", - true); - s3Utils.createDir("tmp/fake_empty"); - } finally { - s3Utils.close(); - } + S3Utils.uploadTestFiles( + "/json/e2e.json", + "test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", + true); + S3Utils.uploadTestFiles( + "/text/e2e.txt", + "test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", + true); + S3Utils.uploadTestFiles( + "/excel/e2e.xlsx", + "test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", + true); + S3Utils.uploadTestFiles( + "/orc/e2e.orc", + "test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc", + true); + S3Utils.uploadTestFiles( + "/parquet/e2e.parquet", + "test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet", + true); + S3Utils.createDir("tmp/fake_empty"); } @TestTemplate diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java index 6c3f449d785..ab3dbfc2433 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java @@ -34,29 +34,32 @@ import java.io.File; import java.io.InputStream; -public class S3Utils { +public class S3Utils implements AutoCloseable { private static Logger logger = LoggerFactory.getLogger(S3Utils.class); - private static final String ACCESS_KEY = "XXXXXX"; - private static final String SECRET_KEY = "AWS_XXXX"; + private static final String ACCESS_KEY = "minioadmin"; + private static final String SECRET_KEY = "minioadmin"; private static final String REGION = "cn-north-1"; - private static final String ENDPOINT = - "s3.cn-north-1.amazonaws.com.cn"; // For example, "https://s3.amazonaws.com" - private String bucket = "ws-package"; + private static final String ENDPOINT = "http://localhost:9000"; + private static final String BUCKET = "ws-package"; - private final AmazonS3 s3Client; + private static final AmazonS3 S3_CLIENT; - public S3Utils() { + static { BasicAWSCredentials credentials = new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY); - - this.s3Client = + S3_CLIENT = AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .enablePathStyleAccess() .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration(ENDPOINT, REGION)) .build(); + + if (!S3_CLIENT.doesBucketExistV2(BUCKET)) { + S3_CLIENT.createBucket(BUCKET); + } } - public void uploadTestFiles( + public static void uploadTestFiles( String filePath, String targetFilePath, boolean isFindFromResource) { File resourcesFile = null; if (isFindFromResource) { 
@@ -64,21 +67,22 @@ public void uploadTestFiles(
         } else {
             resourcesFile = new File(filePath);
         }
-        s3Client.putObject(bucket, targetFilePath, resourcesFile);
+        S3_CLIENT.putObject(BUCKET, targetFilePath, resourcesFile);
     }

-    public void createDir(String dir) {
+    public static void createDir(String dir) {
         ObjectMetadata metadata = new ObjectMetadata();
         metadata.setContentLength(0);
         InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
         PutObjectRequest putObjectRequest =
-                new PutObjectRequest(bucket, dir, emptyContent, metadata);
-        s3Client.putObject(putObjectRequest);
+                new PutObjectRequest(BUCKET, dir, emptyContent, metadata);
+        S3_CLIENT.putObject(putObjectRequest);
     }

-    public void close() {
-        if (s3Client != null) {
-            s3Client.shutdown();
+    // Shuts down the client shared by all static helpers; once closed, every
+    // subsequent S3Utils call will fail, so only close after all S3 tests finish.
+    @Override
+    public void close() throws Exception {
+        if (S3_CLIENT != null) {
+            S3_CLIENT.shutdown();
         }
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_name_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_name_filter.conf
new file mode 100644
index 00000000000..1fbe1df2bd9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_name_filter.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = "local" +} + +source { + S3File { + fs.s3a.endpoint = "http://s3:9000" + fs.s3a.path.style.access = true + fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" + access_key = "minioadmin" + secret_key = "minioadmin" + bucket = "s3a://ws-package" + path = "/test/seatunnel/read/filter" + file_filter_pattern = ".*.json" + file_format_type = "json" + schema = { + fields { + c_string = string + } + } + } +} + +sink { + Assert { + rules = { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 15 + }, + { + rule_type = MIN_ROW + rule_value = 15 + } + ] + field_rules = [ + { + field_name = "c_string" + field_type = "string" + field_value = [ + { rule_type = NOT_NULL }, + { rule_type = MIN_LENGTH, rule_value = 5 }, + { rule_type = MAX_LENGTH, rule_value = 5 } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_path_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_path_filter.conf new file mode 100644 index 00000000000..fb23e6862ea --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_path_filter.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = "local"
+}
+
+source {
+  S3File {
+    fs.s3a.endpoint = "http://s3:9000"
+    fs.s3a.path.style.access = true
+    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    access_key = "minioadmin"
+    secret_key = "minioadmin"
+    bucket = "s3a://ws-package"
+    path = "/test/seatunnel/read/filter"
+    file_filter_pattern = "/test/seatunnel/read/filter/json202[^/]*/.*.json"
+    file_format_type = "json"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules = {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 10
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 10
+        }
+      ]
+      field_rules = [
+        {
+          field_name = "c_string"
+          field_type = "string"
+          field_value = [
+            { rule_type = NOT_NULL },
+            { rule_type = MIN_LENGTH, rule_value = 5 },
+            { rule_type = MAX_LENGTH, rule_value = 5 }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index fdc9b94cbd2..d8bc2efbb98 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -127,6 +127,46 @@ public void startUp() throws Exception {
         sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/");
     }

+    @TestTemplate
+    public void testSftpToAssertJsonFilter(TestContainer container)
+            throws IOException, InterruptedException {
+
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/seatunnel/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                sftpContainer);
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+                sftpContainer);
+        ContainerUtil.copyFileIntoContainers(
+                "/text/e2e.txt",
+                "/home/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+                sftpContainer);
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/seatunnel/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+                sftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/text/e2e.txt",
+                "/home/seatunnel/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                sftpContainer);
+        sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/");
+
+        TestHelper helper = new TestHelper(container);
+        // Filter on the file directory as well: the expression needs to start
+        // with the configured `path`
+        helper.execute("/json/sftp_to_access_for_json_path_filter.conf");
+
+        // Filter on file names only: a plain file-name regular expression is enough
+        helper.execute("/json/sftp_to_access_for_json_name_filter.conf");
+
+        // Clean up the filter test directory
+        String filterPath = "/home/seatunnel/tmp/seatunnel/read/filter";
+        deleteFileFromContainer(filterPath);
+    }
+
     @TestTemplate
     public void
testSftpFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_name_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_name_filter.conf new file mode 100644 index 00000000000..b9d05389412 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_name_filter.conf @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/filter" + file_format_type = "json" + plugin_output = "sftp" + file_filter_pattern=".*.json" + schema = { + fields { + c_string = string + } + } + } +} + +sink { + Assert { + rules = + { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 15 + }, + { + rule_type = MIN_ROW + rule_value = 15 + } + ], + field_rules = [{ + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN_LENGTH + rule_value = 5 + }, + { + rule_type = MAX_LENGTH + rule_value = 5 + } + ] + }] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_path_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_path_filter.conf new file mode 100644 index 00000000000..373fe930bd6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_path_filter.conf @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/filter" + file_format_type = "json" + plugin_output = "sftp" + file_filter_pattern="tmp/seatunnel/read/filter/json202[^/]*/.*.json" + schema = { + fields { + c_string = string + } + } + } +} + +sink { + Assert { + rules = + { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 10 + }, + { + rule_type = MIN_ROW + rule_value = 10 + } + ], + field_rules = [{ + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN_LENGTH + rule_value = 5 + }, + { + rule_type = MAX_LENGTH + rule_value = 5 + } + ] + }] + } + } +} \ No newline at end of file
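For reviewers who want to sanity-check the two `file_filter_pattern` styles exercised above without starting MinIO or an SFTP container, here is a minimal standalone sketch. It is not connector code: the class name is invented, and the name-vs-path split (a bare pattern checked against the file name, a pattern starting with the configured `path` checked against the whole path) is an assumption based on the comments in the new tests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class FilterPatternSketch {
    public static void main(String[] args) {
        // The five object keys uploaded by testS3ToAssertForJsonFilter.
        List<String> keys =
                Arrays.asList(
                        "test/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
                        "test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e.json",
                        "test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
                        "test/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
                        "test/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt");

        // Patterns from the two new S3 configs; the path pattern is written here
        // without the leading slash so it lines up with the bucket-relative keys.
        Pattern pathFilter = Pattern.compile("test/seatunnel/read/filter/json202[^/]*/.*.json");
        Pattern nameFilter = Pattern.compile(".*.json");

        for (String key : keys) {
            // Bare patterns are checked against the file name only (assumed above).
            String fileName = key.substring(key.lastIndexOf('/') + 1);
            System.out.printf(
                    "%-80s path filter: %-5b name filter: %b%n",
                    key,
                    pathFilter.matcher(key).matches(),
                    nameFilter.matcher(fileName).matches());
        }
    }
}
```

Under these assumptions the path pattern keeps exactly the two `.json` files under the `json2024`/`json2025` folders and the name pattern keeps all three `.json` files, which lines up with the 10- and 15-row Assert rules in the new configs (implying each copy of `/json/e2e.json` carries five records). For `.*.json` specifically, matching against the whole key instead of just the file name would give the same verdicts, since that regex only constrains the suffix.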