7878import org .apache .hadoop .hbase .nio .ByteBuff ;
7979import org .apache .hadoop .hbase .nio .RefCnt ;
8080import org .apache .hadoop .hbase .protobuf .ProtobufMagic ;
81+ import org .apache .hadoop .hbase .regionserver .HRegion ;
8182import org .apache .hadoop .hbase .util .Bytes ;
8283import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
8384import org .apache .hadoop .hbase .util .IdReadWriteLock ;
@@ -124,6 +125,12 @@ public class BucketCache implements BlockCache, HeapSize {
124125 static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
125126 "hbase.bucketcache.persistence.chunksize" ;
126127
  /**
   * Configuration key for the cache age (grace period) after which a cached block is checked
   * against the files of the currently online regions; blocks whose backing file is not found
   * in any online region are considered "orphan" and are evicted first during freeSpace.
   */
  static final String BLOCK_ORPHAN_GRACE_PERIOD =
    "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";

  // NOTE(review): the key suffix says "seconds", but this default (24h) is expressed in
  // milliseconds, and freeSpace converts the configured value to nanos by multiplying by
  // 1,000,000 — i.e. the value is effectively treated as millis throughout. Confirm the
  // intended unit; either the key name or the conversion looks inconsistent.
  static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;
127134 /** Priority buckets */
128135 static final float DEFAULT_SINGLE_FACTOR = 0.25f ;
129136 static final float DEFAULT_MULTI_FACTOR = 0.50f ;
@@ -277,6 +284,10 @@ public class BucketCache implements BlockCache, HeapSize {
277284 private long allocFailLogPrevTs ; // time of previous log event for allocation failure.
278285 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000 ; // Default 1 minute.
279286
  // Regions currently online on this region server (presumably keyed by region/encoded name —
  // confirm against the caller). May be null when the cache is created without region
  // awareness, in which case the orphan-block check in freeSpace is skipped entirely.
  private Map<String, HRegion> onlineRegions;

  // Minimum cached age a block must reach before it is eligible for the orphan-file check in
  // freeSpace. Read from BLOCK_ORPHAN_GRACE_PERIOD; the value is treated as milliseconds
  // (converted to nanos via * 1000000) despite the ".seconds" suffix in the key name.
  private long orphanBlockGracePeriod = 0;
290+
280291 public BucketCache (String ioEngineName , long capacity , int blockSize , int [] bucketSizes ,
281292 int writerThreadNum , int writerQLen , String persistencePath ) throws IOException {
282293 this (ioEngineName , capacity , blockSize , bucketSizes , writerThreadNum , writerQLen ,
@@ -286,11 +297,21 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
  /**
   * Constructs a BucketCache without region awareness: delegates to the full constructor,
   * passing {@code null} for the online-regions map, so the orphan-block eviction check is
   * disabled for this instance.
   */
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, ioErrorsTolerationDuration, conf, null);
  }
303+
304+ public BucketCache (String ioEngineName , long capacity , int blockSize , int [] bucketSizes ,
305+ int writerThreadNum , int writerQLen , String persistencePath , int ioErrorsTolerationDuration ,
306+ Configuration conf , Map <String , HRegion > onlineRegions ) throws IOException {
289307 Preconditions .checkArgument (blockSize > 0 ,
290308 "BucketCache capacity is set to " + blockSize + ", can not be less than 0" );
291309 this .algorithm = conf .get (FILE_VERIFY_ALGORITHM , DEFAULT_FILE_VERIFY_ALGORITHM );
292310 this .ioEngine = getIOEngineFromName (ioEngineName , capacity , persistencePath );
293311 this .writerThreads = new WriterThread [writerThreadNum ];
312+ this .onlineRegions = onlineRegions ;
313+ this .orphanBlockGracePeriod =
314+ conf .getLong (BLOCK_ORPHAN_GRACE_PERIOD , BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT );
294315 long blockNumCapacity = capacity / blockSize ;
295316 if (blockNumCapacity >= Integer .MAX_VALUE ) {
296317 // Enough for about 32TB of cache!
@@ -924,6 +945,29 @@ private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
924945 }
925946 }
926947
948+ private long calculateBytesToFree (StringBuilder msgBuffer ) {
949+ long bytesToFreeWithoutExtra = 0 ;
950+ BucketAllocator .IndexStatistics [] stats = bucketAllocator .getIndexStatistics ();
951+ long [] bytesToFreeForBucket = new long [stats .length ];
952+ for (int i = 0 ; i < stats .length ; i ++) {
953+ bytesToFreeForBucket [i ] = 0 ;
954+ long freeGoal = (long ) Math .floor (stats [i ].totalCount () * (1 - minFactor ));
955+ freeGoal = Math .max (freeGoal , 1 );
956+ if (stats [i ].freeCount () < freeGoal ) {
957+ bytesToFreeForBucket [i ] = stats [i ].itemSize () * (freeGoal - stats [i ].freeCount ());
958+ bytesToFreeWithoutExtra += bytesToFreeForBucket [i ];
959+ if (msgBuffer != null ) {
960+ msgBuffer .append ("Free for bucketSize(" + stats [i ].itemSize () + ")="
961+ + StringUtils .byteDesc (bytesToFreeForBucket [i ]) + ", " );
962+ }
963+ }
964+ }
965+ if (msgBuffer != null ) {
966+ msgBuffer .append ("Free for total=" + StringUtils .byteDesc (bytesToFreeWithoutExtra ) + ", " );
967+ }
968+ return bytesToFreeWithoutExtra ;
969+ }
970+
927971 /**
928972 * Free the space if the used size reaches acceptableSize() or one size block couldn't be
929973 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
@@ -937,43 +981,21 @@ void freeSpace(final String why) {
937981 }
938982 try {
939983 freeInProgress = true ;
940- long bytesToFreeWithoutExtra = 0 ;
941- // Calculate free byte for each bucketSizeinfo
942984 StringBuilder msgBuffer = LOG .isDebugEnabled () ? new StringBuilder () : null ;
943- BucketAllocator .IndexStatistics [] stats = bucketAllocator .getIndexStatistics ();
944- long [] bytesToFreeForBucket = new long [stats .length ];
945- for (int i = 0 ; i < stats .length ; i ++) {
946- bytesToFreeForBucket [i ] = 0 ;
947- long freeGoal = (long ) Math .floor (stats [i ].totalCount () * (1 - minFactor ));
948- freeGoal = Math .max (freeGoal , 1 );
949- if (stats [i ].freeCount () < freeGoal ) {
950- bytesToFreeForBucket [i ] = stats [i ].itemSize () * (freeGoal - stats [i ].freeCount ());
951- bytesToFreeWithoutExtra += bytesToFreeForBucket [i ];
952- if (msgBuffer != null ) {
953- msgBuffer .append ("Free for bucketSize(" + stats [i ].itemSize () + ")="
954- + StringUtils .byteDesc (bytesToFreeForBucket [i ]) + ", " );
955- }
956- }
957- }
958- if (msgBuffer != null ) {
959- msgBuffer .append ("Free for total=" + StringUtils .byteDesc (bytesToFreeWithoutExtra ) + ", " );
960- }
961-
985+ long bytesToFreeWithoutExtra = calculateBytesToFree (msgBuffer );
962986 if (bytesToFreeWithoutExtra <= 0 ) {
963987 return ;
964988 }
965989 long currentSize = bucketAllocator .getUsedSize ();
966990 long totalSize = bucketAllocator .getTotalSize ();
967991 if (LOG .isDebugEnabled () && msgBuffer != null ) {
968- LOG .debug ("Free started because \" " + why + "\" ; " + msgBuffer . toString ()
969- + " of current used=" + StringUtils .byteDesc (currentSize ) + ", actual cacheSize="
992+ LOG .debug ("Free started because \" " + why + "\" ; " + msgBuffer + " of current used="
993+ + StringUtils .byteDesc (currentSize ) + ", actual cacheSize="
970994 + StringUtils .byteDesc (realCacheSize .sum ()) + ", total="
971995 + StringUtils .byteDesc (totalSize ));
972996 }
973-
974997 long bytesToFreeWithExtra =
975998 (long ) Math .floor (bytesToFreeWithoutExtra * (1 + extraFreeFactor ));
976-
977999 // Instantiate priority buckets
9781000 BucketEntryGroup bucketSingle =
9791001 new BucketEntryGroup (bytesToFreeWithExtra , blockSize , getPartitionSize (singleFactor ));
@@ -982,10 +1004,48 @@ void freeSpace(final String why) {
9821004 BucketEntryGroup bucketMemory =
9831005 new BucketEntryGroup (bytesToFreeWithExtra , blockSize , getPartitionSize (memoryFactor ));
9841006
1007+ Set <String > allValidFiles = null ;
1008+ // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not.
1009+ // See further comments below for more details.
1010+ if (onlineRegions != null ) {
1011+ allValidFiles = BlockCacheUtil .listAllFilesNames (onlineRegions );
1012+ }
1013+ // the cached time is recored in nanos, so we need to convert the grace period accordingly
1014+ long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000 ;
1015+ long bytesFreed = 0 ;
9851016 // Scan entire map putting bucket entry into appropriate bucket entry
9861017 // group
9871018 for (Map .Entry <BlockCacheKey , BucketEntry > bucketEntryWithKey : backingMap .entrySet ()) {
988- switch (bucketEntryWithKey .getValue ().getPriority ()) {
1019+ BlockCacheKey key = bucketEntryWithKey .getKey ();
1020+ BucketEntry entry = bucketEntryWithKey .getValue ();
1021+ // Under certain conditions, blocks for regions not on the current region server might
1022+ // be hanging on the cache. For example, when using the persistent cache feature, if the
1023+ // RS crashes, then if not the same regions are assigned back once its online again, blocks
1024+ // for the previous online regions would be recovered and stay in the cache. These would be
1025+ // "orphan" blocks, as the files these blocks belong to are not in any of the online
1026+ // regions.
1027+ // "Orphan" blocks are a pure waste of cache space and should be evicted first during
1028+ // the freespace run.
1029+ // Compactions and Flushes may cache blocks before its files are completely written. In
1030+ // these cases the file won't be found in any of the online regions stores, but the block
1031+ // shouldn't be evicted. To avoid this, we defined this
1032+ // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace
1033+ // period (default 24 hours) where a block should be checked if it's an orphan block.
1034+ if (
1035+ allValidFiles != null
1036+ && entry .getCachedTime () < (System .nanoTime () - orphanGracePeriodNanos )
1037+ ) {
1038+ if (!allValidFiles .contains (key .getHfileName ())) {
1039+ if (evictBucketEntryIfNoRpcReferenced (key , entry )) {
1040+ // We calculate the freed bytes, but we don't stop if the goal was reached because
1041+ // these are orphan blocks anyway, so let's leverage this run of freeSpace
1042+ // to get rid of all orphans at once.
1043+ bytesFreed += entry .getLength ();
1044+ continue ;
1045+ }
1046+ }
1047+ }
1048+ switch (entry .getPriority ()) {
9891049 case SINGLE : {
9901050 bucketSingle .add (bucketEntryWithKey );
9911051 break ;
@@ -1000,7 +1060,6 @@ void freeSpace(final String why) {
10001060 }
10011061 }
10021062 }
1003-
10041063 PriorityQueue <BucketEntryGroup > bucketQueue =
10051064 new PriorityQueue <>(3 , Comparator .comparingLong (BucketEntryGroup ::overflow ));
10061065
@@ -1009,7 +1068,6 @@ void freeSpace(final String why) {
10091068 bucketQueue .add (bucketMemory );
10101069
10111070 int remainingBuckets = bucketQueue .size ();
1012- long bytesFreed = 0 ;
10131071
10141072 BucketEntryGroup bucketGroup ;
10151073 while ((bucketGroup = bucketQueue .poll ()) != null ) {
@@ -1026,18 +1084,15 @@ void freeSpace(final String why) {
10261084 if (bucketSizesAboveThresholdCount (minFactor ) > 0 ) {
10271085 bucketQueue .clear ();
10281086 remainingBuckets = 3 ;
1029-
10301087 bucketQueue .add (bucketSingle );
10311088 bucketQueue .add (bucketMulti );
10321089 bucketQueue .add (bucketMemory );
1033-
10341090 while ((bucketGroup = bucketQueue .poll ()) != null ) {
10351091 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed ) / remainingBuckets ;
10361092 bytesFreed += bucketGroup .free (bucketBytesToFree );
10371093 remainingBuckets --;
10381094 }
10391095 }
1040-
10411096 // Even after the above free we might still need freeing because of the
10421097 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
10431098 // there might be some buckets where the occupancy is very sparse and thus are not
@@ -1056,7 +1111,6 @@ void freeSpace(final String why) {
10561111 + StringUtils .byteDesc (multi ) + ", " + "memory=" + StringUtils .byteDesc (memory ));
10571112 }
10581113 }
1059-
10601114 } catch (Throwable t ) {
10611115 LOG .warn ("Failed freeing space" , t );
10621116 } finally {
0 commit comments