diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 3cfc437107a4..0ca64e23484a 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -111,6 +111,8 @@ Improvements
 
 * GITHUB#15225: Improve package documentation for org.apache.lucene.util. (Syed Mohammad Saad)
 
+* GITHUB#15795: Introduce MemoryAccountingBitsetCollectorManager to parallelize search when using MemoryAccountingBitsetCollector. (Binlong Gao)
+
 Optimizations
 ---------------------
 * GITHUB#15681: Replace pre-sized array or empty array with lambda expression to call Collection#toArray. (Zhou Hui)
diff --git a/lucene/misc/src/java/org/apache/lucene/misc/CollectorMemoryTracker.java b/lucene/misc/src/java/org/apache/lucene/misc/CollectorMemoryTracker.java
index 305c61c8cd0c..14a7775a6c3c 100644
--- a/lucene/misc/src/java/org/apache/lucene/misc/CollectorMemoryTracker.java
+++ b/lucene/misc/src/java/org/apache/lucene/misc/CollectorMemoryTracker.java
@@ -51,4 +51,14 @@ public void updateBytes(long bytes) {
   public long getBytes() {
     return memoryUsage.get();
   }
+
+  /** Returns the name of this tracker. */
+  public String getName() {
+    return name;
+  }
+
+  /** Returns the memory limit of this tracker. */
+  public long getMemoryLimit() {
+    return memoryLimit;
+  }
 }
diff --git a/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollector.java b/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollector.java
index a2337ab033e8..075f6b1fd302 100644
--- a/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollector.java
+++ b/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollector.java
@@ -24,23 +24,62 @@
 import org.apache.lucene.search.SimpleCollector;
 import org.apache.lucene.util.FixedBitSet;
 
-/** Bitset collector which supports memory tracking */
+/**
+ * Bitset collector which supports memory tracking. It operates in one of two modes:
+ *
+ * <ul>
+ *   <li>Full index mode: allocates the bitset for the entire index and sets bits at global doc
+ *       ids (for single-threaded search).
+ *   <li>Segment-local mode: allocates the bitset only for the doc id range of the segments
+ *       processed and sets bits relative to the smallest docBase seen (memory-efficient for
+ *       concurrent search).
+ * </ul>
+ *
+ * <p>NOTE: in segment-local mode, bits that were already collected are not shifted when a later
+ * segment has a smaller docBase, so segments are expected in increasing docBase order.
+ */
 public class MemoryAccountingBitsetCollector extends SimpleCollector {
 
   final CollectorMemoryTracker tracker;
+  final boolean segmentLocal;
   FixedBitSet bitSet = new FixedBitSet(0);
   int length = 0;
   int docBase = 0;
 
+  // For segment-local mode
+  int minDocBase = Integer.MAX_VALUE;
+  int maxDocEnd = 0;
+
+  /** Creates a collector in full index mode. */
   public MemoryAccountingBitsetCollector(CollectorMemoryTracker tracker) {
+    this(tracker, false);
+  }
+
+  /**
+   * Creates a collector.
+   *
+   * @param tracker tracker that is updated with the bytes used by the bitset
+   * @param segmentLocal whether to size the bitset for the segments seen only
+   */
+  public MemoryAccountingBitsetCollector(CollectorMemoryTracker tracker, boolean segmentLocal) {
     this.tracker = tracker;
+    this.segmentLocal = segmentLocal;
     tracker.updateBytes(bitSet.ramBytesUsed());
   }
 
   @Override
   protected void doSetNextReader(LeafReaderContext context) throws IOException {
     docBase = context.docBase;
-    length += context.reader().maxDoc();
+
+    if (segmentLocal) {
+      int docEnd = docBase + context.reader().maxDoc();
+      minDocBase = Math.min(minDocBase, docBase);
+      maxDocEnd = Math.max(maxDocEnd, docEnd);
+      length = maxDocEnd - minDocBase;
+    } else {
+      length += context.reader().maxDoc();
+    }
+
     FixedBitSet newBitSet = FixedBitSet.ensureCapacity(bitSet, length);
     if (newBitSet != bitSet) {
       tracker.updateBytes(newBitSet.ramBytesUsed() - bitSet.ramBytesUsed());
@@ -50,11 +89,25 @@ protected void doSetNextReader(LeafReaderContext context) throws IOException {
 
   @Override
   public void collect(int doc) {
-    bitSet.set(docBase + doc);
+    if (segmentLocal) {
+      bitSet.set(docBase - minDocBase + doc);
+    } else {
+      bitSet.set(docBase + doc);
+    }
   }
 
   @Override
   public ScoreMode scoreMode() {
     return ScoreMode.COMPLETE_NO_SCORES;
   }
+
+  // Global doc id that bit 0 of the bitset corresponds to.
+  int getMinDocBase() {
+    return segmentLocal ? minDocBase : 0;
+  }
+
+  // Exclusive upper bound of the global doc ids covered by the bitset.
+  int getMaxDocEnd() {
+    return segmentLocal ? maxDocEnd : length;
+  }
 }
diff --git a/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollectorManager.java b/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollectorManager.java
new file mode 100644
index 000000000000..1aec60d9e630
--- /dev/null
+++ b/lucene/misc/src/java/org/apache/lucene/misc/search/MemoryAccountingBitsetCollectorManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.misc.search;
+
+import java.util.Collection;
+import org.apache.lucene.misc.CollectorMemoryTracker;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.util.FixedBitSet;
+
+/**
+ * {@link CollectorManager} for {@link MemoryAccountingBitsetCollector} that supports concurrent
+ * search.
+ *
+ * <p>Each collector it creates is segment-local, i.e. it only allocates a bitset for the doc id
+ * range of the segments it processes. {@link #reduce(Collection)} then merges these bitsets at
+ * their proper offsets into a single bitset indexed by global doc id.
+ *
+ * <p>Every collector gets its own {@link CollectorMemoryTracker} created with the memory limit of
+ * the tracker passed to the constructor, so the limit applies to each collector separately.
+ */
+public class MemoryAccountingBitsetCollectorManager
+    implements CollectorManager<MemoryAccountingBitsetCollector, FixedBitSet> {
+
+  private final CollectorMemoryTracker tracker;
+  private long totalBytesUsed;
+
+  /**
+   * Creates a manager.
+   *
+   * @param tracker tracker whose name and memory limit are used for the per-collector trackers
+   */
+  public MemoryAccountingBitsetCollectorManager(CollectorMemoryTracker tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public MemoryAccountingBitsetCollector newCollector() {
+    return new MemoryAccountingBitsetCollector(
+        new CollectorMemoryTracker(tracker.getName() + "-collector", tracker.getMemoryLimit()),
+        true);
+  }
+
+  /**
+   * Merges the segment-local bitsets into one bitset indexed by global doc id.
+   *
+   * @param collectors collectors created by {@link #newCollector()}
+   * @return the merged bitset; it has no bits set if no collector saw a segment
+   */
+  @Override
+  public FixedBitSet reduce(Collection<MemoryAccountingBitsetCollector> collectors) {
+    long bytesUsed = 0;
+    int globalMaxDocEnd = 0;
+    for (MemoryAccountingBitsetCollector collector : collectors) {
+      bytesUsed += collector.tracker.getBytes();
+      globalMaxDocEnd = Math.max(globalMaxDocEnd, collector.getMaxDocEnd());
+    }
+    // Assign rather than accumulate so that calling reduce() again does not double count.
+    this.totalBytesUsed = bytesUsed;
+
+    if (collectors.size() == 1) {
+      MemoryAccountingBitsetCollector collector = collectors.iterator().next();
+      if (collector.getMinDocBase() == 0) {
+        // The bitset already starts at doc 0, so it can be returned without copying.
+        return collector.bitSet;
+      }
+    }
+
+    FixedBitSet result = new FixedBitSet(globalMaxDocEnd);
+    for (MemoryAccountingBitsetCollector collector : collectors) {
+      int length = collector.getMaxDocEnd() - collector.getMinDocBase();
+      // A collector that never saw a segment (e.g. the single collector IndexSearcher creates for
+      // an index without segments) still has its initial min/max values: the length is negative.
+      if (length > 0) {
+        FixedBitSet.orRange(collector.bitSet, 0, result, collector.getMinDocBase(), length);
+      }
+    }
+    return result;
+  }
+
+  /** Returns the total bytes tracked by the collectors passed to the last reduce call. */
+  public long getBytes() {
+    return this.totalBytesUsed;
+  }
+}
diff --git a/lucene/misc/src/test/org/apache/lucene/misc/search/TestMemoryAccountingBitsetCollector.java b/lucene/misc/src/test/org/apache/lucene/misc/search/TestMemoryAccountingBitsetCollector.java
index b642cf8f171c..cf246d01d6a8 100644
--- a/lucene/misc/src/test/org/apache/lucene/misc/search/TestMemoryAccountingBitsetCollector.java
+++ b/lucene/misc/src/test/org/apache/lucene/misc/search/TestMemoryAccountingBitsetCollector.java
@@ -28,6 +28,7 @@
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.FixedBitSet;
 
 public class TestMemoryAccountingBitsetCollector extends LuceneTestCase {
 
@@ -57,31 +58,57 @@ public void tearDown() throws Exception {
     dir.close();
   }
 
-  public void testMemoryAccountingBitsetCollectorMemoryLimit() {
+  public void testMemoryAccountingBitsetCollectorMemoryLimit() throws Exception {
     long perCollectorMemoryLimit = 150;
     CollectorMemoryTracker tracker =
         new CollectorMemoryTracker("testMemoryTracker", perCollectorMemoryLimit);
-    MemoryAccountingBitsetCollector bitSetCollector = new MemoryAccountingBitsetCollector(tracker);
-
+    MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
+        new MemoryAccountingBitsetCollectorManager(tracker);
     IndexSearcher searcher = new IndexSearcher(reader);
     expectThrows(
         IllegalStateException.class,
-        () -> {
-          searcher.search(MatchAllDocsQuery.INSTANCE, bitSetCollector);
-        });
+        () -> searcher.search(MatchAllDocsQuery.INSTANCE, bitsetCollectorManager));
+  }
+
+  public void testConcurrentMemoryLimit() throws Exception {
+    // For collector with collecting only 1 doc, 80 bytes are required.
+    long perCollectorMemoryLimit = 79;
+    CollectorMemoryTracker tracker =
+        new CollectorMemoryTracker("testMemoryTracker", perCollectorMemoryLimit);
+    MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
+        new MemoryAccountingBitsetCollectorManager(tracker);
+    IndexSearcher searcher = newSearcher(reader);
+    expectThrows(
+        IllegalStateException.class,
+        () -> searcher.search(MatchAllDocsQuery.INSTANCE, bitsetCollectorManager));
   }
 
   public void testCollectedResult() throws Exception {
     CollectorMemoryTracker tracker =
         new CollectorMemoryTracker("testMemoryTracker", Long.MAX_VALUE);
-    MemoryAccountingBitsetCollector collector = new MemoryAccountingBitsetCollector(tracker);
+    MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
+        new MemoryAccountingBitsetCollectorManager(tracker);
 
-    IndexSearcher searcher = new IndexSearcher(reader);
-    searcher.search(MatchAllDocsQuery.INSTANCE, collector);
+    IndexSearcher searcher = newSearcher(reader);
+    FixedBitSet result = searcher.search(MatchAllDocsQuery.INSTANCE, bitsetCollectorManager);
 
-    assertEquals(1000, collector.bitSet.cardinality());
+    assertEquals(1000, result.cardinality());
     for (int i = 0; i < 1000; i++) {
-      assertTrue(collector.bitSet.get(i));
+      assertTrue(result.get(i));
     }
   }
+
+  public void testReduceCollectorWithoutSegments() {
+    CollectorMemoryTracker tracker =
+        new CollectorMemoryTracker("testMemoryTracker", Long.MAX_VALUE);
+    MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
+        new MemoryAccountingBitsetCollectorManager(tracker);
+
+    // A collector that never saw a segment must reduce to an empty bitset instead of failing.
+    FixedBitSet result =
+        bitsetCollectorManager.reduce(
+            java.util.Collections.singletonList(bitsetCollectorManager.newCollector()));
+
+    assertEquals(0, result.cardinality());
+  }
 }