2121import org .apache .logging .log4j .Logger ;
2222import org .opensearch .common .unit .TimeValue ;
2323import org .opensearch .knn .common .exception .OutOfNativeMemoryException ;
24+ import org .opensearch .knn .common .featureflags .KNNFeatureFlags ;
2425import org .opensearch .knn .index .KNNSettings ;
2526import org .opensearch .knn .plugin .stats .StatNames ;
2627
2728import java .io .Closeable ;
29+ import java .util .Deque ;
2830import java .util .HashMap ;
31+ import java .util .Iterator ;
2932import java .util .Map ;
3033import java .util .Optional ;
34+ import java .util .concurrent .ConcurrentLinkedDeque ;
3135import java .util .concurrent .ExecutionException ;
3236import java .util .concurrent .ExecutorService ;
3337import java .util .concurrent .Executors ;
@@ -45,6 +49,7 @@ public class NativeMemoryCacheManager implements Closeable {
4549 private static NativeMemoryCacheManager INSTANCE ;
4650
4751 private Cache <String , NativeMemoryAllocation > cache ;
52+ private Deque <String > accessRecencyQueue ;
4853 private final ExecutorService executor ;
4954 private AtomicBoolean cacheCapacityReached ;
5055 private long maxWeight ;
@@ -97,7 +102,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
97102 }
98103
99104 cacheCapacityReached = new AtomicBoolean (false );
100-
105+ accessRecencyQueue = new ConcurrentLinkedDeque <>();
101106 cache = cacheBuilder .build ();
102107 }
103108
@@ -301,7 +306,52 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
301306 );
302307 }
303308
304- return cache .get (nativeMemoryEntryContext .getKey (), nativeMemoryEntryContext ::load );
309+ if (KNNFeatureFlags .isForceEvictCacheEnabled ()) {
310+ // Utilizes a force eviction mechanism to free up memory before the entry can be added to the cache
311+ // In case of a cache hit, the operation just updates the locally maintained recency list
312+ // In case of a cache miss, least recently accessed entries are evicted in a blocking manner
313+ // before the new entry can be added to the cache.
314+ String key = nativeMemoryEntryContext .getKey ();
315+ NativeMemoryAllocation result = cache .getIfPresent (key );
316+
317+ // Cache Hit
318+ // In case of a cache hit, moving the item to the end of the recency queue adds
319+ // some overhead to the get operation. This can be optimized further to make this operation
320+ // as lightweight as possible. Multiple approaches and their outcomes were documented
321+ // before moving forward with the current solution.
322+ // The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680
323+ if (result != null ) {
324+ accessRecencyQueue .remove (key );
325+ accessRecencyQueue .addLast (key );
326+ return result ;
327+ }
328+
329+ // Cache Miss
330+ // Evict before put
331+ synchronized (this ) {
332+ if (getCacheSizeInKilobytes () + nativeMemoryEntryContext .calculateSizeInKB () >= maxWeight ) {
333+ Iterator <String > lruIterator = accessRecencyQueue .iterator ();
334+ while (lruIterator .hasNext ()
335+ && (getCacheSizeInKilobytes () + nativeMemoryEntryContext .calculateSizeInKB () >= maxWeight )) {
336+
337+ String keyToRemove = lruIterator .next ();
338+ NativeMemoryAllocation allocationToRemove = cache .getIfPresent (keyToRemove );
339+ if (allocationToRemove != null ) {
340+ allocationToRemove .close ();
341+ cache .invalidate (keyToRemove );
342+ }
343+ lruIterator .remove ();
344+ }
345+ }
346+
347+ result = cache .get (key , nativeMemoryEntryContext ::load );
348+ accessRecencyQueue .addLast (key );
349+
350+ return result ;
351+ }
352+ } else {
353+ return cache .get (nativeMemoryEntryContext .getKey (), nativeMemoryEntryContext ::load );
354+ }
305355 }
306356
307357 /**
0 commit comments