Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions engine/src/main/java/com/arcadedb/index/vector/LSMVectorIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,31 @@ private void buildGraphFromScratch(final GraphBuildCallback graphCallback) {
* @param graphCallback Optional callback for graph build progress
*/
private void buildGraphFromScratchWithRetry(final GraphBuildCallback graphCallback) {
// Always have a progress reporter: if caller didn't provide one, log throttled progress every ~2s
final GraphBuildCallback effectiveGraphCallback;
if (graphCallback != null) {
effectiveGraphCallback = graphCallback;
} else {
final long[] lastLogTimeMs = {System.currentTimeMillis()};
final int[] lastLoggedProcessed = {-1};
effectiveGraphCallback = (phase, processedNodes, totalNodes, vectorAccesses) -> {
if (totalNodes <= 0)
return;

final long now = System.currentTimeMillis();
final boolean progressed = processedNodes != lastLoggedProcessed[0];
final boolean timeElapsed = now - lastLogTimeMs[0] >= 2000;
final boolean reachedEnd = processedNodes >= totalNodes && lastLoggedProcessed[0] != totalNodes;
final boolean shouldLog = progressed && (timeElapsed || reachedEnd);

if (shouldLog) {
LogManager.instance().log(this, Level.INFO,
"Graph build %s: %d/%d (vector accesses=%d)", phase, processedNodes, totalNodes, vectorAccesses);
lastLogTimeMs[0] = now;
lastLoggedProcessed[0] = processedNodes;
}
};
}
Comment on lines +791 to +811
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of long[] and int[] as mutable containers for lastLogTimeMs and lastLoggedProcessed is not thread-safe. The effectiveGraphCallback is invoked from both the main build thread (during validation) and a separate progress monitor thread (during building). This can lead to race conditions and incorrect throttling behavior. To ensure thread safety, you should use java.util.concurrent.atomic.AtomicLong and java.util.concurrent.atomic.AtomicInteger instead.

Additionally, the throttle duration 2000 is a magic number. It would be better to extract it into a named constant for improved readability and maintainability, for example private static final int BUILD_PROGRESS_LOG_THROTTLE_MS = 2000;.

    } else {
      final java.util.concurrent.atomic.AtomicLong lastLogTimeMs = new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());
      final java.util.concurrent.atomic.AtomicInteger lastLoggedProcessed = new java.util.concurrent.atomic.AtomicInteger(-1);
      effectiveGraphCallback = (phase, processedNodes, totalNodes, vectorAccesses) -> {
        if (totalNodes <= 0)
          return;

        final long now = System.currentTimeMillis();
        final boolean progressed = processedNodes != lastLoggedProcessed.get();
        final boolean timeElapsed = now - lastLogTimeMs.get() >= 2000;
        final boolean reachedEnd = processedNodes >= totalNodes && lastLoggedProcessed.get() != totalNodes;
        final boolean shouldLog = progressed && (timeElapsed || reachedEnd);

        if (shouldLog) {
          LogManager.instance().log(this, Level.INFO,
              "Graph build %s: %d/%d (vector accesses=%d)", phase, processedNodes, totalNodes, vectorAccesses);
          lastLogTimeMs.set(now);
          lastLoggedProcessed.set(processedNodes);
        }
      };
    }

// CRITICAL FIX: Collect vectors DIRECTLY from pages instead of from vectorIndex.
// This avoids race conditions where concurrent replication adds entries to vectorIndex
// that don't yet exist on disk pages. We iterate pages and read what's actually persisted.
Expand Down Expand Up @@ -966,14 +991,14 @@ private void buildGraphFromScratchWithRetry(final GraphBuildCallback graphCallba

// Report validation progress
validatedCount++;
if (graphCallback != null && validatedCount % VALIDATION_PROGRESS_INTERVAL == 0) {
graphCallback.onGraphBuildProgress("validating", validatedCount, totalVectorsToValidate, 0);
if (effectiveGraphCallback != null && validatedCount % VALIDATION_PROGRESS_INTERVAL == 0) {
effectiveGraphCallback.onGraphBuildProgress("validating", validatedCount, totalVectorsToValidate, 0);
}
}

// Final validation progress report
if (graphCallback != null && validatedCount > 0) {
graphCallback.onGraphBuildProgress("validating", validatedCount, totalVectorsToValidate, 0);
if (effectiveGraphCallback != null && validatedCount > 0) {
effectiveGraphCallback.onGraphBuildProgress("validating", validatedCount, totalVectorsToValidate, 0);
}

if (skippedDeletedDocs > 0) {
Expand Down Expand Up @@ -1034,7 +1059,7 @@ private void buildGraphFromScratchWithRetry(final GraphBuildCallback graphCallba
// Start progress monitoring thread if callback provided
final Thread progressMonitor;
final AtomicBoolean buildComplete = new AtomicBoolean(false);
if (graphCallback != null) {
if (effectiveGraphCallback != null) {
final int totalNodes = vectors.size();
progressMonitor = new Thread(() -> {
try {
Expand All @@ -1044,7 +1069,7 @@ private void buildGraphFromScratchWithRetry(final GraphBuildCallback graphCallba
final int insertsInProgress = builder.insertsInProgress();

// Report progress
graphCallback.onGraphBuildProgress("building", nodesAdded, totalNodes, nodesAdded + insertsInProgress);
effectiveGraphCallback.onGraphBuildProgress("building", nodesAdded, totalNodes, nodesAdded + insertsInProgress);

// Sleep briefly before next poll
Thread.sleep(100); // Poll every 100ms
Expand Down Expand Up @@ -1102,8 +1127,8 @@ private void buildGraphFromScratchWithRetry(final GraphBuildCallback graphCallba
LogManager.instance().log(this, Level.FINE, "Writing vector graph to disk for index: %s (nodes=%d)", indexName, totalNodes);

// Report persistence phase start
if (graphCallback != null)
graphCallback.onGraphBuildProgress("persisting", 0, totalNodes, 0);
if (effectiveGraphCallback != null)
effectiveGraphCallback.onGraphBuildProgress("persisting", 0, totalNodes, 0);

// Start a dedicated transaction for graph persistence with chunked commits
long chunkSizeMB = getTxChunkSize();
Expand Down Expand Up @@ -1132,8 +1157,8 @@ private void buildGraphFromScratchWithRetry(final GraphBuildCallback graphCallba
graphFile.writeGraph(graphIndex, vectors, chunkSizeMB, chunkCallback);

// Report persistence completion
if (graphCallback != null) {
graphCallback.onGraphBuildProgress("persisting", totalNodes, totalNodes, 0);
if (effectiveGraphCallback != null) {
effectiveGraphCallback.onGraphBuildProgress("persisting", totalNodes, totalNodes, 0);
}

// Commit the transaction to persist graph pages
Expand Down
Loading