diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 4587eced6163..f89a6194cefb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -99,6 +99,12 @@ public class CacheConfig { public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY = "hbase.bucketcache.persist.intervalinmillis"; + /** + * Configuration key to set the heap usage threshold beyond which prefetch threads should be + * interrupted. + */ + public static final String PREFETCH_HEAP_USAGE_THRESHOLD = "hbase.rs.prefetchheapusage"; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false; @@ -111,6 +117,7 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false; public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true; public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE; + public static final double DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD = 1d; /** * Whether blocks should be cached on read (default is on if there is a cache but this can be @@ -157,6 +164,8 @@ public class CacheConfig { private final ByteBuffAllocator byteBuffAllocator; + private final double heapUsageThreshold; + /** * Create a cache configuration using the specified configuration object and defaults for family * level settings. Only use if no column family context. 
@@ -201,6 +210,8 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE); this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf); + this.heapUsageThreshold = + conf.getDouble(PREFETCH_HEAP_USAGE_THRESHOLD, DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD); this.blockCache = blockCache; this.byteBuffAllocator = byteBuffAllocator; } @@ -222,6 +233,7 @@ public CacheConfig(CacheConfig cacheConf) { this.dropBehindCompaction = cacheConf.dropBehindCompaction; this.blockCache = cacheConf.blockCache; this.byteBuffAllocator = cacheConf.byteBuffAllocator; + this.heapUsageThreshold = cacheConf.heapUsageThreshold; } private CacheConfig() { @@ -237,6 +249,7 @@ private CacheConfig() { this.dropBehindCompaction = false; this.blockCache = null; this.byteBuffAllocator = ByteBuffAllocator.HEAP; + this.heapUsageThreshold = DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD; } /** @@ -386,6 +399,17 @@ public boolean shouldReadBlockFromCache(BlockType blockType) { return false; } + /** + * Checks if the current heap usage is below the threshold configured by + * "hbase.rs.prefetchheapusage" (1 by default, i.e. no limit). 
+ */ + public boolean isHeapUsageBelowThreshold() { + double total = Runtime.getRuntime().maxMemory(); + double available = Runtime.getRuntime().freeMemory(); + double usedRatio = 1d - (available / total); + return heapUsageThreshold > usedRatio; + } + /** * If we make sure the block could not be cached, we will not acquire the lock otherwise we will * acquire lock @@ -413,6 +437,10 @@ public ByteBuffAllocator getByteBuffAllocator() { return this.byteBuffAllocator; } + public double getHeapUsageThreshold() { + return heapUsageThreshold; + } + private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) { long cacheCompactedBlocksOnWriteThreshold = conf.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 92f6a8169f32..6063ffe68891 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -106,13 +106,23 @@ public void run() { HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true, /* pread= */false, false, false, null, null, true); try { - if (!cacheConf.isInMemory() && !cache.blockFitsIntoTheCache(block).orElse(true)) { - LOG.warn( - "Interrupting prefetch for file {} because block {} of size {} " - + "doesn't fit in the available cache space.", - path, cacheKey, block.getOnDiskSizeWithHeader()); - interrupted = true; - break; + if (!cacheConf.isInMemory()) { + if (!cache.blockFitsIntoTheCache(block).orElse(true)) { + LOG.warn( + "Interrupting prefetch for file {} because block {} of size {} " + + "doesn't fit in the available cache space.", + path, cacheKey, block.getOnDiskSizeWithHeader()); + interrupted = true; + break; + } + if (!cacheConf.isHeapUsageBelowThreshold()) { + LOG.warn( + 
"Interrupting prefetch because heap usage is above the threshold: {} " + + "configured via {}", + cacheConf.getHeapUsageThreshold(), CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD); + interrupted = true; + break; + } } onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); offset += block.getOnDiskSizeWithHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 0b45a930dceb..85b9199638c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -42,6 +42,7 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -156,6 +157,43 @@ public void testPrefetchBlockCacheDisabled() throws Exception { poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size()); } + @Test + public void testPrefetchHeapUsageAboveThreshold() throws Exception { + ColumnFamilyDescriptor columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true) + .setBlockCacheEnabled(true).build(); + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + Configuration newConf = new Configuration(conf); + newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1); + CacheConfig cacheConfig = + new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); + Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig); + MutableInt cachedCount = new MutableInt(0); + MutableInt unCachedCount = new MutableInt(0); + readStoreFile(storeFile, (r, o) -> { + HFileBlock block = null; + try { 
+ block = r.readBlock(o, -1, false, true, false, true, null, null); + } catch (IOException e) { + fail(e.getMessage()); + } + return block; + }, (key, block) -> { + boolean isCached = blockCache.getBlock(key, true, false, true) != null; + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + if (isCached) { + cachedCount.increment(); + } else { + unCachedCount.increment(); + } + } + }, cacheConfig); + assertTrue(unCachedCount.compareTo(cachedCount) > 0); + } + @Test public void testPrefetch() throws Exception { TraceUtil.trace(() -> { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index f6d3efa9015d..a39df7e14715 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -86,9 +86,10 @@ public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) thr return conf; } - public BucketCache setupBucketCache(Configuration conf) throws IOException { - BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, + public BucketCache setupBucketCache(Configuration conf, String persistentCacheFile) + throws IOException { + BucketCache bucketCache = new BucketCache("file:" + testDir + "/" + persistentCacheFile, + capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); return bucketCache; } @@ -103,7 +104,7 @@ public void cleanupBucketCache(BucketCache bucketCache) throws IOException { public void testPrefetchPersistenceCrash() throws Exception { long 
bucketCachePersistInterval = 3000; Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval); - BucketCache bucketCache = setupBucketCache(conf); + BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrash"); CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Cache @@ -121,7 +122,7 @@ public void testPrefetchPersistenceCrash() throws Exception { public void testPrefetchPersistenceCrashNegative() throws Exception { long bucketCachePersistInterval = Long.MAX_VALUE; Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval); - BucketCache bucketCache = setupBucketCache(conf); + BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrashNegative"); CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Cache @@ -134,7 +135,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception { @Test public void testPrefetchListUponBlockEviction() throws Exception { Configuration conf = setupBucketCacheConfig(200); - BucketCache bucketCache = setupBucketCache(conf); + BucketCache bucketCache = setupBucketCache(conf, "testPrefetchListUponBlockEviction"); CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Blocks in cache @@ -156,7 +157,8 @@ public void testPrefetchListUponBlockEviction() throws Exception { @Test public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception { Configuration conf = setupBucketCacheConfig(200); - BucketCache bucketCache = setupBucketCache(conf); + BucketCache bucketCache = + setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning"); CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Blocks in cache