diff --git a/engine/src/main/java/com/arcadedb/index/fulltext/LSMTreeFullTextIndex.java b/engine/src/main/java/com/arcadedb/index/fulltext/LSMTreeFullTextIndex.java index c631418b6e..6a9c8de4d4 100644 --- a/engine/src/main/java/com/arcadedb/index/fulltext/LSMTreeFullTextIndex.java +++ b/engine/src/main/java/com/arcadedb/index/fulltext/LSMTreeFullTextIndex.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,6 +81,10 @@ public class LSMTreeFullTextIndex implements Index, IndexInternal { private final FullTextIndexMetadata ftMetadata; private TypeIndex typeIndex; + /** + * Factory handler for creating LSMTreeFullTextIndex instances. + * Validates that the index is not unique and is defined on STRING properties. + */ public static class LSMTreeFullTextIndexFactoryHandler implements IndexFactoryHandler { @Override public IndexInternal create(final IndexBuilder builder) { @@ -159,6 +164,15 @@ public IndexCursor get(final Object[] keys) { return get(keys, -1); } + /** + * Searches the index for the given query text. + * The query text is parsed into terms, analyzed, and then matched against the index. + * Results are scored based on the number of matching terms (coordination factor). + * + * @param keys The query arguments. keys[0] is expected to be the query string. + * @param limit The maximum number of results to return. -1 for no limit. + * @return An IndexCursor containing the matching results, sorted by score descending. + */ @Override public IndexCursor get(final Object[] keys, final int limit) { final HashMap scoreMap = new HashMap<>(); @@ -184,6 +198,7 @@ public IndexCursor get(final Object[] keys, final int limit) { while (rids.hasNext()) { final RID rid = rids.next().getIdentity(); + // Accumulate score for this RID based on term frequency in the query final AtomicInteger score = scoreMap.get(rid); if (score == null) scoreMap.put(rid, new AtomicInteger(1)); @@ -227,6 +242,9 @@ private static class QueryTerm { * For example, "title:java programming" returns: * - QueryTerm(fieldName="title", value="java") * - QueryTerm(fieldName=null, value="programming") + * + * @param queryText The raw query string. + * @return A list of parsed QueryTerms. */ private List parseQueryTerms(final String queryText) { final List terms = new ArrayList<>(); @@ -266,6 +284,14 @@ private int getPropertyCount() { return props != null ? props.size() : 1; } + /** + * Indexes a document. + * Tokenizes the input values and updates the underlying LSM tree. + * Handles both single-property and multi-property indexes. + * + * @param keys The values of the indexed properties for the document. + * @param rids The RIDs associated with these keys (usually just one). + */ @Override public void put(final Object[] keys, final RID[] rids) { // If keys.length doesn't match propertyCount, this is a tokenized value from commit replay @@ -300,6 +326,12 @@ public void put(final Object[] keys, final RID[] rids) { } } + /** + * Removes a document from the index. + * Tokenizes the input values and removes the corresponding entries from the underlying LSM tree. + * + * @param keys The values of the indexed properties to remove. + */ @Override public void remove(final Object[] keys) { // If keys.length doesn't match propertyCount, this is a tokenized value from commit replay @@ -330,6 +362,12 @@ public void remove(final Object[] keys) { } } + /** + * Removes a specific RID associated with the given keys from the index. + * + * @param keys The values of the indexed properties. + * @param rid The specific RID to remove. + */ @Override public void remove(final Object[] keys, final Identifiable rid) { // If keys.length doesn't match propertyCount, this is a tokenized value from commit replay @@ -588,6 +626,14 @@ private static Analyzer createAnalyzer(final FullTextIndexMetadata metadata, fin } } + /** + * Analyzes the input text using the provided Lucene Analyzer. + * Tokenizes the text and returns a list of tokens (strings). + * + * @param analyzer The Lucene Analyzer to use. + * @param text The input text objects to analyze. + * @return A list of tokens extracted from the text. + */ public List analyzeText(final Analyzer analyzer, final Object[] text) { final List tokens = new ArrayList<>(); @@ -642,51 +688,41 @@ public List analyzeText(final Analyzer analyzer, final Object[] text) { * @throws IllegalArgumentException if sourceRids is null, empty, or exceeds maxSourceDocs */ public IndexCursor searchMoreLikeThis(final Set sourceRids, final MoreLikeThisConfig config) { - // Validate inputs - if (sourceRids == null) { + if (sourceRids == null) throw new IllegalArgumentException("sourceRids cannot be null"); - } - if (sourceRids.isEmpty()) { + if (sourceRids.isEmpty()) throw new IllegalArgumentException("sourceRids cannot be empty"); - } - if (sourceRids.size() > config.getMaxSourceDocs()) { + if (sourceRids.size() > config.getMaxSourceDocs()) throw new IllegalArgumentException( "Number of source documents (" + sourceRids.size() + ") exceeds maxSourceDocs (" + config.getMaxSourceDocs() + ")"); - } // Step 1 & 2: Extract terms from source documents and count term frequencies final Map termFreqs = new HashMap<>(); + final List propertyNames = getPropertyNames(); - for (final RID sourceRid : sourceRids) { - // Load the document - final Identifiable identifiable = sourceRid.getRecord(); - if (identifiable == null) { - continue; - } + if (propertyNames != null && !propertyNames.isEmpty()) { + for (final RID sourceRid : sourceRids) { + final Identifiable identifiable = sourceRid.getRecord(); + if (identifiable == null) + continue; - // Extract text from indexed properties - final List propertyNames = getPropertyNames(); - if (propertyNames != null && !propertyNames.isEmpty()) { final Document doc = (Document) identifiable; for (final String propName : propertyNames) { final Object value = doc.get(propName); - if (value != null) { - // Analyze the text to get tokens - final List tokens = analyzeText(indexAnalyzer, new Object[] { value }); - for (final String token : tokens) { - if (token != null) { - termFreqs.merge(token, 1, Integer::sum); - } - } + if (value == null) + continue; + + final List tokens = analyzeText(indexAnalyzer, new Object[] { value }); + for (final String token : tokens) { + if (token != null) + termFreqs.merge(token, 1, Integer::sum); } } } } - // If no terms extracted, return empty cursor - if (termFreqs.isEmpty()) { + if (termFreqs.isEmpty()) return new TempIndexCursor(Collections.emptyList()); - } // Step 3: Get document frequencies for each term final Map docFreqs = new HashMap<>(); @@ -707,48 +743,32 @@ public IndexCursor searchMoreLikeThis(final Set sourceRids, final MoreLikeT final MoreLikeThisQueryBuilder queryBuilder = new MoreLikeThisQueryBuilder(config); final List topTerms = queryBuilder.selectTopTerms(termFreqs, docFreqs, totalDocs); - // If no terms selected, return empty cursor - if (topTerms.isEmpty()) { + if (topTerms.isEmpty()) return new TempIndexCursor(Collections.emptyList()); - } // Step 5: Execute OR query and accumulate scores - final Map scoreMap = new HashMap<>(); + final Map scoreMap = new HashMap<>(); for (final String term : topTerms) { final IndexCursor termCursor = underlyingIndex.get(new String[] { term }); while (termCursor.hasNext()) { final RID rid = termCursor.next().getIdentity(); - final AtomicInteger score = scoreMap.get(rid); - if (score == null) { - scoreMap.put(rid, new AtomicInteger(1)); - } else { - score.incrementAndGet(); - } + scoreMap.merge(rid, 1, Integer::sum); } } // Step 6: Exclude source documents if configured if (config.isExcludeSource()) { - for (final RID sourceRid : sourceRids) { + for (final RID sourceRid : sourceRids) scoreMap.remove(sourceRid); - } } // Step 7: Build result list sorted by score descending final List results = new ArrayList<>(scoreMap.size()); - for (final Map.Entry entry : scoreMap.entrySet()) { - results.add(new IndexCursorEntry(null, entry.getKey(), entry.getValue().get())); - } + for (final Map.Entry entry : scoreMap.entrySet()) + results.add(new IndexCursorEntry(null, entry.getKey(), entry.getValue())); - // Sort by score descending - if (results.size() > 1) { - results.sort((o1, o2) -> { - if (o1.score == o2.score) { - return 0; - } - return o1.score < o2.score ? 1 : -1; // Descending order - }); - } + if (results.size() > 1) + results.sort(Comparator.comparingInt((IndexCursorEntry e) -> e.score).reversed()); return new TempIndexCursor(results); } diff --git a/engine/src/main/java/com/arcadedb/schema/ManualIndexBuilder.java b/engine/src/main/java/com/arcadedb/schema/ManualIndexBuilder.java index d6170e6917..9ebaccba44 100644 --- a/engine/src/main/java/com/arcadedb/schema/ManualIndexBuilder.java +++ b/engine/src/main/java/com/arcadedb/schema/ManualIndexBuilder.java @@ -44,8 +44,10 @@ protected ManualIndexBuilder(final DatabaseInternal database, final String index public Index create() { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - if (database.isAsyncProcessing()) - throw new NeedRetryException("Cannot create a new index while asynchronous tasks are running"); + // Wait for any running async tasks (e.g., compaction) to complete before creating new index + // This prevents NeedRetryException when creating multiple indexes sequentially on large datasets + while (database.isAsyncProcessing()) + database.async().waitCompletion(); final LocalSchema schema = database.getSchema().getEmbedded(); diff --git a/engine/src/main/java/com/arcadedb/schema/TypeIndexBuilder.java b/engine/src/main/java/com/arcadedb/schema/TypeIndexBuilder.java index f38c2ef8d8..b1b86290a8 100644 --- a/engine/src/main/java/com/arcadedb/schema/TypeIndexBuilder.java +++ b/engine/src/main/java/com/arcadedb/schema/TypeIndexBuilder.java @@ -83,8 +83,10 @@ public TypeFullTextIndexBuilder withFullTextType() { public TypeIndex create() { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - if (database.isAsyncProcessing()) - throw new NeedRetryException("Cannot create a new index while asynchronous tasks are running"); + // Wait for any running async tasks (e.g., compaction) to complete before creating new index + // This prevents NeedRetryException when creating multiple indexes sequentially on large datasets + while (database.isAsyncProcessing()) + database.async().waitCompletion(); final LocalSchema schema = database.getSchema().getEmbedded(); diff --git a/engine/src/test/java/com/arcadedb/index/Issue2702SequentialIndexCreationTest.java b/engine/src/test/java/com/arcadedb/index/Issue2702SequentialIndexCreationTest.java new file mode 100644 index 0000000000..f665e2e029 --- /dev/null +++ b/engine/src/test/java/com/arcadedb/index/Issue2702SequentialIndexCreationTest.java @@ -0,0 +1,261 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com) + * SPDX-License-Identifier: Apache-2.0 + */ +package com.arcadedb.index; + +import com.arcadedb.GlobalConfiguration; +import com.arcadedb.TestHelper; +import com.arcadedb.database.DatabaseInternal; +import com.arcadedb.database.MutableDocument; +import com.arcadedb.engine.WALFile; +import com.arcadedb.schema.DocumentType; +import com.arcadedb.schema.Schema; +import org.junit.jupiter.api.Test; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests sequential index creation on large datasets to ensure it doesn't fail with NeedRetryException + * when background LSMTree compaction is running. + * + * Before the fix, creating multiple indexes sequentially would fail with: + * "NeedRetryException: Cannot create a new index while asynchronous tasks are running" + * + * After the fix, CREATE INDEX waits for any running async tasks (compaction) to complete, + * allowing sequential index creation to succeed. + * + * Issue: https://github.com/ArcadeData/arcadedb/issues/2702 + * + * @author ArcadeDB Team + */ +class Issue2702SequentialIndexCreationTest extends TestHelper { + private static final int TOT = 100_000; // Large enough to trigger compaction + private static final int INDEX_PAGE_SIZE = 64 * 1024; + private static final int COMPACTION_RAM_MB = 1; // Low RAM to force compaction + private static final int PARALLEL = 4; + private static final String TYPE_NAME = "Rating"; + + @Test + void testSequentialIndexCreation() throws Exception { + final int originalCompactionRamMB = GlobalConfiguration.INDEX_COMPACTION_RAM_MB.getValueAsInteger(); + final int originalMinPages = GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE.getValueAsInteger(); + + try { + // Configure to trigger compaction easily + GlobalConfiguration.INDEX_COMPACTION_RAM_MB.setValue(COMPACTION_RAM_MB); + GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE.setValue(0); + + // Create type with multiple properties + database.transaction(() -> { + final DocumentType type = database.getSchema() + .buildDocumentType() + .withName(TYPE_NAME) + .withTotalBuckets(PARALLEL) + .create(); + + type.createProperty("userId", Integer.class); + type.createProperty("movieId", Integer.class); + type.createProperty("rating", Float.class); + type.createProperty("timestamp", Long.class); + }); + + // Insert large dataset + database.setReadYourWrites(false); + database.async().setCommitEvery(50000); + database.async().setParallelLevel(PARALLEL); + database.async().setTransactionUseWAL(true); + database.async().setTransactionSync(WALFile.FlushType.YES_NOMETADATA); + + database.async().transaction(() -> { + for (int i = 0; i < TOT; i++) { + final MutableDocument doc = database.newDocument(TYPE_NAME); + doc.set("userId", i % 10000); + doc.set("movieId", i % 5000); + doc.set("rating", (i % 10) / 2.0f); + doc.set("timestamp", System.currentTimeMillis()); + doc.save(); + } + }); + + database.async().waitCompletion(); + + // Create first index - this should succeed and trigger async compaction + database.transaction(() -> { + database.getSchema() + .buildTypeIndex(TYPE_NAME, new String[] { "userId" }) + .withType(Schema.INDEX_TYPE.LSM_TREE) + .withUnique(false) + .withPageSize(INDEX_PAGE_SIZE) + .create(); + }); + + // Verify first index exists + assertThat(database.getSchema().getIndexByName(TYPE_NAME + "[userId]")).isNotNull(); + + // Force compaction to start in async mode + final IndexInternal firstIndex = (IndexInternal) database.getSchema().getIndexByName(TYPE_NAME + "[userId]"); + firstIndex.scheduleCompaction(); + + // Check if async processing is active + final boolean asyncProcessingActive = ((DatabaseInternal) database).isAsyncProcessing(); + + // Create second index immediately - this would fail with NeedRetryException before the fix + // because compaction from first index is still running + database.transaction(() -> { + database.getSchema() + .buildTypeIndex(TYPE_NAME, new String[] { "movieId" }) + .withType(Schema.INDEX_TYPE.LSM_TREE) + .withUnique(false) + .withPageSize(INDEX_PAGE_SIZE) + .create(); + }); + + // The test succeeds if we got here without NeedRetryException + // (before the fix, the second index creation would have failed if asyncProcessingActive was true) + + // Verify second index exists + assertThat(database.getSchema().getIndexByName(TYPE_NAME + "[movieId]")).isNotNull(); + + // Create third index - should also succeed + database.transaction(() -> { + database.getSchema() + .buildTypeIndex(TYPE_NAME, new String[] { "rating" }) + .withType(Schema.INDEX_TYPE.LSM_TREE) + .withUnique(false) + .withPageSize(INDEX_PAGE_SIZE) + .create(); + }); + + // Verify third index exists + assertThat(database.getSchema().getIndexByName(TYPE_NAME + "[rating]")).isNotNull(); + + // Verify all indexes work correctly + assertThat(database.lookupByKey(TYPE_NAME, new String[] { "userId" }, new Object[] { 100 }).hasNext()).isTrue(); + assertThat(database.lookupByKey(TYPE_NAME, new String[] { "movieId" }, new Object[] { 50 }).hasNext()).isTrue(); + assertThat(database.lookupByKey(TYPE_NAME, new String[] { "rating" }, new Object[] { 2.5f }).hasNext()).isTrue(); + + } finally { + // Restore original configuration + GlobalConfiguration.INDEX_COMPACTION_RAM_MB.setValue(originalCompactionRamMB); + GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE.setValue(originalMinPages); + } + } + + @Test + void testIndexCreationWaitsForAsyncCompaction() throws Exception { + final int originalCompactionRamMB = GlobalConfiguration.INDEX_COMPACTION_RAM_MB.getValueAsInteger(); + final int originalMinPages = GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE.getValueAsInteger(); + + try { + // Configure to trigger compaction easily + GlobalConfiguration.INDEX_COMPACTION_RAM_MB.setValue(COMPACTION_RAM_MB); + GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE.setValue(0); + + // Create type and insert data + database.transaction(() -> { + final DocumentType type = database.getSchema() + .buildDocumentType() + .withName("TestType") + .withTotalBuckets(PARALLEL) + .create(); + + type.createProperty("id", Integer.class); + type.createProperty("value", String.class); + }); + + database.setReadYourWrites(false); + database.async().setCommitEvery(50000); + database.async().setParallelLevel(PARALLEL); + database.async().setTransactionUseWAL(true); + database.async().setTransactionSync(WALFile.FlushType.YES_NOMETADATA); + + database.async().transaction(() -> { + for (int i = 0; i < TOT; i++) { + final MutableDocument doc = database.newDocument("TestType"); + doc.set("id", i); + doc.set("value", "value-" + i); + doc.save(); + } + }); + + database.async().waitCompletion(); + + // Create first index + database.transaction(() -> { + database.getSchema() + .buildTypeIndex("TestType", new String[] { "id" }) + .withType(Schema.INDEX_TYPE.LSM_TREE) + .withUnique(false) + .withPageSize(INDEX_PAGE_SIZE) + .create(); + }); + + // Force compaction to run in async mode + final IndexInternal firstIndex = (IndexInternal) database.getSchema().getIndexByName("TestType[id]"); + firstIndex.scheduleCompaction(); + + // Wait a bit to ensure compaction starts + Thread.sleep(100); + + // Verify async processing is active + final boolean wasAsyncActive = ((DatabaseInternal) database).isAsyncProcessing(); + + // Track that index creation actually waited + final AtomicBoolean indexCreationCompleted = new AtomicBoolean(false); + final CountDownLatch creationLatch = new CountDownLatch(1); + + // Create second index in a separate thread + new Timer().schedule(new TimerTask() { + @Override + public void run() { + database.transaction(() -> { + database.getSchema() + .buildTypeIndex("TestType", new String[] { "value" }) + .withType(Schema.INDEX_TYPE.LSM_TREE) + .withUnique(false) + .withPageSize(INDEX_PAGE_SIZE) + .create(); + }); + indexCreationCompleted.set(true); + creationLatch.countDown(); + } + }, 0); + + // Wait for index creation to complete + creationLatch.await(); + + // Verify both indexes were created successfully + assertThat(database.getSchema().getIndexByName("TestType[id]")).isNotNull(); + assertThat(database.getSchema().getIndexByName("TestType[value]")).isNotNull(); + assertThat(indexCreationCompleted.get()).isTrue(); + + // If async processing was active and we still succeeded, the fix is working + // (before the fix, this would have thrown NeedRetryException) + + } finally { + // Restore original configuration + GlobalConfiguration.INDEX_COMPACTION_RAM_MB.setValue(originalCompactionRamMB); + GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE.setValue(originalMinPages); + } + } +}