# [Feature][Connector-V2] Enable file split for S3File source #10450

yzeng1618 wants to merge 4 commits into `apache:dev`.
### Issue 1: `file_split_size` lacks input validation

**Location:** `S3FileSourceFactory.java`

**Problem Description:** The new `file_split_size` option is accepted without validation, so a zero or negative value only surfaces as a confusing failure at runtime.

**Severity:** MAJOR

**Improvement Suggestions:**

```java
// S3FileSourceFactory.java
public static final ConfigOption<Long> FILE_SPLIT_SIZE = ConfigOption
        .key("file_split_size")
        .longType()
        .defaultValue(64 * 1024 * 1024L)
        .withDescription(
                "The file split size (in bytes) when file split is enabled. "
                        + "Must be positive. Recommended values are between 1MB and 1GB. "
                        + "Note: actual split size may be larger due to row delimiter alignment.");

// Add validation in the prepare method
@Override
public void prepare(PrepareConfig config) throws Exception {
    // ... existing code ...
    Long fileSplitSize = options.get(FILE_SPLIT_SIZE);
    if (fileSplitSize != null && fileSplitSize <= 0) {
        throw new IllegalArgumentException(
                "file_split_size must be positive, but got: " + fileSplitSize);
    }
    // Optional: Add warning
    if (fileSplitSize != null && fileSplitSize < 1024 * 1024) {
        LOG.warn("file_split_size is less than 1MB, which may cause too many splits. "
                + "Recommended value: at least 1MB.");
    }
}
```

**Rationale:** Adding configuration validation can prevent runtime errors caused by invalid configurations, detecting issues early and providing friendly error messages.
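If this validation lands, a unit test could lock the behavior in. A minimal sketch, assuming JUnit 5, the `S3FileSource` class, and a hypothetical `buildConfigWith` helper that produces an otherwise valid plugin config with the given option set:

```java
@Test
public void testNonPositiveFileSplitSizeIsRejected() {
    S3FileSource source = new S3FileSource();
    IllegalArgumentException e =
            assertThrows(
                    IllegalArgumentException.class,
                    // buildConfigWith(...) is a hypothetical test helper that
                    // creates a valid S3File config with file_split_size = -1
                    () -> source.prepare(buildConfigWith("file_split_size", -1L)));
    assertTrue(e.getMessage().contains("must be positive"));
}
```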
### Issue 2: `TextReadStrategy` line separator hardcoding

**Location:** `TextReadStrategy.java`

**Problem Description:** The delimiter search hardcodes `\n`, so split boundaries are misaligned for files with `\r\n` or `\r` line endings. Code snippet:

```java
protected long adjustSplitEndToNextDelimiter(
        FileSourceSplit sourceSplit, long splitEnd, byte[] delimiter) {
    // ...
    int nextNewlinePos = findNextNewlinePosition(content, 0, content.length);
    // ...
}

private int findNextNewlinePosition(byte[] content, int start, int end) {
    for (int i = start; i < end; i++) {
        if (content[i] == '\n') { // Hardcoded \n
            return i;
        }
    }
    return -1;
}
```

**Severity:** MINOR

**Improvement Suggestions:**

```java
private int findNextNewlinePosition(byte[] content, int start, int end) {
    for (int i = start; i < end; i++) {
        if (content[i] == '\n') {
            // Unix style: found \n, return position after \n
            return i + 1;
        }
        if (content[i] == '\r' && i + 1 < end && content[i + 1] == '\n') {
            // Windows style: found \r\n, return position after \n
            return i + 2;
        }
        if (content[i] == '\r') {
            // Old Mac style: found lone \r, return position after \r
            return i + 1;
        }
    }
    return -1;
}
```

**Rationale:** Support the mainstream line break formats (Unix `\n`, Windows `\r\n`, and classic Mac `\r`). Note that this version returns the position *after* the delimiter rather than the delimiter's own index, so callers must be adjusted accordingly.
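A small table-style unit test could pin down the new semantics. A minimal sketch, assuming the method's visibility is relaxed to package-private for testing and static imports of JUnit's `assertEquals` and `java.nio.charset.StandardCharsets.UTF_8`:

```java
@Test
public void testFindNextNewlinePositionHandlesAllDelimiters() {
    TextReadStrategy strategy = new TextReadStrategy();
    // Unix: '\n' at index 3, position after it is 4
    assertEquals(4, strategy.findNextNewlinePosition("abc\ndef".getBytes(UTF_8), 0, 7));
    // Windows: "\r\n" starting at index 3, position after it is 5
    assertEquals(5, strategy.findNextNewlinePosition("abc\r\ndef".getBytes(UTF_8), 0, 8));
    // Old Mac: lone '\r' at index 3, position after it is 4
    assertEquals(4, strategy.findNextNewlinePosition("abc\rdef".getBytes(UTF_8), 0, 7));
    // No delimiter in range
    assertEquals(-1, strategy.findNextNewlinePosition("abcdef".getBytes(UTF_8), 0, 6));
}
```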
### Issue 3: CSV format split fallback logic not explicit enough

**Problem Description:** The fallback to a non-splitting read is implicit and easy to miss. Code snippet:

```java
public void prepareRead(...) {
    if (start >= end) {
        // fallback to non-splitting read
        skipHeader(in);
    }
    // ...
    adjustSplitEndToNextDelimiter(sourceSplit, end, rowDelimiter);
}
```

**Severity:** MINOR

**Improvement Suggestions:**

```java
public void prepareRead(...) {
    // Explicit check: if split has no valid range, fallback to non-splitting read
    if (start >= end) {
        LOG.debug(
                "Split {} has no valid range (start={}, end={}), "
                        + "falling back to non-splitting read with header skip",
                sourceSplit.splitId(), start, end);
        skipHeader(in);
    }
    // ... rest of the code ...
}
```

**Rationale:** Add comments and logs to explicitly state the fallback behavior, improving code readability and maintainability.

### Issue 4: Parquet split error handling improvements not uniformly applied to other formats
**Problem Description:** Split error messages were improved only for Parquet. Code comparison:

Parquet (improved):

```java
throw new IOException(
        String.format("Failed to get split for file: %s", filePath), e);
```

Text/CSV (not improved):

```java
// No explicit error handling; relies on framework default exception propagation
```

**Severity:** MINOR

**Improvement Suggestions:**

```java
// TextReadStrategy.java
protected long adjustSplitEndToNextDelimiter(
        FileSourceSplit sourceSplit, long splitEnd, byte[] delimiter) throws IOException {
    try {
        // ... existing logic ...
    } catch (Exception e) {
        throw new IOException(
                String.format(
                        "Failed to adjust split end for file: %s, splitId: %s, splitEnd: %d",
                        sourceSplit.path(), sourceSplit.splitId(), splitEnd),
                e);
    }
}
```

**Rationale:** A uniform error handling style improves error diagnostic capabilities for all formats, not just Parquet.

### Issue 5: Missing unit tests for Parquet split functionality
**Problem Description:** The PR's unit tests cover text/csv fallback behavior, but there are no unit tests for Parquet split handling.

**Severity:** MINOR

**Improvement Suggestions:**

```java
// ReadStrategySplitFallbackTest.java
@Test
public void testParquetSplitWithValidRange() throws Exception {
    // Create a test Parquet file with multiple RowGroups
    Path testParquetFile = createTestParquetFile(3); // 3 RowGroups
    ParquetReadStrategy readStrategy = new ParquetReadStrategy();
    FileSourceSplit split = new FileSourceSplit(0, testParquetFile, 0, 1024, null);

    // Prepare read should succeed
    readStrategy.prepareRead(/* params */);

    // Verify that split is handled correctly
    // ...
}

@Test
public void testParquetSplitFailureMessage() throws Exception {
    // Test the enhanced error message
    Path invalidParquetFile = createInvalidParquetFile();
    ParquetFileSplitStrategy splitStrategy = new ParquetFileSplitStrategy();

    IOException exception = assertThrows(IOException.class, () -> {
        splitStrategy.getSplits(invalidParquetFile, /* params */);
    });

    assertTrue(exception.getMessage().contains("Failed to get split for file:"));
    assertTrue(exception.getMessage().contains(invalidParquetFile.toString()));
}
```

**Rationale:** Improve test coverage to ensure the correctness of Parquet splits and the effectiveness of the error handling.

### Issue 6: Missing split support for JSON format

**Location:** Documentation mentions JSON format split support, but there is no explicit verification in code.
**Severity:** MAJOR (if JSON is actually not supported) / MINOR (if JSON is already supported automatically)

**Improvement Suggestions:** Verify JSON Lines splitting explicitly, for example with a unit test like the sketch below, or correct the documentation if JSON is not supported.
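A minimal sketch of such a verification, assuming the existing `JsonReadStrategy` shares the delimiter-aligned split path; `createJsonLinesFile`, `splitsOf`, and `readAllRows` are hypothetical test helpers:

```java
@Test
public void testJsonLinesSplitProducesSameRowsAsWholeFileRead() throws Exception {
    // Hypothetical helper: writes N rows, one JSON object per line
    Path jsonFile = createJsonLinesFile(1000);
    JsonReadStrategy readStrategy = new JsonReadStrategy();

    // Read once as a single split and once as two logical splits
    List<SeaTunnelRow> wholeRows = readAllRows(readStrategy, splitsOf(jsonFile, 1));
    List<SeaTunnelRow> splitRows = readAllRows(readStrategy, splitsOf(jsonFile, 2));

    // Splitting must not duplicate or drop rows
    assertEquals(wholeRows.size(), splitRows.size());
}
```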
**Rationale:** Keep documentation and implementation consistent, avoiding user misunderstanding.

### Issue 7: E2E tests do not cover all scenarios
**Problem Description:** The new E2E test only exercises the text format; Parquet, JSON, header-less text, and boundary cases are not covered.

**Severity:** MINOR

**Improvement Suggestions:**

```java
// S3FileWithFilterIT.java
@Test
public void testS3FileParquetEnableSplit() throws Exception {
    // Test Parquet file with multiple RowGroups
    // Verify split behavior and data correctness
}

@Test
public void testS3FileJsonEnableSplit() throws Exception {
    // Test JSON Lines file
    // Verify split behavior and data correctness
}

@Test
public void testS3FileTextNoHeaderEnableSplit() throws Exception {
    // Test text file without header
    // Verify no header is skipped
}

@Test
public void testS3FileSplitBoundaryCase() throws Exception {
    // Test file size exactly equals file_split_size
    // Verify no data duplication or loss at boundaries
}
```

**Rationale:** Improve test coverage to ensure all declared supported formats and scenarios are verified by E2E tests.

### Issue 8: Documentation lacks detailed explanation of split limitations
**Severity:** MINOR

**Improvement Suggestions:** Add a detailed explanation to the documentation:

### File Split Limitations
- **Supported formats**:
  - Text (plain text files)
  - CSV (with or without header)
  - JSON Lines (one JSON object per line)
  - Parquet (split by RowGroup)
- **Unsupported formats**:
  - Compressed text files (e.g., `.gz`, `.bz2`, `.zip`, `.lz4`) - split will be automatically disabled
  - Excel (`.xlsx`)
  - XML
  - Single-line JSON files (not JSON Lines)
- **Behavior with unsupported formats**:
  If you enable `enable_file_split` for an unsupported format, the system will
  automatically fall back to non-splitting mode and emit a warning log.
- **Data ordering**:
  When file split is enabled, data may be read out of order across splits.
  If strict ordering is required, do not enable file split, or use a single-split strategy.
- **Parquet compression**:
  Parquet files with internal compression (e.g., Snappy, Gzip) are fully supported,
  because Parquet split is based on RowGroup boundaries, not byte ranges.

**Rationale:** Clear and complete limitation descriptions help users correctly understand and configure the feature.
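To make the documented "automatically disabled" behavior concrete, the guard it implies might look like the following sketch; `CompressFormat` and `canSplit` are illustrative assumptions, not the PR's actual code:

```java
// Illustrative only: compressed text files cannot be split by byte range,
// because a reader cannot start decompressing at an arbitrary offset.
private boolean canSplit(String filePath, CompressFormat compressFormat) {
    if (compressFormat != CompressFormat.NONE) {
        LOG.warn(
                "enable_file_split is ignored for compressed file {}; "
                        + "falling back to non-splitting read",
                filePath);
        return false;
    }
    return true;
}
```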
### Issue 9: Missing metrics for split performance monitoring

**Location:** The entire PR adds no metrics-related code.

**Problem Description:** Without metrics, users cannot observe how many splits are generated, how large they are, or how long split reads take.

**Severity:** MINOR

**Improvement Suggestions:**

```java
// FileSourceReader.java (pseudocode)
private Counter splitCounter;
private Histogram splitSizeHistogram;
private Timer splitReadTimer;

public void open() {
    splitCounter = context.metricRegistry().counter("file.split.count");
    splitSizeHistogram = context.metricRegistry().histogram("file.split.size");
    splitReadTimer = context.metricRegistry().timer("file.split.read.time");
}

public void readNext() {
    Timer.Context timeContext = splitReadTimer.time();
    try {
        // ... reading logic ...
        splitCounter.inc();
        splitSizeHistogram.update(splitSize);
    } finally {
        timeContext.stop();
    }
}
```

**Rationale:** Metrics provide observability, helping users monitor and tune the split functionality.
…tStrategyFactoryTest and doc

The issues described above have been supplemented and fixed.

@chl-wxp Did you implement data files in another format? If so, take a look at this: #10129
### Purpose of this pull request

- Implements logical file split for the `S3File` source to improve read parallelism when ingesting large files from S3/MinIO (illustrated by the sketch below).
- Adds `enable_file_split` and `file_split_size` options to the `S3File` source option rule.
- Supports `text`/`csv`/`json` (align split end to the next `row_delimiter`) and `parquet` (split by RowGroup; never breaks a RowGroup).
- Handles `enable_file_split=true` when the split has no range (fallback to non-splitting read).
- Updates `docs/en` and `docs/zh`, and adds/extends tests (unit + e2e).
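As a rough illustration of what "logical file split" means here: byte ranges of `file_split_size` could be enumerated per file and later aligned to the next `row_delimiter` at read time. A minimal sketch; the `FileSourceSplit` constructor arguments mirror the tests above and are otherwise an assumption:

```java
// Sketch: enumerate logical byte-range splits for one file of length fileLength.
static List<FileSourceSplit> enumerateSplits(Path filePath, long fileLength) {
    List<FileSourceSplit> splits = new ArrayList<>();
    long splitSize = 64 * 1024 * 1024L; // file_split_size, default 64MB
    int splitId = 0;
    for (long start = 0; start < fileLength; start += splitSize) {
        // The last split may be shorter than file_split_size
        long length = Math.min(splitSize, fileLength - start);
        splits.add(new FileSourceSplit(splitId++, filePath, start, length, null));
    }
    return splits;
}
```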
### Does this PR introduce any user-facing change?

Yes.

- `S3File`: new options `enable_file_split` (boolean) and `file_split_size` (long).

### How was this patch tested?
- `ReadStrategySplitFallbackTest` (text/csv fallback behavior when split is enabled but no range provided)
- `S3FileFactoryTest` (option rule contains split-related options)
- `S3FileWithFilterIT#testS3FileTextEnableSplitToAssert` with MinIO + Assert sink

### Check list
- New License Guide
- Update `incompatible-changes.md` to describe the incompatibility caused by this PR.