-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Improve][connector-elasticsearchv2]Optimize Elasticsearch source che… #10446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -307,6 +307,48 @@ public boolean clearScroll(String scrollId) { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Close SQL cursor to release server-side resources. | ||
| * | ||
| * @param cursor The SQL cursor to close | ||
| * @return True if the cursor was successfully closed | ||
| */ | ||
| public boolean closeSqlCursor(String cursor) { | ||
| if (StringUtils.isEmpty(cursor)) { | ||
| log.warn("Attempted to close SQL cursor with empty cursor"); | ||
| return false; | ||
| } | ||
|
|
||
| String endpoint = "/_sql/close"; | ||
| Request request = new Request("POST", endpoint); | ||
| Map<String, String> requestBody = new HashMap<>(); | ||
| requestBody.put("cursor", cursor); | ||
| request.setJsonEntity(JsonUtils.toJsonString(requestBody)); | ||
|
|
||
| try { | ||
| Response response = restClient.performRequest(request); | ||
| if (response == null) { | ||
| log.warn("POST {} response null for cursor: {}", endpoint, cursor); | ||
| return false; | ||
| } | ||
| if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { | ||
| String entity = EntityUtils.toString(response.getEntity()); | ||
| JsonNode jsonNode = JsonUtils.parseObject(entity); | ||
| return jsonNode.get("succeeded").asBoolean(); | ||
| } else { | ||
| log.warn( | ||
| "POST {} response status code={} for cursor: {}", | ||
| endpoint, | ||
| response.getStatusLine().getStatusCode(), | ||
| cursor); | ||
| return false; | ||
| } | ||
| } catch (Exception ex) { | ||
| log.warn("Failed to close SQL cursor: " + cursor, ex); | ||
|
||
| return false; | ||
| } | ||
| } | ||
|
|
||
| private ScrollResult getDocsFromSqlResult( | ||
| String endpoint, String requestBody, JsonNode columnNodes) { | ||
| Request request = new Request("POST", endpoint); | ||
|
|
@@ -1027,7 +1069,7 @@ private PointInTimeResult parsePointInTimeResponse(String responseJson, String p | |
| String updatedPitId = rootNode.has("pit_id") ? rootNode.get("pit_id").asText() : pitId; | ||
|
|
||
| // Determine if there are more results | ||
| boolean hasMore = docs.size() > 0 && totalHits > 0 && docs.size() < totalHits; | ||
| boolean hasMore = !docs.isEmpty(); | ||
|
|
||
| return new PointInTimeResult(updatedPitId, docs, totalHits, searchAfter, hasMore); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,16 +102,28 @@ private void scrollSearchResult( | |
| // SQL client | ||
| if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) { | ||
| log.info("Using SQL query for index: {}", sourceIndexInfo.getIndex()); | ||
| ScrollResult scrollResult = | ||
| esRestClient.searchBySql( | ||
| sourceIndexInfo.getSqlQuery(), sourceIndexInfo.getScrollSize()); | ||
|
|
||
| outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); | ||
| while (StringUtils.isNotEmpty(scrollResult.getScrollId())) { | ||
| scrollResult = | ||
| esRestClient.searchWithSql( | ||
| scrollResult.getScrollId(), scrollResult.getColumnNodes()); | ||
| String cursor = null; | ||
| try { | ||
| ScrollResult scrollResult = | ||
| esRestClient.searchBySql( | ||
| sourceIndexInfo.getSqlQuery(), sourceIndexInfo.getScrollSize()); | ||
|
|
||
| outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); | ||
| cursor = scrollResult.getScrollId(); | ||
| while (StringUtils.isNotEmpty(cursor)) { | ||
| scrollResult = | ||
| esRestClient.searchWithSql(cursor, scrollResult.getColumnNodes()); | ||
| outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); | ||
| cursor = scrollResult.getScrollId(); | ||
| } | ||
| } finally { | ||
| if (StringUtils.isNotEmpty(cursor)) { | ||
| try { | ||
| esRestClient.closeSqlCursor(cursor); | ||
| } catch (Exception e) { | ||
| log.warn("Failed to close SQL cursor: " + cursor, e); | ||
|
||
| } | ||
| } | ||
| } | ||
| } else { | ||
| // Check if we should use PIT API | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your contribution. I don't quite understand the log printing here. You can say that an empty cursor doesn't need to be closed, What do you think?