diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index a468752de5cb..4a880ed6fc35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -246,4 +246,21 @@ default Optional<Map<String, Long>> getRegionCachedInfo() { default int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { return 0; } + + /** + * API to check whether the cache is enabled. + * @return true if the cache is enabled, false otherwise. + */ + default boolean isCacheEnabled() { + return true; + } + + /** + * Wait for the cache to finish initializing, e.g. while a persistent bucket cache is being recovered during a server restart. + * @param timeout maximum time, in milliseconds, to wait for the cache to become enabled + * @return true if the cache is enabled when the wait completes, false otherwise + */ + default boolean waitForCacheInitialization(long timeout) { + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index ef536f9e0be3..25549ab59ef1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -497,4 +497,15 @@ public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long e return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset) + l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset); } + + @Override + public boolean waitForCacheInitialization(long timeout) { + return this.l1Cache.waitForCacheInitialization(timeout) + && this.l2Cache.waitForCacheInitialization(timeout); + } + + @Override + public boolean isCacheEnabled() { + return l1Cache.isCacheEnabled() && l2Cache.isCacheEnabled(); + } }
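The two new default methods give every BlockCache implementation a readiness contract: isCacheEnabled() reports the current state, and waitForCacheInitialization(timeout) blocks while a persistent implementation recovers. A minimal caller-side sketch, assuming only the interface above; the helper class itself is hypothetical and not part of this patch:

    import org.apache.hadoop.hbase.io.hfile.BlockCache;

    // Hypothetical helper, not part of this patch: gate work on cache readiness.
    public final class CacheReadiness {
      private CacheReadiness() {
      }

      /** Returns true only if the cache reports itself enabled within timeoutMs. */
      public static boolean awaitEnabled(BlockCache cache, long timeoutMs) {
        // For a persistent BucketCache this blocks while the backing map is being
        // recovered; the interface defaults simply return true immediately.
        return cache.waitForCacheInitialization(timeoutMs) && cache.isCacheEnabled();
      }
    }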
isCacheEnabled: {}", + path, cacheKey, block.getOnDiskSizeWithHeader(), cache.isCacheEnabled()); interrupted = true; break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 178d347c5b0f..3b39a8787ea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -170,11 +170,23 @@ public class BucketCache implements BlockCache, HeapSize { private BucketCachePersister cachePersister; + /** + * Enum to represent the state of cache + */ + protected enum CacheState { + // Initializing: State when the cache is being initialised from persistence. + INITIALIZING, + // Enabled: State when cache is initialised and is ready. + ENABLED, + // Disabled: State when the cache is disabled. + DISABLED + } + /** * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so * that Bucket IO exceptions/errors don't bring down the HBase server. */ - private volatile boolean cacheEnabled; + private volatile CacheState cacheState; /** * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other @@ -325,6 +337,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.persistencePath = persistencePath; this.blockSize = blockSize; this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; + this.cacheState = CacheState.INITIALIZING; this.allocFailLogPrevTs = 0; @@ -336,32 +349,18 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.ramCache = new RAMCache(); this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); + instantiateWriterThreads(); if (isCachePersistent()) { if (ioEngine instanceof FileIOEngine) { startBucketCachePersisterThread(); } - try { - retrieveFromFile(bucketSizes); - } catch (IOException ioex) { - LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); - backingMap.clear(); - fullyCachedFiles.clear(); - backingMapValidated.set(true); - bucketAllocator = new BucketAllocator(capacity, bucketSizes); - regionCachedSize.clear(); - } + startPersistenceRetriever(bucketSizes, capacity); } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); + this.cacheState = CacheState.ENABLED; + startWriterThreads(); } - final String threadName = Thread.currentThread().getName(); - this.cacheEnabled = true; - for (int i = 0; i < writerThreads.length; ++i) { - writerThreads[i] = new WriterThread(writerQueues.get(i)); - writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); - writerThreads[i].setDaemon(true); - } - startWriterThreads(); // Run the statistics thread periodically to print the cache statistics log // TODO: Add means of turning this off. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 178d347c5b0f..3b39a8787ea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -170,11 +170,23 @@ public class BucketCache implements BlockCache, HeapSize { private BucketCachePersister cachePersister; + /** + * Enum representing the state of the cache. + */ + protected enum CacheState { + // The cache is being initialized, e.g. restored from persistence. + INITIALIZING, + // The cache is initialized and ready for use. + ENABLED, + // The cache is disabled. + DISABLED + } + /** * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so * that Bucket IO exceptions/errors don't bring down the HBase server. */ - private volatile boolean cacheEnabled; + private volatile CacheState cacheState; /** * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other @@ -325,6 +337,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.persistencePath = persistencePath; this.blockSize = blockSize; this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; + this.cacheState = CacheState.INITIALIZING; this.allocFailLogPrevTs = 0; @@ -336,32 +349,18 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.ramCache = new RAMCache(); this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); + instantiateWriterThreads(); if (isCachePersistent()) { if (ioEngine instanceof FileIOEngine) { startBucketCachePersisterThread(); } - try { - retrieveFromFile(bucketSizes); - } catch (IOException ioex) { - LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); - backingMap.clear(); - fullyCachedFiles.clear(); - backingMapValidated.set(true); - bucketAllocator = new BucketAllocator(capacity, bucketSizes); - regionCachedSize.clear(); - } + startPersistenceRetriever(bucketSizes, capacity); } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); + this.cacheState = CacheState.ENABLED; + startWriterThreads(); } - final String threadName = Thread.currentThread().getName(); - this.cacheEnabled = true; - for (int i = 0; i < writerThreads.length; ++i) { - writerThreads[i] = new WriterThread(writerQueues.get(i)); - writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); - writerThreads[i].setDaemon(true); - } - startWriterThreads(); // Run the statistics thread periodically to print the cache statistics log // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log @@ -371,7 +370,32 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" - + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); + + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); + } + + private void startPersistenceRetriever(int[] bucketSizes, long capacity) { + Runnable persistentCacheRetriever = () -> { + try { + retrieveFromFile(bucketSizes); + LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); + } catch (IOException ioex) { + LOG.error("Can't restore from file [{}]", persistencePath, ioex); + backingMap.clear(); + fullyCachedFiles.clear(); + backingMapValidated.set(true); + try { + bucketAllocator = new BucketAllocator(capacity, bucketSizes); + } catch (BucketAllocatorException ex) { + LOG.error("Exception during bucket allocation", ex); + } + regionCachedSize.clear(); + } finally { + this.cacheState = CacheState.ENABLED; + startWriterThreads(); + } + }; + Thread t = new Thread(persistentCacheRetriever); + t.start(); + } private void sanityCheckConfigs() { @@ -394,6 +418,18 @@ private void sanityCheckConfigs() { + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); } + /** + * Called by the constructor to instantiate the writer threads. + */ + private void instantiateWriterThreads() { + final String threadName = Thread.currentThread().getName(); + for (int i = 0; i < this.writerThreads.length; ++i) { + this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); + this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); + this.writerThreads[i].setDaemon(true); + } + } + /** * Called by the constructor to start the writer threads. Used by tests that need to override * starting the threads.
@@ -411,8 +447,9 @@ void startBucketCachePersisterThread() { cachePersister.start(); } - boolean isCacheEnabled() { - return this.cacheEnabled; + @Override + public boolean isCacheEnabled() { + return this.cacheState == CacheState.ENABLED; } @Override @@ -501,7 +538,7 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inM */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { - if (cacheEnabled) { + if (isCacheEnabled()) { if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { BucketEntry bucketEntry = backingMap.get(cacheKey); @@ -523,7 +560,7 @@ protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cachea protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { - if (!cacheEnabled) { + if (!isCacheEnabled()) { return; } if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { @@ -599,7 +636,7 @@ public BucketEntry getBlockForReference(BlockCacheKey key) { @Override public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics) { - if (!cacheEnabled) { + if (!isCacheEnabled()) { return null; } RAMQueueEntry re = ramCache.get(key); @@ -763,7 +800,7 @@ public boolean evictBlock(BlockCacheKey cacheKey) { */ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean evictedByEvictionProcess) { - if (!cacheEnabled) { + if (!isCacheEnabled()) { return false; } boolean existedInRamCache = removeFromRamCache(cacheKey); @@ -854,6 +891,10 @@ public void setCacheInconsistent(boolean setCacheInconsistent) { isCacheInconsistent.set(setCacheInconsistent); } + protected void setCacheState(CacheState state) { + cacheState = state; + } + /* * Statistics thread. Periodically output cache statistics to the log. 
*/ @@ -873,6 +914,10 @@ public void run() { } public void logStats() { + if (!isCacheInitialized("BucketCache::logStats")) { + return; + } + long totalSize = bucketAllocator.getTotalSize(); long usedSize = bucketAllocator.getUsedSize(); long freeSize = totalSize - usedSize; @@ -905,10 +950,17 @@ public long getRealCacheSize() { } public long acceptableSize() { + if (!isCacheInitialized("BucketCache::acceptableSize")) { + return 0; + } return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); } long getPartitionSize(float partitionFactor) { + if (!isCacheInitialized("BucketCache::getPartitionSize")) { + return 0; + } + return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); } @@ -916,6 +968,10 @@ long getPartitionSize(float partitionFactor) { * Return the count of bucketSizeinfos still need free space */ private int bucketSizesAboveThresholdCount(float minFactor) { + if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { + return 0; + } + BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); int fullCount = 0; for (int i = 0; i < stats.length; i++) { @@ -936,6 +992,10 @@ private int bucketSizesAboveThresholdCount(float minFactor) { * @param completelyFreeBucketsNeeded number of buckets to free **/ private void freeEntireBuckets(int completelyFreeBucketsNeeded) { + if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { + return; + } + if (completelyFreeBucketsNeeded != 0) { // First we will build a set where the offsets are reference counted, usually // this set is small around O(Handler Count) unless something else is wrong @@ -962,6 +1022,9 @@ private void freeEntireBuckets(int completelyFreeBucketsNeeded) { * @param why Why we are being called */ void freeSpace(final String why) { + if (!isCacheInitialized("BucketCache::freeSpace")) { + return; + } // Ensure only one freeSpace progress at a time if (!freeSpaceLock.tryLock()) { return; } @@ -1117,13 +1180,13 @@ void disableWriter() { public void run() { List<RAMQueueEntry> entries = new ArrayList<>(); try { - while (cacheEnabled && writerEnabled) { + while (isCacheEnabled() && writerEnabled) { try { try { // Blocks entries = getRAMQueueEntries(inputQueue, entries); } catch (InterruptedException ie) { - if (!cacheEnabled || !writerEnabled) { + if (!isCacheEnabled() || !writerEnabled) { break; } } @@ -1135,7 +1198,7 @@ public void run() { } catch (Throwable t) { LOG.warn("Failed doing drain", t); } - LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); + LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); } }
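Each of the size and eviction paths above now leads with the same guard: while the cache is INITIALIZING, bucketAllocator may still be unset, so metric accessors return a neutral value instead of risking a NullPointerException. Reduced to its essentials in a self-contained sketch; names mirror the patch but the class is hypothetical:

    // Illustrative reduction of the guard idiom, assuming the patch's CacheState enum.
    enum CacheState { INITIALIZING, ENABLED, DISABLED }

    final class GuardedMetrics {
      private volatile CacheState cacheState = CacheState.INITIALIZING;
      private volatile long totalSize; // stands in for bucketAllocator.getTotalSize()

      long acceptableSize(float acceptableFactor) {
        if (cacheState == CacheState.INITIALIZING) {
          return 0; // allocator not ready yet; callers treat 0 as "capacity unknown"
        }
        return (long) Math.floor(totalSize * acceptableFactor);
      }
    }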
@@ -1226,7 +1289,7 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte // Index updated inside loop if success or if we can't succeed. We retry if cache is full // when we go to add an entry by going around the loop again without upping the index. int index = 0; - while (cacheEnabled && index < size) { + while (isCacheEnabled() && index < size) { RAMQueueEntry re = null; try { re = entries.get(index); @@ -1359,10 +1422,19 @@ void persistToFile() throws IOException { File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { LOG.debug("Persist in new chunked persistence format."); + persistChunkedBackingMap(fos); + + LOG.debug( + "PersistToFile: after persisting backing map size: {}, fullyCachedFiles size: {}," + " file name: {}", + backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); } catch (IOException e) { LOG.error("Failed to persist bucket cache to file", e); throw e; + } catch (Throwable e) { + LOG.error("Failed while persisting bucket cache to file", e); + throw e; } LOG.debug("Thread {} finished persisting bucket cache to file, renaming", Thread.currentThread().getName()); @@ -1395,7 +1467,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { backingMapValidated.set(true); return; } - assert !cacheEnabled; + assert !isCacheEnabled(); try (FileInputStream in = new FileInputStream(persistenceFile)) { int pblen = ProtobufMagic.lengthOfPBMagic(); @@ -1538,6 +1610,10 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio blocksByHFile = pair.getSecond(); fullyCachedFiles.clear(); fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); + + LOG.info("After retrieval, backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), + fullyCachedFiles.size()); + verifyFileIntegrity(proto); updateRegionSizeMapWhileRetrievingFromFile(); verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); @@ -1601,7 +1677,7 @@ private void checkIOErrorIsTolerated() { // Do a single read to a local variable to avoid timing issue - HBASE-24454 long ioErrorStartTimeTmp = this.ioErrorStartTime; if (ioErrorStartTimeTmp > 0) { - if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { + if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + "ms, disabling cache, please check your IOEngine"); disableCache();
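checkIOErrorIsTolerated() is the one path that moves the cache to DISABLED: errors are tolerated until they have persisted for ioErrorsTolerationDuration. The decision, condensed into a pure function for illustration only:

    // Condensed, illustrative form of the decision implemented above: disable only
    // when the cache is enabled and the first unresolved IO error is older than
    // the configured toleration window.
    static boolean shouldDisable(long now, long ioErrorStartTime, long tolerationMs,
      boolean cacheEnabled) {
      return cacheEnabled && ioErrorStartTime > 0 && (now - ioErrorStartTime) > tolerationMs;
    }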
@@ -1615,9 +1691,11 @@ * Used to shut down the cache -or- turn it off in the case of something broken. */ private void disableCache() { - if (!cacheEnabled) return; + if (!isCacheEnabled()) { + return; + } LOG.info("Disabling cache"); - cacheEnabled = false; + cacheState = CacheState.DISABLED; ioEngine.shutdown(); this.scheduleThreadPool.shutdown(); for (int i = 0; i < writerThreads.length; ++i) @@ -1698,6 +1776,9 @@ public long getCurrentDataSize() { @Override public long getFreeSize() { + if (!isCacheInitialized("BucketCache::getFreeSize")) { + return 0; + } return this.bucketAllocator.getFreeSize(); } @@ -1713,6 +1794,9 @@ public long getDataBlockCount() { @Override public long getCurrentSize() { + if (!isCacheInitialized("BucketCache::getCurrentSize")) { + return 0; + } return this.bucketAllocator.getUsedSize(); } @@ -2219,6 +2303,10 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d @Override public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { + if (!isCacheInitialized("BucketCache::blockFitsIntoTheCache")) { + return Optional.of(false); + } + long currentUsed = bucketAllocator.getUsedSize(); boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); return Optional.of(result); @@ -2251,4 +2339,27 @@ public Optional getBlockSize(BlockCacheKey key) { } } + + boolean isCacheInitialized(String api) { + if (cacheState == CacheState.INITIALIZING) { + LOG.warn("Bucket cache initialization pending at {}", api); + return false; + } + return true; + } + + @Override + public boolean waitForCacheInitialization(long timeout) { + try { + while (cacheState == CacheState.INITIALIZING && timeout > 0) { + Thread.sleep(100); + timeout -= 100; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return isCacheEnabled(); + } }
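With the state machine in place, callers see INITIALIZING to ENABLED (recovery finished, or no persistence configured) and ENABLED to DISABLED (shutdown, or exhausted IO-error toleration). A false return from waitForCacheInitialization can therefore mean either a lapsed timeout or a disabled cache; a caller that needs a hard guarantee can fail fast, as in this hypothetical helper:

    // Hypothetical fail-fast helper; a false return can mean either "timeout lapsed
    // while still INITIALIZING" or "cache DISABLED", and this treats both as fatal.
    static void requireEnabled(BucketCache cache, long timeoutMs) {
      // waitForCacheInitialization polls in 100 ms steps, so very small timeouts
      // still wait roughly one polling interval.
      if (!cache.waitForCacheInitialization(timeoutMs)) {
        throw new IllegalStateException("Bucket cache not enabled within " + timeoutMs + " ms");
      }
    }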
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java index e4382d2561e6..2039debeef96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java @@ -62,6 +62,8 @@ public void run() { LOG.info("Finishing cache persister thread."); } catch (InterruptedException e) { LOG.warn("Interrupting BucketCachePersister thread.", e); + } catch (Throwable e) { + LOG.error("Failed while persisting bucket cache to file", e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java index d73769a3971b..bc31a317aebd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; +import java.text.DecimalFormat; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -221,6 +222,9 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn return false; } + DecimalFormat df = new DecimalFormat("#"); + df.setMaximumFractionDigits(4); + float cacheRatioDiffThreshold = 0.6f; // Conditions for moving the region @@ -240,7 +244,7 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn LOG.debug( "Region {} moved from {} to {} as the region is cached {} equally on both servers", cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex], - cluster.servers[oldServerIndex], cacheRatioOnCurrentServer); + cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer)); } return true; } @@ -257,7 +261,8 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn "Region {} moved from {} to {} as region cache ratio {} is better than the current " + "cache ratio {}", cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex], - cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer); + cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer), + df.format(cacheRatioOnOldServer)); } return true; } @@ -266,7 +271,8 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn LOG.debug( "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}", cluster.regions[regionIndex], cluster.servers[currentServerIndex], - cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer); + cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer), + df.format(cacheRatioOnOldServer)); } return false; }
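The DecimalFormat above exists because the cache ratios are floats in [0, 1] and raw concatenation logs binary noise such as 0.60000002. A standalone check of the chosen pattern; the values are arbitrary:

    import java.text.DecimalFormat;

    public class RatioFormatDemo {
      public static void main(String[] args) {
        DecimalFormat df = new DecimalFormat("#");
        df.setMaximumFractionDigits(4);
        System.out.println(df.format(0.60000002f)); // prints ".6": float noise gone
        System.out.println(df.format(0.123456f)); // prints ".1235": capped at four digits
      }
    }

Note the "#" pattern drops the leading zero; "0.####" would keep it at the cost of always printing an integer digit.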
" due to abort" : " as it is being closed"), true); status.setStatus("Waiting for close lock"); try { synchronized (closeLock) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java index 88b0b51131ec..39bad8d3df13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java @@ -132,6 +132,7 @@ public void testBlockEvictionOnGracefulStop() throws Exception { cluster.startRegionServer(); Thread.sleep(500); + regionServingRS.getBlockCache().get().waitForCacheInitialization(10000); long newUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); assertEquals(oldUsedCacheSize, newUsedCacheSize); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index d676eb1bb638..bc7770881471 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; @@ -108,7 +109,7 @@ public class TestPrefetch { public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); @Before - public void setUp() throws IOException { + public void setUp() throws IOException, InterruptedException { conf = TEST_UTIL.getConfiguration(); conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); fs = HFileSystem.get(conf); @@ -364,16 +365,14 @@ public void testPrefetchWithDelay() throws Exception { // Wait for 20 seconds, no thread should start prefetch Thread.sleep(20000); assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted()); - while (!reader.prefetchStarted()) { - assertTrue("Prefetch delay has not been expired yet", - getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay()); - } - if (reader.prefetchStarted()) { - // Added some delay as we have started the timer a bit late. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index fbe4843d1524..f7223510bb67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.BlockType; @@ -226,10 +227,8 @@ public void testHeapSizeChanges() throws Exception { public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - } - Thread.sleep(1000); + Waiter.waitFor(HBaseConfiguration.create(), 10000, + () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); } public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { @@ -303,6 +302,7 @@ public void testRetrieveFromFile() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -330,6 +330,7 @@ public void testRetrieveFromPMem() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -347,6 +348,7 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); @@ -363,6 +365,8 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) assertTrue(new File(persistencePath).exists()); bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); } finally { if (bucketCache != null) { @@ -400,6 +404,7 @@ public void testRetrieveFromMultipleFiles() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -413,6 +418,7 @@ public void testRetrieveFromMultipleFiles() throws Exception { public void testRetrieveFromFileWithoutPersistence() throws Exception { BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null); + assertTrue(bucketCache.waitForCacheInitialization(10000)); try { final Path testDir = createAndGetTestDir(); String ioEngineName = "file:" + testDir + "/bucket.cache"; @@ -431,6 +437,7 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception { bucketCache.shutdown(); bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); } finally { bucketCache.shutdown(); @@ -460,6 +467,7 @@ public void testGetPartitionSize() throws IOException { BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); + assertTrue(cache.waitForCacheInitialization(10000)); validateGetPartitionSize(cache, 0.1f, 0.5f); validateGetPartitionSize(cache, 0.7f, 0.5f); @@ -497,6 +505,7 @@ public void testValidBucketCacheConfigs() throws IOException { BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); + assertTrue(cache.waitForCacheInitialization(10000)); assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, cache.getAcceptableFactor(), 0); @@ -570,6 +579,7 @@ private void checkConfigValues(Configuration conf, Map configMa } BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); + assertTrue(cache.waitForCacheInitialization(10000)); assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); } catch (IllegalArgumentException e) { @@ -796,6 +806,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -819,6 +830,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { // restore cache from file bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + 
assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { @@ -844,6 +856,7 @@ public void testBlockAdditionWaitWhenCache() throws Exception { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -875,6 +888,7 @@ public void testBlockAdditionWaitWhenCache() throws Exception { // restore cache from file bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { @@ -931,6 +945,7 @@ private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlock String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, null); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); HFileBlockPair[] hfileBlockPairs = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 429fffa38f6c..facbe7c50d11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -84,6 +84,7 @@ public void setUp() throws Exception { final int writerThreadsCount = 1; this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount, capacity, null, 100/* Tolerate ioerrors for 100ms */); + this.bc.waitForCacheInitialization(10000); assertEquals(writerThreadsCount, bc.writerThreads.length); assertEquals(writerThreadsCount, bc.writerQueues.size()); // Get reference to our single WriterThread instance. 
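Every test above now follows construct-then-wait; since the constructor returns while recovery is still running, asserting on waitForCacheInitialization is what restores the old synchronous semantics. The repetition could be captured in a helper along these lines (hypothetical, JUnit 4 style, single writer thread and queue as in several tests above):

    import static org.junit.Assert.assertTrue;
    import java.io.IOException;
    import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

    final class TestCaches {
      static BucketCache newInitializedCache(String ioEngine, long capacity, int blockSize,
        int[] bucketSizes, String persistencePath) throws IOException {
        BucketCache cache =
          new BucketCache(ioEngine, capacity, blockSize, bucketSizes, 1, 1, persistencePath);
        assertTrue("bucket cache failed to initialize", cache.waitForCacheInitialization(10000));
        return cache;
      }
    }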
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 6d213ac8b40b..a2909c005fd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -109,6 +109,7 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); + bucketCache.waitForCacheInitialization(10000); cacheConf = new CacheConfig(conf, bucketCache); long usedSize = bucketCache.getAllocator().getUsedSize(); @@ -127,6 +128,7 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); + bucketCache.waitForCacheInitialization(10000); cacheConf = new CacheConfig(conf, bucketCache); assertTrue(usedSize != 0); assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index 8815c21be4cc..d05a97ed0201 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -21,12 +21,14 @@ import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; @@ -60,6 +62,9 @@ public void testBucketCacheRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(1000)); + assertTrue( + bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled()); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -95,7 +100,8 @@ public void testBucketCacheRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - Thread.sleep(100); + assertTrue(newBucketCache.waitForCacheInitialization(1000)); + assertEquals(3, newBucketCache.backingMap.size()); 
assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false)); @@ -120,6 +126,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -134,18 +141,68 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - Thread.sleep(100); + assertTrue(newBucketCache.waitForCacheInitialization(10000)); assertEquals(4, newBucketCache.backingMap.size()); newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName()); assertEquals(3, newBucketCache.backingMap.size()); TEST_UTIL.cleanupTestDir(); } + @Test + public void testValidateCacheInitialization() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + Configuration conf = HBaseConfiguration.create(); + // Disables the persister thread by setting its interval to MAX_VALUE + conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); + int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", + DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); + + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); + + // Add four blocks + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); + // saves the current state of the cache + bucketCache.persistToFile(); + + BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", + DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(newBucketCache.waitForCacheInitialization(10000)); + + // Set the state of bucket cache to INITIALIZING + newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING); + + // Validate that zero values are returned for the cache being initialized. 
+ assertEquals(0, newBucketCache.acceptableSize()); + assertEquals(0, newBucketCache.getPartitionSize(1)); + assertEquals(0, newBucketCache.getFreeSize()); + assertEquals(0, newBucketCache.getCurrentSize()); + assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get()); + + newBucketCache.setCacheState(BucketCache.CacheState.ENABLED); + + // Validate that non-zero values are returned for enabled cache + assertTrue(newBucketCache.acceptableSize() > 0); + assertTrue(newBucketCache.getPartitionSize(1) > 0); + assertTrue(newBucketCache.getFreeSize() > 0); + assertTrue(newBucketCache.getCurrentSize() > 0); + assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get()); + + TEST_UTIL.cleanupTestDir(); + } + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - } + Waiter.waitFor(HBaseConfiguration.create(), 12000, + () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); } // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index 35cdb55c2c13..ea131d6a94ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -100,6 +100,7 @@ public void testRetrieveFromFile() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); CacheTestUtils.HFileBlockPair[] blocks = @@ -116,6 +117,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); // persist cache to file bucketCache.shutdown(); @@ -128,7 +130,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - Thread.sleep(100); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); // Add blocks @@ -148,7 +150,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - Thread.sleep(100); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -167,6 +169,7 @@ public void 
testRetrieveFromFileAfterDelete() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -188,6 +191,7 @@ public void testRetrieveFromFileAfterDelete() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } @@ -211,6 +215,7 @@ public void testModifiedBucketCacheFileData() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -235,7 +240,7 @@ public void testModifiedBucketCacheFileData() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - Thread.sleep(100); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -265,6 +270,7 @@ public void testModifiedBucketCacheFileTime() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -291,6 +297,7 @@ public void testModifiedBucketCacheFileTime() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); assertEquals(blockCount, bucketCache.backingMap.size()); @@ -317,6 +324,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); @@ -341,6 +349,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(newBucketCache.waitForCacheInitialization(10000)); assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); 
assertEquals(blocks[1].getBlock(), @@ -373,6 +382,7 @@ private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); @@ -388,7 +398,7 @@ private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); - + assertTrue(newBucketCache.waitForCacheInitialization(10000)); assertEquals(numBlocks, newBucketCache.backingMap.size()); for (int i = 0; i < numBlocks; i++) {
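Taken together, the patch turns BucketCache recovery from a blocking constructor step into a background task, and the tests above pin the resulting contract down. The end-to-end shape, as a sketch under those assumptions; paths and sizes are arbitrary:

    // End-to-end lifecycle sketch (illustrative; exceptions propagated to the caller):
    static void lifecycleSketch() throws Exception {
      BucketCache cache = new BucketCache("file:/tmp/bucket.cache", 32 * 1024 * 1024, 8192,
        new int[] { 8192 + 1024 }, 1, 1, "/tmp/bucket.persistence");
      // Constructor returns immediately in INITIALIZING; recovery runs in the background.
      assertTrue(cache.waitForCacheInitialization(10000));
      // shutdown() persists the backing map so the next instance can recover it.
      cache.shutdown();
    }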