2525import java .io .IOException ;
2626import java .nio .ByteBuffer ;
2727import java .util .ArrayList ;
28+ import java .util .Arrays ;
2829import java .util .Collections ;
2930import java .util .Comparator ;
3031import java .util .HashSet ;
@@ -120,6 +121,8 @@ public class BucketCache implements BlockCache, HeapSize {
120121 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor" ;
121122 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor" ;
122123 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor" ;
124+ static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
125+ "hbase.bucketcache.persistence.chunksize" ;
123126
124127 /** Priority buckets */
125128 static final float DEFAULT_SINGLE_FACTOR = 0.25f ;
@@ -139,6 +142,8 @@ public class BucketCache implements BlockCache, HeapSize {
139142 final static int DEFAULT_WRITER_THREADS = 3 ;
140143 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64 ;
141144
145+ final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000 ;
146+
142147 // Store/read block data
143148 transient final IOEngine ioEngine ;
144149
@@ -266,6 +271,8 @@ public class BucketCache implements BlockCache, HeapSize {
266271 */
267272 private String algorithm ;
268273
274+ private long persistenceChunkSize ;
275+
269276 /* Tracing failed Bucket Cache allocations. */
270277 private long allocFailLogPrevTs ; // time of previous log event for allocation failure.
271278 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000 ; // Default 1 minute.
@@ -299,6 +306,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
299306 this .queueAdditionWaitTime =
300307 conf .getLong (QUEUE_ADDITION_WAIT_TIME , DEFAULT_QUEUE_ADDITION_WAIT_TIME );
301308 this .bucketcachePersistInterval = conf .getLong (BUCKETCACHE_PERSIST_INTERVAL_KEY , 1000 );
309+ this .persistenceChunkSize =
310+ conf .getLong (BACKING_MAP_PERSISTENCE_CHUNK_SIZE , DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE );
311+ if (this .persistenceChunkSize <= 0 ) {
312+ persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE ;
313+ }
302314
303315 sanityCheckConfigs ();
304316
@@ -1314,8 +1326,8 @@ void persistToFile() throws IOException {
13141326 }
13151327 File tempPersistencePath = new File (persistencePath + EnvironmentEdgeManager .currentTime ());
13161328 try (FileOutputStream fos = new FileOutputStream (tempPersistencePath , false )) {
1317- fos . write ( ProtobufMagic . PB_MAGIC );
1318- BucketProtoUtils . toPB ( this ). writeDelimitedTo (fos );
1329+ LOG . debug ( "Persist in new chunked persistence format." );
1330+ persistChunkedBackingMap (fos );
13191331 } catch (IOException e ) {
13201332 LOG .error ("Failed to persist bucket cache to file" , e );
13211333 throw e ;
@@ -1357,16 +1369,24 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
13571369 int pblen = ProtobufMagic .lengthOfPBMagic ();
13581370 byte [] pbuf = new byte [pblen ];
13591371 IOUtils .readFully (in , pbuf , 0 , pblen );
1360- if (!ProtobufMagic .isPBMagicPrefix (pbuf )) {
1372+
1373+ if (ProtobufMagic .isPBMagicPrefix (pbuf )) {
1374+ LOG .info ("Reading old format of persistence." );
1375+ // The old non-chunked version of backing map persistence.
1376+ parsePB (BucketCacheProtos .BucketCacheEntry .parseDelimitedFrom (in ));
1377+ } else if (Arrays .equals (pbuf , BucketProtoUtils .PB_MAGIC_V2 )) {
1378+ // The new persistence format of chunked persistence.
1379+ LOG .info ("Reading new chunked format of persistence." );
1380+ retrieveChunkedBackingMap (in , bucketSizes );
1381+ } else {
13611382 // In 3.0 we have enough flexibility to dump the old cache data.
13621383 // TODO: In 2.x line, this might need to be filled in to support reading the old format
13631384 throw new IOException (
13641385 "Persistence file does not start with protobuf magic number. " + persistencePath );
13651386 }
1366- parsePB (BucketCacheProtos .BucketCacheEntry .parseDelimitedFrom (in ));
13671387 bucketAllocator = new BucketAllocator (cacheCapacity , bucketSizes , backingMap , realCacheSize );
13681388 blockNumber .add (backingMap .size ());
1369- LOG .info ("Bucket cache retrieved from file successfully" );
1389+ LOG .info ("Bucket cache retrieved from file successfully with size: {}" , backingMap . size () );
13701390 }
13711391 }
13721392
@@ -1409,6 +1429,75 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
14091429 }
14101430 }
14111431
1432+ private void verifyFileIntegrity (BucketCacheProtos .BucketCacheEntry proto ) {
1433+ try {
1434+ if (proto .hasChecksum ()) {
1435+ ((PersistentIOEngine ) ioEngine ).verifyFileIntegrity (proto .getChecksum ().toByteArray (),
1436+ algorithm );
1437+ }
1438+ backingMapValidated .set (true );
1439+ } catch (IOException e ) {
1440+ LOG .warn ("Checksum for cache file failed. "
1441+ + "We need to validate each cache key in the backing map. "
1442+ + "This may take some time, so we'll do it in a background thread," );
1443+
1444+ Runnable cacheValidator = () -> {
1445+ while (bucketAllocator == null ) {
1446+ try {
1447+ Thread .sleep (50 );
1448+ } catch (InterruptedException ex ) {
1449+ throw new RuntimeException (ex );
1450+ }
1451+ }
1452+ long startTime = EnvironmentEdgeManager .currentTime ();
1453+ int totalKeysOriginally = backingMap .size ();
1454+ for (Map .Entry <BlockCacheKey , BucketEntry > keyEntry : backingMap .entrySet ()) {
1455+ try {
1456+ ((FileIOEngine ) ioEngine ).checkCacheTime (keyEntry .getValue ());
1457+ } catch (IOException e1 ) {
1458+ LOG .debug ("Check for key {} failed. Evicting." , keyEntry .getKey ());
1459+ evictBlock (keyEntry .getKey ());
1460+ fileNotFullyCached (keyEntry .getKey ().getHfileName ());
1461+ }
1462+ }
1463+ backingMapValidated .set (true );
1464+ LOG .info ("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms." ,
1465+ totalKeysOriginally , backingMap .size (),
1466+ (EnvironmentEdgeManager .currentTime () - startTime ));
1467+ };
1468+ Thread t = new Thread (cacheValidator );
1469+ t .setDaemon (true );
1470+ t .start ();
1471+ }
1472+ }
1473+
1474+ private void parsePB (BucketCacheProtos .BucketCacheEntry firstChunk ,
1475+ List <BucketCacheProtos .BackingMap > chunks ) throws IOException {
1476+ fullyCachedFiles .clear ();
1477+ Pair <ConcurrentHashMap <BlockCacheKey , BucketEntry >, NavigableSet <BlockCacheKey >> pair =
1478+ BucketProtoUtils .fromPB (firstChunk .getDeserializersMap (), firstChunk .getBackingMap (),
1479+ this ::createRecycler );
1480+ backingMap .putAll (pair .getFirst ());
1481+ blocksByHFile .addAll (pair .getSecond ());
1482+ fullyCachedFiles .putAll (BucketProtoUtils .fromPB (firstChunk .getCachedFilesMap ()));
1483+
1484+ LOG .debug ("Number of blocks after first chunk: {}, blocksByHFile: {}" , backingMap .size (),
1485+ fullyCachedFiles .size ());
1486+ int i = 1 ;
1487+ for (BucketCacheProtos .BackingMap chunk : chunks ) {
1488+ Pair <ConcurrentHashMap <BlockCacheKey , BucketEntry >, NavigableSet <BlockCacheKey >> pair2 =
1489+ BucketProtoUtils .fromPB (firstChunk .getDeserializersMap (), chunk , this ::createRecycler );
1490+ backingMap .putAll (pair2 .getFirst ());
1491+ blocksByHFile .addAll (pair2 .getSecond ());
1492+ LOG .debug ("Number of blocks after {} reading chunk: {}, blocksByHFile: {}" , ++i ,
1493+ backingMap .size (), fullyCachedFiles .size ());
1494+ }
1495+ verifyFileIntegrity (firstChunk );
1496+ verifyCapacityAndClasses (firstChunk .getCacheCapacity (), firstChunk .getIoClass (),
1497+ firstChunk .getMapClass ());
1498+ updateRegionSizeMapWhileRetrievingFromFile ();
1499+ }
1500+
14121501 private void parsePB (BucketCacheProtos .BucketCacheEntry proto ) throws IOException {
14131502 Pair <ConcurrentHashMap <BlockCacheKey , BucketEntry >, NavigableSet <BlockCacheKey >> pair =
14141503 BucketProtoUtils .fromPB (proto .getDeserializersMap (), proto .getBackingMap (),
@@ -1417,52 +1506,60 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
14171506 blocksByHFile = pair .getSecond ();
14181507 fullyCachedFiles .clear ();
14191508 fullyCachedFiles .putAll (BucketProtoUtils .fromPB (proto .getCachedFilesMap ()));
1420- if (proto .hasChecksum ()) {
1421- try {
1422- ((PersistentIOEngine ) ioEngine ).verifyFileIntegrity (proto .getChecksum ().toByteArray (),
1423- algorithm );
1424- backingMapValidated .set (true );
1425- } catch (IOException e ) {
1426- LOG .warn ("Checksum for cache file failed. "
1427- + "We need to validate each cache key in the backing map. "
1428- + "This may take some time, so we'll do it in a background thread," );
1429- Runnable cacheValidator = () -> {
1430- while (bucketAllocator == null ) {
1431- try {
1432- Thread .sleep (50 );
1433- } catch (InterruptedException ex ) {
1434- throw new RuntimeException (ex );
1435- }
1436- }
1437- long startTime = EnvironmentEdgeManager .currentTime ();
1438- int totalKeysOriginally = backingMap .size ();
1439- for (Map .Entry <BlockCacheKey , BucketEntry > keyEntry : backingMap .entrySet ()) {
1440- try {
1441- ((FileIOEngine ) ioEngine ).checkCacheTime (keyEntry .getValue ());
1442- } catch (IOException e1 ) {
1443- LOG .debug ("Check for key {} failed. Evicting." , keyEntry .getKey ());
1444- evictBlock (keyEntry .getKey ());
1445- fileNotFullyCached (keyEntry .getKey ().getHfileName ());
1446- }
1447- }
1448- backingMapValidated .set (true );
1449- LOG .info ("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms." ,
1450- totalKeysOriginally , backingMap .size (),
1451- (EnvironmentEdgeManager .currentTime () - startTime ));
1452- };
1453- Thread t = new Thread (cacheValidator );
1454- t .setDaemon (true );
1455- t .start ();
1456- }
1457- } else {
1458- // if has not checksum, it means the persistence file is old format
1459- LOG .info ("Persistent file is old format, it does not support verifying file integrity!" );
1460- backingMapValidated .set (true );
1461- }
1509+ verifyFileIntegrity (proto );
14621510 updateRegionSizeMapWhileRetrievingFromFile ();
14631511 verifyCapacityAndClasses (proto .getCacheCapacity (), proto .getIoClass (), proto .getMapClass ());
14641512 }
14651513
1514+ private void persistChunkedBackingMap (FileOutputStream fos ) throws IOException {
1515+ long numChunks = backingMap .size () / persistenceChunkSize ;
1516+ if (backingMap .size () % persistenceChunkSize != 0 ) {
1517+ numChunks += 1 ;
1518+ }
1519+
1520+ LOG .debug (
1521+ "persistToFile: before persisting backing map size: {}, "
1522+ + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}" ,
1523+ backingMap .size (), fullyCachedFiles .size (), persistenceChunkSize , numChunks );
1524+
1525+ BucketProtoUtils .serializeAsPB (this , fos , persistenceChunkSize , numChunks );
1526+
1527+ LOG .debug (
1528+ "persistToFile: after persisting backing map size: {}, "
1529+ + "fullycachedFiles size: {}, numChunksPersisteed: {}" ,
1530+ backingMap .size (), fullyCachedFiles .size (), numChunks );
1531+ }
1532+
1533+ private void retrieveChunkedBackingMap (FileInputStream in , int [] bucketSizes ) throws IOException {
1534+ byte [] bytes = new byte [Long .BYTES ];
1535+ int readSize = in .read (bytes );
1536+ if (readSize != Long .BYTES ) {
1537+ throw new IOException ("Invalid size of chunk-size read from persistence: " + readSize );
1538+ }
1539+ long batchSize = Bytes .toLong (bytes , 0 );
1540+
1541+ readSize = in .read (bytes );
1542+ if (readSize != Long .BYTES ) {
1543+ throw new IOException ("Invalid size for number of chunks read from persistence: " + readSize );
1544+ }
1545+ long numChunks = Bytes .toLong (bytes , 0 );
1546+
1547+ LOG .info ("Number of chunks: {}, chunk size: {}" , numChunks , batchSize );
1548+
1549+ ArrayList <BucketCacheProtos .BackingMap > bucketCacheMaps = new ArrayList <>();
1550+ // Read the first chunk that has all the details.
1551+ BucketCacheProtos .BucketCacheEntry firstChunk =
1552+ BucketCacheProtos .BucketCacheEntry .parseDelimitedFrom (in );
1553+
1554+ // Subsequent chunks have the backingMap entries.
1555+ for (int i = 1 ; i < numChunks ; i ++) {
1556+ LOG .info ("Reading chunk no: {}" , i + 1 );
1557+ bucketCacheMaps .add (BucketCacheProtos .BackingMap .parseDelimitedFrom (in ));
1558+ LOG .info ("Retrieved chunk: {}" , i + 1 );
1559+ }
1560+ parsePB (firstChunk , bucketCacheMaps );
1561+ }
1562+
14661563 /**
14671564 * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
14681565 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
0 commit comments