[Improve][connector-elasticsearchv2] Optimize Elasticsearch source che… #10446
CosmosNi wants to merge 2 commits into apache:dev
…ckpointing and SQL cursor cleanup
Issue 1: PIT hasMore logic change may cause an infinite loop or additional queries
Location:
// Before modification
boolean hasMore = docs.size() > 0 && totalHits > 0 && docs.size() < totalHits;
// After modification
boolean hasMore = !docs.isEmpty();
Related Context:
Problem Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions:
// Restore the original logic, or if simplification is indeed necessary, explain the reason in the PR description
boolean hasMore = !docs.isEmpty() && docs.size() < totalHits;
// If totalHits is unreliable, other judgment methods can be used, but thorough testing is required
Rationale:
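The practical difference between the two predicates quoted in Issue 1 can be checked with a small standalone sketch. The class and method names below (PitHasMoreSketch, hasMoreBefore, hasMoreAfter) are hypothetical and not part of the connector, and totalHits is assumed to be the hit count for the whole query rather than for a single page.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the connector: it only restates the two
// predicates quoted above so their behavior can be compared side by side.
class PitHasMoreSketch {

    // Predicate before the PR: stop once a page is at least as large as totalHits.
    static boolean hasMoreBefore(List<?> docs, long totalHits) {
        return docs.size() > 0 && totalHits > 0 && docs.size() < totalHits;
    }

    // Predicate after the PR: keep going until an empty page comes back.
    static boolean hasMoreAfter(List<?> docs) {
        return !docs.isEmpty();
    }

    public static void main(String[] args) {
        List<String> empty = Collections.emptyList();
        List<String> page = Arrays.asList("a", "b");

        // Empty page: compare both predicates.
        System.out.println(hasMoreBefore(empty, 5) + " " + hasMoreAfter(empty));
        // Page smaller than totalHits: compare both predicates.
        System.out.println(hasMoreBefore(page, 5) + " " + hasMoreAfter(page));
        // Single page that already holds every hit: compare both predicates.
        System.out.println(hasMoreBefore(page, 2) + " " + hasMoreAfter(page));
    }
}
```

Under these assumptions the two predicates agree on an empty page and on a page smaller than totalHits. They differ when a single page already contains every hit: the new predicate requests one more page and stops only when that page comes back empty.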
Issue 2: PR description does not match actual changes
Location: PR description
Related Context:
Problem Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions:
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
    ElasticsearchSourceSplit split;
    synchronized (output.getCheckpointLock()) {
        split = splits.poll(); // Only perform state reads inside the lock
    }
    if (split != null) {
        // Execute I/O outside the lock
        SeaTunnelRowType seaTunnelRowType = split.getSeaTunnelRowType();
        ElasticsearchConfig sourceIndexInfo = split.getElasticsearchConfig();
        scrollSearchResult(seaTunnelRowType, sourceIndexInfo, output);
    } else if (noMoreSplit) {
        synchronized (output.getCheckpointLock()) {
            log.info("Closed the bounded ELasticsearch source");
            context.signalNoMoreElement();
        }
    } else {
        Thread.sleep(pollNextWaitTime); // Sleep outside the lock
    }
}
However, this change requires more careful concurrent safety review, because
Rationale:
Issue 3: Timing of SQL cursor closure may cause resources not to be released
Location:
} finally {
    if (StringUtils.isNotEmpty(cursor)) {
        try {
            esRestClient.closeSqlCursor(cursor);
        } catch (Exception e) {
            log.warn("Failed to close SQL cursor: " + cursor, e);
        }
    }
}
Related Context:
Problem Description: However, there is an edge case: if
Potential Risks:
Scope of Impact:
Severity: MINOR (not a problem, but needs verification)
Improvement Suggestions:
Rationale:
Issue 4: Missing unit tests and integration tests
Location: Entire PR
Problem Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions:
@Test
public void testSqlCursorClosedOnNormalCompletion() {
    // Test that the cursor is closed when the query completes normally
}
@Test
public void testSqlCursorClosedOnException() {
    // Test that the cursor is closed when the query throws an exception
}
@Test
public void testPitHasMoreLogic() {
    // Test PIT hasMore return values in various scenarios
    // - Empty results
    // - Intermediate batch
    // - Last batch
    // - Total count is 0
}
Rationale:
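One possible shape for the two cursor tests suggested in Issue 4 is sketched below without a test framework. CursorCloseSketch, FakeSqlClient, and readAll are hypothetical stand-ins introduced only for illustration; only the try/finally structure mirrors the snippet quoted under Issue 3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real client and reader; only the
// try/finally shape mirrors the snippet quoted under Issue 3.
class CursorCloseSketch {

    // Fake client that records which cursors were closed.
    static class FakeSqlClient {
        final List<String> closed = new ArrayList<>();

        boolean closeSqlCursor(String cursor) {
            if (cursor == null || cursor.isEmpty()) {
                return false; // nothing to close
            }
            closed.add(cursor);
            return true;
        }
    }

    // Simulates paging through SQL results; fails midway when failOnPage is true.
    static void readAll(FakeSqlClient client, String cursor, boolean failOnPage) {
        try {
            if (failOnPage) {
                throw new IllegalStateException("simulated failure while paging");
            }
        } finally {
            if (cursor != null && !cursor.isEmpty()) {
                try {
                    client.closeSqlCursor(cursor);
                } catch (Exception e) {
                    // Best-effort cleanup: a close failure must not mask the original error.
                    System.err.println("Failed to close SQL cursor: " + e.getMessage());
                }
            }
        }
    }
}
```

A test built on this sketch would call readAll once with failOnPage set to false and once with it set to true, then check in both cases that the fake client recorded the cursor as closed.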
 */
public boolean closeSqlCursor(String cursor) {
    if (StringUtils.isEmpty(cursor)) {
        log.warn("Attempted to close SQL cursor with empty cursor");
Thank you for your contribution. I don't quite understand the log message here. It could simply say that an empty cursor doesn't need to be closed. What do you think?
            return false;
        }
    } catch (Exception ex) {
        log.warn("Failed to close SQL cursor: " + cursor, ex);
When logging, it is best to standardize the format and use {} placeholders instead of string concatenation.
        try {
            esRestClient.closeSqlCursor(cursor);
        } catch (Exception e) {
            log.warn("Failed to close SQL cursor: " + cursor, e);
Please add some tests.
This change narrows the checkpoint lock scope in the Elasticsearch source reader so ES I/O, parsing, and sleeps happen outside the lock, reducing checkpoint delay while keeping collect atomic. It also adds a best‑effort SQL cursor close to release ES server‑side resources when SQL pagination is interrupted or completes.
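The lock narrowing described above can be illustrated with a minimal sketch. NarrowLockSketch, fetchBatch, checkpointLock, and emitted are hypothetical names standing in for the reader, the Elasticsearch request, the checkpoint lock, and the collector; this is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: checkpointLock, fetchBatch, and emitted stand in for
// the reader's checkpoint lock, the Elasticsearch request, and the collector.
class NarrowLockSketch {
    final Object checkpointLock = new Object();
    final List<String> emitted = new ArrayList<>();

    // Stand-in for slow I/O; it runs without holding the lock.
    List<String> fetchBatch(int page) {
        if (page < 2) {
            return Arrays.asList("row-" + page + "-a", "row-" + page + "-b");
        }
        return new ArrayList<>(); // an empty page ends the read
    }

    void readAll() {
        int page = 0;
        while (true) {
            List<String> batch = fetchBatch(page++); // I/O outside the lock
            if (batch.isEmpty()) {
                return;
            }
            synchronized (checkpointLock) { // only emission holds the lock
                emitted.addAll(batch);
            }
        }
    }
}
```

The design choice is that a checkpoint waiting on the lock is delayed only by the time needed to emit one already-fetched batch, not by a network round trip or a sleep.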