diff --git a/CHANGELOG.md b/CHANGELOG.md index 58058dbd1d..060ac75de6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Fix graph merge stats size calculation [#1844](https://github.com/opensearch-project/k-NN/pull/1844) * Disallow a vector field to have an invalid character for a physical file name. [#1936](https://github.com/opensearch-project/k-NN/pull/1936) * Add script_fields context to KNNAllowlist [#1917] (https://github.com/opensearch-project/k-NN/pull/1917) +* Fix memory overflow caused by cache behavior [#2015](https://github.com/opensearch-project/k-NN/pull/2015) ### Infrastructure * Parallelize make to reduce build time [#2006] (https://github.com/opensearch-project/k-NN/pull/2006) ### Documentation @@ -45,4 +46,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Restructure mappers to better handle null cases and avoid branching in parsing [#1939](https://github.com/opensearch-project/k-NN/pull/1939) * Added Quantization Framework and implemented 1Bit and multibit quantizer[#1889](https://github.com/opensearch-project/k-NN/issues/1889) * Encapsulate dimension, vector data type validation/processing inside Library [#1957](https://github.com/opensearch-project/k-NN/pull/1957) -* Add quantization state cache [#1960](https://github.com/opensearch-project/k-NN/pull/1960) \ No newline at end of file +* Add quantization state cache [#1960](https://github.com/opensearch-project/k-NN/pull/1960) diff --git a/src/main/java/org/opensearch/knn/common/featureflags/KNNFeatureFlags.java b/src/main/java/org/opensearch/knn/common/featureflags/KNNFeatureFlags.java new file mode 100644 index 0000000000..bab5b97bb1 --- /dev/null +++ b/src/main/java/org/opensearch/knn/common/featureflags/KNNFeatureFlags.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.knn.common.featureflags; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import lombok.experimental.UtilityClass; +import org.opensearch.common.Booleans; +import org.opensearch.common.settings.Setting; +import org.opensearch.knn.index.KNNSettings; + +import java.util.List; + +import static org.opensearch.common.settings.Setting.Property.Dynamic; +import static org.opensearch.common.settings.Setting.Property.NodeScope; + +/** + * Class to manage KNN feature flags + */ +@UtilityClass +public class KNNFeatureFlags { + + // Feature flags + private static final String KNN_FORCE_EVICT_CACHE_ENABLED = "knn.feature.cache.force_evict.enabled"; + + @VisibleForTesting + public static final Setting KNN_FORCE_EVICT_CACHE_ENABLED_SETTING = Setting.boolSetting( + KNN_FORCE_EVICT_CACHE_ENABLED, + false, + NodeScope, + Dynamic + ); + + public static List> getFeatureFlags() { + return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING); + } + + /** + * Checks if force evict for cache is enabled by executing a check against cluster settings + * @return true if force evict setting is set to true + */ + public static boolean isForceEvictCacheEnabled() { + return Booleans.parseBoolean(KNNSettings.state().getSettingValue(KNN_FORCE_EVICT_CACHE_ENABLED).toString(), false); + } +} diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index e0123ef8d5..a70a17d858 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -34,14 +34,17 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.stream.Collectors.toUnmodifiableMap; import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.IndexScope; import static org.opensearch.common.settings.Setting.Property.NodeScope; import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio; import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue; +import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.getFeatureFlags; /** * This class defines @@ -334,6 +337,9 @@ public class KNNSettings { } }; + private final static Map> FEATURE_FLAGS = getFeatureFlags().stream() + .collect(toUnmodifiableMap(Setting::getKey, Function.identity())); + private ClusterService clusterService; private Client client; @@ -371,7 +377,7 @@ private void setSettingsUpdateConsumers() { ); NativeMemoryCacheManager.getInstance().rebuildCache(builder.build()); - }, dynamicCacheSettings.values().stream().collect(Collectors.toUnmodifiableList())); + }, Stream.concat(dynamicCacheSettings.values().stream(), FEATURE_FLAGS.values().stream()).collect(Collectors.toUnmodifiableList())); clusterService.getClusterSettings().addSettingsUpdateConsumer(QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING, it -> { QuantizationStateCache.getInstance().setMaxCacheSizeInKB(it.getKb()); QuantizationStateCache.getInstance().rebuildCache(); @@ -398,6 +404,10 @@ private Setting getSetting(String key) { return dynamicCacheSettings.get(key); } + if (FEATURE_FLAGS.containsKey(key)) { + return FEATURE_FLAGS.get(key); + } + if (KNN_CIRCUIT_BREAKER_TRIGGERED.equals(key)) { return KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING; } @@ -452,7 +462,8 @@ public List> getSettings() { QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING, QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING ); - return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList()); + return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream())) + .collect(Collectors.toList()); } public static boolean isKNNPluginEnabled() { diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java index c711f3342f..02b480ed40 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java @@ -13,6 +13,7 @@ import lombok.Getter; import org.apache.lucene.index.LeafReaderContext; +import org.opensearch.knn.common.featureflags.KNNFeatureFlags; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.query.KNNWeight; import org.opensearch.knn.jni.JNIService; @@ -161,11 +162,19 @@ class IndexAllocation implements NativeMemoryAllocation { @Override public void close() { - executor.execute(() -> { + Runnable onClose = () -> { writeLock(); cleanup(); writeUnlock(); - }); + }; + + // The close operation needs to be blocking to prevent overflow + // This blocks any entry until the close has completed, preventing creation before close scenarios + if (KNNFeatureFlags.isForceEvictCacheEnabled()) { + onClose.run(); + } else { + executor.execute(onClose); + } } private void cleanup() { diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 9478e1e006..649fb9774e 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -21,13 +21,17 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.unit.TimeValue; import org.opensearch.knn.common.exception.OutOfNativeMemoryException; +import org.opensearch.knn.common.featureflags.KNNFeatureFlags; import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.plugin.stats.StatNames; import java.io.Closeable; +import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,6 +49,7 @@ public class NativeMemoryCacheManager implements Closeable { private static NativeMemoryCacheManager INSTANCE; private Cache cache; + private Deque accessRecencyQueue; private final ExecutorService executor; private AtomicBoolean cacheCapacityReached; private long maxWeight; @@ -97,7 +102,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { } cacheCapacityReached = new AtomicBoolean(false); - + accessRecencyQueue = new ConcurrentLinkedDeque<>(); cache = cacheBuilder.build(); } @@ -301,7 +306,52 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext nativeMemoryEntryC ); } - return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load); + if (KNNFeatureFlags.isForceEvictCacheEnabled()) { + // Utilizes a force eviction mechanism to free up memory before the entry can be added to the cache + // In case of a cache hit, the operation just updates the locally maintained recency list + // In case of a cache miss, least recently accessed entries are evicted in a blocking manner + // before the new entry can be added to the cache. + String key = nativeMemoryEntryContext.getKey(); + NativeMemoryAllocation result = cache.getIfPresent(key); + + // Cache Hit + // In case of a cache hit, moving the item to the end of the recency queue adds + // some overhead to the get operation. This can be optimized further to make this operation + // as lightweight as possible. Multiple approaches and their outcomes were documented + // before moving forward with the current solution. + // The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680 + if (result != null) { + accessRecencyQueue.remove(key); + accessRecencyQueue.addLast(key); + return result; + } + + // Cache Miss + // Evict before put + synchronized (this) { + if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) { + Iterator lruIterator = accessRecencyQueue.iterator(); + while (lruIterator.hasNext() + && (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight)) { + + String keyToRemove = lruIterator.next(); + NativeMemoryAllocation allocationToRemove = cache.getIfPresent(keyToRemove); + if (allocationToRemove != null) { + allocationToRemove.close(); + cache.invalidate(keyToRemove); + } + lruIterator.remove(); + } + } + + result = cache.get(key, nativeMemoryEntryContext::load); + accessRecencyQueue.addLast(key); + + return result; + } + } else { + return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load); + } } /** diff --git a/src/test/java/org/opensearch/knn/common/featureflags/KNNFeatureFlagsTests.java b/src/test/java/org/opensearch/knn/common/featureflags/KNNFeatureFlagsTests.java new file mode 100644 index 0000000000..f2f74944e9 --- /dev/null +++ b/src/test/java/org/opensearch/knn/common/featureflags/KNNFeatureFlagsTests.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.common.featureflags; + +import org.mockito.Mock; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.knn.KNNTestCase; +import org.opensearch.knn.index.KNNSettings; + +import static org.mockito.Mockito.when; +import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING; +import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.isForceEvictCacheEnabled; + +public class KNNFeatureFlagsTests extends KNNTestCase { + + @Mock + ClusterSettings clusterSettings; + + public void setUp() throws Exception { + super.setUp(); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + KNNSettings.state().setClusterService(clusterService); + } + + public void testIsForceEvictCacheEnabled() throws Exception { + when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(false); + assertFalse(isForceEvictCacheEnabled()); + when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true); + assertTrue(isForceEvictCacheEnabled()); + } +} diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java index 1e21345818..cb5fbaebaa 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java @@ -13,26 +13,36 @@ import com.google.common.collect.ImmutableMap; import lombok.SneakyThrows; +import org.junit.Before; +import org.mockito.Mock; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.knn.KNNTestCase; import org.opensearch.knn.TestUtils; import org.opensearch.knn.common.KNNConstants; -import org.opensearch.knn.index.util.IndexUtil; +import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.SpaceType; import org.opensearch.knn.index.VectorDataType; +import org.opensearch.knn.index.engine.KNNEngine; +import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.jni.JNICommons; import org.opensearch.knn.jni.JNIService; -import org.opensearch.knn.index.SpaceType; -import org.opensearch.knn.index.engine.KNNEngine; import org.opensearch.watcher.FileWatcher; import org.opensearch.watcher.WatcherHandle; import java.nio.file.Path; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING; public class NativeMemoryAllocationTests extends KNNTestCase { @@ -41,6 +51,19 @@ public class NativeMemoryAllocationTests extends KNNTestCase { private int testLockValue3; private int testLockValue4; + @Mock + ClusterSettings clusterSettings; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + clusterSettings = mock(ClusterSettings.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(false); + KNNSettings.state().setClusterService(clusterService); + } + public void testIndexAllocation_close() throws InterruptedException { // Create basic nmslib HNSW index Path dir = createTempDir(); @@ -207,6 +230,71 @@ public void testIndexAllocation_readLock() throws InterruptedException { assertEquals(finalValue, testLockValue1); } + public void testIndexAllocation_closeDefault() { + WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); + ExecutorService executorService = Executors.newFixedThreadPool(2); + AtomicReference expectedException = new AtomicReference<>(); + + // Executor based non-blocking close + NativeMemoryAllocation.IndexAllocation nonBlockingIndexAllocation = new NativeMemoryAllocation.IndexAllocation( + mock(ExecutorService.class), + 0, + 0, + null, + "test", + "test", + watcherHandle + ); + + executorService.submit(nonBlockingIndexAllocation::readLock); + Future closingThread = executorService.submit(nonBlockingIndexAllocation::close); + try { + closingThread.get(); + } catch (Exception ex) { + expectedException.set(ex); + } + assertNull(expectedException.get()); + expectedException.set(null); + executorService.shutdown(); + } + + public void testIndexAllocation_closeBlocking() throws InterruptedException, ExecutionException { + WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); + ExecutorService executorService = Executors.newFixedThreadPool(2); + AtomicReference expectedException = new AtomicReference<>(); + + // Blocking close + when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true); + NativeMemoryAllocation.IndexAllocation blockingIndexAllocation = new NativeMemoryAllocation.IndexAllocation( + mock(ExecutorService.class), + 0, + 0, + null, + "test", + "test", + watcherHandle + ); + + executorService.submit(blockingIndexAllocation::readLock); + Future closingThread = executorService.submit(blockingIndexAllocation::close); + + // Check if thread is currently blocked + try { + closingThread.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + expectedException.set(e); + } + + assertNotNull(expectedException.get()); + + executorService.submit(blockingIndexAllocation::readUnlock); + closingThread.get(); + + // Waits until close + assertTrue(blockingIndexAllocation.isClosed()); + executorService.shutdown(); + } + public void testIndexAllocation_writeLock() throws InterruptedException { // To test the writeLock, we first grab the writeLock in the main thread. Then we start another thread that // grabs the readLock and asserts testLockValue2 has been updated. Next in the main thread, we update the value