
[Improve][connector-elasticsearch-v2] Add slicing support and e2e coverage for Elasticsearch source#10454

Open
CosmosNi wants to merge 7 commits into apache:dev from CosmosNi:feature_es_slice

Conversation


@CosmosNi CosmosNi commented Feb 5, 2026

This change adds configurable slicing to the Elasticsearch source (slice_max), enabling parallel reads for Scroll and PIT while keeping SQL mode unchanged. It propagates slice metadata through splits, injects slice parameters into Scroll/PIT requests, and logs active slice info at runtime. E2E coverage is extended with PIT/Scroll slicing scenarios, and docs are updated to describe the new option and examples.
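The PR description mentions that docs were updated with examples of the new option. As a hedged illustration only, a SeaTunnel source config using `slice_max` could look roughly like the sketch below; the `slice_max` name comes from this PR, while the surrounding keys and placeholder values (`hosts`, `index`) are assumptions based on typical Elasticsearch source configs, not taken from the actual docs diff.

```hocon
source {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    index = "st_index"
    # New option from this PR: number of slices for parallel Scroll/PIT reads.
    # Per the PR, SQL mode is unchanged, so slicing would not apply there.
    slice_max = 2
  }
}
```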

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

# Conflicts:
#	docs/en/connectors/source/Elasticsearch.md
#	docs/zh/connectors/source/Elasticsearch.md
#	seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
#	seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
#	seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
#	seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
#	seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@DanielCarter-stack

Issue 1: Shared PIT Resource Leak After Checkpoint Recovery

Location: ElasticsearchSourceSplitEnumerator.java:61, 70-84

Related Context:

  • State serialization: ElasticsearchSourceState.java:29-33
  • State deserialization: ElasticsearchSourceSplitEnumerator.java:70-84 (constructor)
  • PIT cleanup: ElasticsearchSourceSplitEnumerator.java:192-200 (close method)

Problem Description:
The sharedPitIds map is a newly introduced instance variable used to track shared PIT IDs. However, during checkpoint recovery the constructor restores only pendingSplit and never repopulates sharedPitIds. This leads to:

  1. PIT ID tracking lost: PIT IDs created before recovery are not in sharedPitIds
  2. Resource leak: close() method cannot delete these PITs
  3. Resource accumulation: In scenarios with frequent checkpoints, a large number of PITs will not be cleaned up
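The leak mechanism described above can be illustrated with a minimal, self-contained sketch. Everything here is a stand-in for illustration only: `PitClient`, `track`, and the class name are hypothetical, not the connector's real API; the real code would call `EsRestClient.deletePointInTime` from the enumerator's `close()` method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: close() can only delete PITs that are present in
// sharedPitIds. If checkpoint recovery does not repopulate the map, PITs
// created before the checkpoint are never deleted and leak on the cluster.
class PitCleanupSketch {
    // Stand-in for the relevant part of EsRestClient.
    interface PitClient {
        void deletePointInTime(String pitId);
    }

    private final Map<String, String> sharedPitIds = new HashMap<>();
    private final PitClient client;

    PitCleanupSketch(PitClient client) {
        this.client = client;
    }

    // Mirrors sharedPitIds.putIfAbsent: one shared PIT per index.
    void track(String indexName, String pitId) {
        sharedPitIds.putIfAbsent(indexName, pitId);
    }

    // Deletes only the PITs the map knows about; untracked PITs leak.
    void close() {
        sharedPitIds.values().forEach(client::deletePointInTime);
        sharedPitIds.clear();
    }
}
```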

Potential Risks:

  • During frequent checkpoints in production environments, a large number of uncleaned PITs will accumulate on the Elasticsearch cluster
  • Occupies ES cluster memory resources
  • May trigger ES PIT quantity limits (default limits are relatively loose, but still need to be considered)

Impact Scope:

  • Direct Impact: All jobs using PIT + slicing with checkpoint enabled
  • Indirect Impact: Memory and resource management of the Elasticsearch cluster
  • Affected Scope: Single Connector (Elasticsearch Source)

Severity: MAJOR

Improvement Suggestion:

// Option 1: Extract PIT ID from restored split
public ElasticsearchSourceSplitEnumerator(
        SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
        ElasticsearchSourceState sourceState,
        ReadonlyConfig connConfig,
        List<ElasticsearchConfig> elasticsearchConfigs) {
    this.context = context;
    this.connConfig = connConfig;
    this.pendingSplit = new HashMap<>();
    this.sharedPitIds = new HashMap<>();
    this.shouldEnumerate = sourceState == null;
    if (sourceState != null) {
        this.shouldEnumerate = sourceState.isShouldEnumerate();
        this.pendingSplit.putAll(sourceState.getPendingSplit());
        
        // Restore sharedPitIds
        for (List<ElasticsearchSourceSplit> splits : sourceState.getPendingSplit().values()) {
            for (ElasticsearchSourceSplit split : splits) {
                String pitId = split.getElasticsearchConfig().getPitId();
                if (StringUtils.isNotEmpty(pitId)) {
                    String indexName = split.getElasticsearchConfig().getIndex();
                    sharedPitIds.putIfAbsent(indexName, pitId);
                }
            }
        }
    }
    this.elasticsearchConfigs = elasticsearchConfigs;
}

// Option 2: Explicitly save sharedPitIds in ElasticsearchSourceState
// Need to modify ElasticsearchSourceState class
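A minimal, self-contained sketch of what Option 2 could look like. The class name mirrors the PR, but the field types are deliberately simplified assumptions (the real state holds `Map<String, List<ElasticsearchSourceSplit>>` for pending splits); this is an illustration of the shape of the change, not the actual implementation.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical Option 2 sketch: carry sharedPitIds inside the checkpoint
// state so the enumerator can restore and later delete shared PITs.
public class ElasticsearchSourceState implements Serializable {
    private static final long serialVersionUID = 1L;

    private final boolean shouldEnumerate;
    // Simplified: index name -> pending split id (real type is a split list).
    private final Map<String, String> pendingSplit;
    // index name -> shared PIT id, restored on recovery for cleanup in close().
    private final Map<String, String> sharedPitIds;

    public ElasticsearchSourceState(
            boolean shouldEnumerate,
            Map<String, String> pendingSplit,
            Map<String, String> sharedPitIds) {
        this.shouldEnumerate = shouldEnumerate;
        this.pendingSplit = new HashMap<>(pendingSplit);
        this.sharedPitIds = new HashMap<>(sharedPitIds);
    }

    public boolean isShouldEnumerate() {
        return shouldEnumerate;
    }

    public Map<String, String> getPendingSplit() {
        return pendingSplit;
    }

    public Map<String, String> getSharedPitIds() {
        return sharedPitIds;
    }
}
```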

Rationale: Ensure that shared PIT resources can be properly tracked and cleaned up after checkpoint recovery.


Issue 2: ElasticsearchConfig Missing serialVersionUID

Location: ElasticsearchConfig.java:33

Related Context:

  • Implements Serializable interface
  • Serialized in ElasticsearchSourceSplit
  • Other serializable classes have serialVersionUID

Problem Description:
ElasticsearchConfig implements the Serializable interface and is serialized into checkpoints, but it does not declare serialVersionUID. The JVM therefore generates a UID automatically, which risks cross-version deserialization failure.

Potential Risks:

  • After upgrading SeaTunnel version, old checkpoints may not be recoverable
  • Different JVM implementations may generate different UIDs

Impact Scope:

  • Direct Impact: All jobs using Elasticsearch Source with checkpoint enabled
  • Indirect Impact: State recovery during version upgrades
  • Affected Scope: Single Connector

Severity: MAJOR

Improvement Suggestion:

@Getter
@Setter
public class ElasticsearchConfig implements Serializable {
    private static final long serialVersionUID = 1L;  // Add serialVersionUID
    
    private String index;
    // ... other fields
}

Rationale: Explicitly declaring serialVersionUID ensures serialization compatibility and avoids cross-version recovery failures.


Issue 3: Duplicate SQL Mode Validation Logic

Locations:

  • ElasticsearchSource.java:184-188
  • ElasticsearchSourceSplitEnumerator.java:156-159

Related Context:

  • Both places have the same if (SearchTypeEnum.SQL.equals(...) && sliceMax > 1) validation
  • Both log the same warning messages

Problem Description:
The same validation logic is duplicated in two places, increasing maintenance costs and causing duplicate warning logs.

Potential Risks:

  • Code redundancy, high maintenance cost
  • Future modifications may miss one location
  • Poor user experience (duplicate logs)

Impact Scope:

  • Direct Impact: Code maintainability
  • Indirect Impact: None
  • Affected Scope: Single Connector

Severity: MINOR

Improvement Suggestion:

// Option 1: Validate only once in ElasticsearchSource (recommended)
// Keep validation in ElasticsearchSource.java
// Remove validation in ElasticsearchSourceSplitEnumerator.java, directly use elasticsearchConfig.getSliceMax()

// Option 2: Extract as static utility method
public static int validateSliceMaxForSearchType(SearchTypeEnum searchType, int sliceMax) {
    if (SearchTypeEnum.SQL.equals(searchType) && sliceMax > 1) {
        log.warn("SQL search_type does not support slicing. slice_max will be ignored.");
        return 1;
    }
    return Math.max(1, sliceMax);
}

Rationale: Eliminate code redundancy and improve maintainability.


Issue 4: E2E Test Data Insufficient to Verify Data Correctness

Locations:

  • ElasticsearchIT.java:690 (generateTestDataSet1)
  • ElasticsearchIT.java:428-447 (test method)

Related Context:

  • Test generates 100 data records
  • slice_max = 2
  • Only validates set equality, not data uniqueness

Problem Description:
Current E2E tests have the following deficiencies:

  1. Data volume too small: 100 records split across 2 slices is too few to fully exercise the slice logic
  2. No uniqueness validation: If slice logic has bugs causing data duplication, current assertIterableEquals cannot detect it
  3. No data distribution validation: Does not verify whether the amount of data read by each slice is reasonable
  4. Missing checkpoint tests: Does not test checkpoint recovery scenarios

Potential Risks:

  • Slice implementation has bugs (e.g., data duplication, loss) but tests cannot detect them
  • Data quality issues in production environments

Impact Scope:

  • Direct Impact: Test coverage
  • Indirect Impact: Production data quality
  • Affected Scope: Single Connector

Severity: MAJOR

Improvement Suggestion:

@TestTemplate
public void testElasticsearchWithPITSlice(TestContainer container)
        throws IOException, InterruptedException {
    Container.ExecResult execResult =
            container.executeJob("/elasticsearch/elasticsearch_source_with_pit_slice.conf");
    Assertions.assertEquals(0, execResult.getExitCode());
    List<String> sinkData = readSinkDataWithSchema("st_index_pit_slice");
    
    // 1. Verify data uniqueness (newly added)
    Set<String> uniqueData = new HashSet<>(sinkData);
    Assertions.assertEquals(sinkData.size(), uniqueData.size(), 
        "Data should not have duplicates");
    
    // 2. Verify data integrity (existing)
    Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
    
    // 3. Verify data volume (newly added)
    int expectedCount = mapTestDatasetForDSL().size();
    Assertions.assertEquals(expectedCount, sinkData.size(),
        "Data count should match expected");
}

// Increase test data volume (in generateTestDataSet1)
for (int i = 0; i < 1000; i++) {  // Increase from 100 to 1000
    // ...
}

Rationale: Ensure the correctness of slice functionality and avoid data duplication or loss issues in production environments.


Issue 5: Missing Fallback Handling for PIT Creation Failure

Location: ElasticsearchSourceSplitEnumerator.java:167-172

Related Context:

sharedPitId = sharedPitIds.computeIfAbsent(
        indexName,
        key -> esRestClient.createPointInTime(
                key, elasticsearchConfig.getPitKeepAlive()));

Problem Description:
If createPointInTime() fails due to network issues, ES cluster anomalies, or other reasons, it will cause:

  1. Entire split enumeration failure
  2. Job crash
  3. Unable to fall back to non-sliced mode

Potential Risks:

  • Network fluctuations or temporary ES unavailability cause job failures
  • Reduced system availability

Impact Scope:

  • Direct Impact: Jobs using PIT + slicing
  • Indirect Impact: Job stability
  • Affected Scope: Single Connector

Severity: MINOR

Improvement Suggestion:

if (useSharedPit) {
    try {
        sharedPitId = sharedPitIds.computeIfAbsent(
                indexName,
                key -> {
                    try {
                        return esRestClient.createPointInTime(
                                key, elasticsearchConfig.getPitKeepAlive());
                    } catch (Exception e) {
                        log.warn("Failed to create shared PIT for index: {}, fallback to sliceMax=1. Error: {}", 
                                key, e.getMessage());
                        return null;
                    }
                });
        // If PIT creation fails, fall back to not using slices
        if (sharedPitId == null) {
            sliceMax = 1;
            useSharedPit = false;
        }
    } catch (Exception e) {
        log.warn("Exception during PIT creation for index: {}, fallback to sliceMax=1", 
                indexName, e);
        sliceMax = 1;
        useSharedPit = false;
    }
}

Rationale: Improve system fault tolerance and availability.


