From c39455e71d4cc0ed2b9bf89beb06f3637d2bec68 Mon Sep 17 00:00:00 2001
From: Janardhan Hungund
Date: Mon, 7 Oct 2024 10:35:25 +0530
Subject: [PATCH] HBASE-28900: Avoid resetting the bucket cache during
 recovery from persistence.

When an inconsistency is detected during the recovery of the bucket
cache from persistence, we currently throw away the complete cache and
rebuild it from scratch. This inconsistency can occur when the region
server is abruptly terminated.

The reset can be avoided by skipping the inconsistent backing map entry
and continuing with the cache recovery. The inconsistent backing map
entry is then discarded during the subsequent bucket cache validation.

Change-Id: I41237bda2189cbd73e22e4eadf4d53d16a3733f6
---
 .../io/hfile/bucket/BucketAllocator.java      | 19 ++++--
 .../hbase/io/hfile/bucket/BucketCache.java    |  2 +-
 .../TestRecoveryPersistentBucketCache.java    | 64 +++++++++++++++++++
 3 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 0b03656d7010..e25d52698bab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -126,7 +126,7 @@ public long allocate() {
       return offset;
     }
 
-    public void addAllocation(long offset) throws BucketAllocatorException {
+    public boolean addAllocation(long offset) throws BucketAllocatorException {
       offset -= baseOffset;
       if (offset < 0 || offset % itemAllocationSize != 0)
         throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
@@ -137,10 +137,14 @@ public void addAllocation(long offset) throws BucketAllocatorException {
         if (matchFound) freeList[i - 1] = freeList[i];
         else if (freeList[i] == idx) matchFound = true;
       }
-      if (!matchFound) throw new BucketAllocatorException(
-        "Couldn't find match for index " + idx + " in free list");
+      if (!matchFound) {
+        LOG.warn("Found more than one entry for the bucket starting at offset {} for blocks of "
+          + "size {}. Skipping entry at cache offset {}", baseOffset, itemAllocationSize, offset);
+        return false;
+      }
       ++usedCount;
       --freeCount;
+      return true;
     }
 
     private void free(long offset) {
@@ -402,10 +406,11 @@ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
         bsi.instantiateBucket(b);
         reconfigured[bucketNo] = true;
       }
-      realCacheSize.add(foundLen);
-      buckets[bucketNo].addAllocation(foundOffset);
-      usedSize += buckets[bucketNo].getItemAllocationSize();
-      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      if (buckets[bucketNo].addAllocation(foundOffset)) {
+        realCacheSize.add(foundLen);
+        usedSize += buckets[bucketNo].getItemAllocationSize();
+        bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      }
     }
 
     if (sizeNotMatchedCount > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 6dc65c632de0..bc7bc955864c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -854,7 +854,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    * it is {@link ByteBuffAllocator#putbackBuffer}.
   *
   */
-  private Recycler createRecycler(final BucketEntry bucketEntry) {
+  public Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
index 3a4af295dc84..b8fc1be986ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -199,6 +202,67 @@ public void testValidateCacheInitialization() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
+      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(bucketCache.waitForCacheInitialization(1000));
+    assertTrue(
+      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // Creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
+    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency.
+    BucketEntry bucketEntry =
+      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
+        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // Saves the current state of the cache: 5 blocks in the map, although only 4 have actually
+    // been cached. The 5th block has the same cache offset as the first.
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
+    // null, depending on the ordering of the keys in the backing map. Hence, skip the check for
+    // that key.
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     Waiter.waitFor(HBaseConfiguration.create(), 12000,
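
Note on the recovery policy: the commit message describes a skip-and-continue
strategy, where a backing map entry whose allocation conflicts with an
already-claimed bucket offset is skipped during recovery instead of forcing a
full cache reset, and is later dropped by validation. The following minimal,
self-contained Java sketch illustrates that policy in isolation; the class
and every name in it (RecoverySketch, the block keys, the offsets) are
hypothetical and are not the HBase API.

  import java.util.HashMap;
  import java.util.LinkedHashMap;
  import java.util.Map;

  public class RecoverySketch {

    // Claims an offset for a block. Returns false instead of throwing when the
    // offset is already claimed, mirroring the boolean addAllocation() above.
    static boolean addAllocation(Map<Long, String> claimed, long offset, String block) {
      if (claimed.putIfAbsent(offset, block) != null) {
        System.out.printf("Duplicate allocation at offset %d for %s, skipping%n", offset, block);
        return false;
      }
      return true;
    }

    public static void main(String[] args) {
      // A persisted backing map with a fabricated inconsistency: block-4 claims
      // the same cache offset as block-0, like the 5th block in the test above.
      Map<String, Long> persisted = new LinkedHashMap<>();
      persisted.put("block-0", 0L);
      persisted.put("block-1", 9216L);
      persisted.put("block-4", 0L);

      Map<Long, String> claimed = new HashMap<>();
      int recovered = 0;
      for (Map.Entry<String, Long> entry : persisted.entrySet()) {
        if (addAllocation(claimed, entry.getValue(), entry.getKey())) {
          recovered++; // only consistent entries count toward cache usage
        }
      }
      // Recovery continues with the consistent entries instead of resetting.
      System.out.println("Recovered " + recovered + " of " + persisted.size() + " entries");
    }
  }

Run as-is, the sketch reports the duplicate for block-4 and recovers two of
three entries, which parallels the warning added to BucketAllocator and the
skipped bookkeeping in its caller.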