Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,48 @@ public boolean clearScroll(String scrollId) {
}
}

/**
* Close SQL cursor to release server-side resources.
*
* @param cursor The SQL cursor to close
* @return True if the cursor was successfully closed
*/
public boolean closeSqlCursor(String cursor) {
if (StringUtils.isEmpty(cursor)) {
log.debug("SQL cursor is empty; skip closing.");
return false;
}

String endpoint = "/_sql/close";
Request request = new Request("POST", endpoint);
Map<String, String> requestBody = new HashMap<>();
requestBody.put("cursor", cursor);
request.setJsonEntity(JsonUtils.toJsonString(requestBody));

try {
Response response = restClient.performRequest(request);
if (response == null) {
log.warn("POST {} response null for cursor: {}", endpoint, cursor);
return false;
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
JsonNode jsonNode = JsonUtils.parseObject(entity);
return jsonNode.get("succeeded").asBoolean();
} else {
log.warn(
"POST {} response status code={} for cursor: {}",
endpoint,
response.getStatusLine().getStatusCode(),
cursor);
return false;
}
} catch (Exception ex) {
log.warn("Failed to close SQL cursor: {}", cursor, ex);
return false;
}
}

private ScrollResult getDocsFromSqlResult(
String endpoint, String requestBody, JsonNode columnNodes) {
Request request = new Request("POST", endpoint);
Expand Down Expand Up @@ -1027,7 +1069,7 @@ private PointInTimeResult parsePointInTimeResponse(String responseJson, String p
String updatedPitId = rootNode.has("pit_id") ? rootNode.get("pit_id").asText() : pitId;

// Determine if there are more results
boolean hasMore = docs.size() > 0 && totalHits > 0 && docs.size() < totalHits;
boolean hasMore = !docs.isEmpty();

return new PointInTimeResult(updatedPitId, docs, totalHits, searchAfter, hasMore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,28 @@ private void scrollSearchResult(
// SQL client
if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
log.info("Using SQL query for index: {}", sourceIndexInfo.getIndex());
ScrollResult scrollResult =
esRestClient.searchBySql(
sourceIndexInfo.getSqlQuery(), sourceIndexInfo.getScrollSize());

outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (StringUtils.isNotEmpty(scrollResult.getScrollId())) {
scrollResult =
esRestClient.searchWithSql(
scrollResult.getScrollId(), scrollResult.getColumnNodes());
String cursor = null;
try {
ScrollResult scrollResult =
esRestClient.searchBySql(
sourceIndexInfo.getSqlQuery(), sourceIndexInfo.getScrollSize());

outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
cursor = scrollResult.getScrollId();
while (StringUtils.isNotEmpty(cursor)) {
scrollResult =
esRestClient.searchWithSql(cursor, scrollResult.getColumnNodes());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
cursor = scrollResult.getScrollId();
}
} finally {
if (StringUtils.isNotEmpty(cursor)) {
try {
esRestClient.closeSqlCursor(cursor);
} catch (Exception e) {
log.warn("Failed to close SQL cursor: {}", cursor, e);
}
}
}
} else {
// Check if we should use PIT API
Expand Down