Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,8 @@ public class BucketCache implements BlockCache, HeapSize {
*/
transient final IdReadWriteLock<Long> offsetLock;

final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
int nameComparison = a.getHfileName().compareTo(b.getHfileName());
if (nameComparison != 0) {
return nameComparison;
}
return Long.compare(a.getOffset(), b.getOffset());
});
NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
Expand Down Expand Up @@ -1471,8 +1466,11 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
}

private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
this::createRecycler);
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
this::createRecycler);
backingMap = pair.getFirst();
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
if (proto.hasChecksum()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
Expand Down Expand Up @@ -121,10 +124,12 @@ private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
}
}

static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(Map<Integer, String> deserializers,
BucketCacheProtos.BackingMap backingMap, Function<BucketEntry, Recycler> createRecycler)
throws IOException {
static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB(
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
Function<BucketEntry, Recycler> createRecycler) throws IOException {
ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
Expand Down Expand Up @@ -153,8 +158,9 @@ static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(Map<Integer, String>
throw new IOException("Unknown deserializer class found: " + deserializerClass);
}
result.put(key, value);
resultSet.add(key);
}
return result;
return new Pair<>(result, resultSet);
}

private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,39 @@ public void testBucketCacheRecovery() throws Exception {
TEST_UTIL.cleanupTestDir();
}

@Test
public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
Configuration conf = HBaseConfiguration.create();
// Disables the persister thread by setting its interval to MAX_VALUE
conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);

CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

// Add four blocks
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
// saves the current state of the cache
bucketCache.persistToFile();

BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);
Thread.sleep(100);
assertEquals(4, newBucketCache.backingMap.size());
newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
assertEquals(3, newBucketCache.backingMap.size());
TEST_UTIL.cleanupTestDir();
}

private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
Expand Down