Skip to content

Commit b282161

Browse files
jhungundwchevreuil
authored andcommitted
HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 3e11464 commit b282161

3 files changed

Lines changed: 253 additions & 3 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,14 @@ void freeSpace(final String why) {
10751075
// the cached time is recored in nanos, so we need to convert the grace period accordingly
10761076
long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
10771077
long bytesFreed = 0;
1078+
// Check the list of files to determine the cold files which can be readily evicted.
1079+
Map<String, String> coldFiles = null;
1080+
try {
1081+
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
1082+
coldFiles = dataTieringManager.getColdFilesList();
1083+
} catch (IllegalStateException e) {
1084+
LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
1085+
}
10781086
// Scan entire map putting bucket entry into appropriate bucket entry
10791087
// group
10801088
for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
@@ -1107,6 +1115,17 @@ void freeSpace(final String why) {
11071115
}
11081116
}
11091117
}
1118+
1119+
if (bytesFreed < bytesToFreeWithExtra &&
1120+
coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
1121+
) {
1122+
int freedBlockSize = bucketEntryWithKey.getValue().getLength();
1123+
if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
1124+
bytesFreed += freedBlockSize;
1125+
}
1126+
continue;
1127+
}
1128+
11101129
switch (entry.getPriority()) {
11111130
case SINGLE: {
11121131
bucketSingle.add(bucketEntryWithKey);
@@ -1122,6 +1141,22 @@ void freeSpace(final String why) {
11221141
}
11231142
}
11241143
}
1144+
1145+
// Check if the cold file eviction is sufficient to create enough space.
1146+
bytesToFreeWithExtra -= bytesFreed;
1147+
if (bytesToFreeWithExtra <= 0) {
1148+
LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
1149+
StringUtils.byteDesc(bytesFreed));
1150+
return;
1151+
}
1152+
1153+
if (LOG.isDebugEnabled()) {
1154+
LOG.debug(
1155+
"Bucket cache free space completed; freed space : {} "
1156+
+ "bytes of cold data blocks. {} more bytes required to be freed.",
1157+
StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
1158+
}
1159+
11251160
PriorityQueue<BucketEntryGroup> bucketQueue =
11261161
new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
11271162

@@ -1130,7 +1165,6 @@ void freeSpace(final String why) {
11301165
bucketQueue.add(bucketMemory);
11311166

11321167
int remainingBuckets = bucketQueue.size();
1133-
11341168
BucketEntryGroup bucketGroup;
11351169
while ((bucketGroup = bucketQueue.poll()) != null) {
11361170
long overflow = bucketGroup.overflow();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
2121

2222
import java.io.IOException;
23+
import java.util.HashMap;
2324
import java.util.HashSet;
2425
import java.util.Map;
2526
import java.util.OptionalLong;
@@ -173,12 +174,12 @@ private boolean hotDataValidator(long maxTimestamp, long hotDataAge) {
173174
private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
174175
HStoreFile hStoreFile = getHStoreFile(hFilePath);
175176
if (hStoreFile == null) {
176-
LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
177+
LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
177178
return Long.MAX_VALUE;
178179
}
179180
OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
180181
if (!maxTimestamp.isPresent()) {
181-
LOG.error("Maximum timestamp not present for " + hFilePath);
182+
LOG.error("Maximum timestamp not present for {}", hFilePath);
182183
return Long.MAX_VALUE;
183184
}
184185
return maxTimestamp.getAsLong();
@@ -270,4 +271,41 @@ private long getDataTieringHotDataAge(Configuration conf) {
270271
return Long.parseLong(
271272
conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
272273
}
274+
275+
/*
276+
* This API traverses through the list of online regions and returns a subset of these files-names
277+
* that are cold.
278+
* @return List of names of files with cold data as per data-tiering logic.
279+
*/
280+
public Map<String, String> getColdFilesList() {
281+
Map<String, String> coldFiles = new HashMap<>();
282+
for (HRegion r : this.onlineRegions.values()) {
283+
for (HStore hStore : r.getStores()) {
284+
Configuration conf = hStore.getReadOnlyConfiguration();
285+
if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
286+
// Data-Tiering not enabled for the store. Just skip it.
287+
continue;
288+
}
289+
Long hotDataAge = getDataTieringHotDataAge(conf);
290+
291+
for (HStoreFile hStoreFile : hStore.getStorefiles()) {
292+
String hFileName =
293+
hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
294+
OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
295+
if (!maxTimestamp.isPresent()) {
296+
LOG.warn("maxTimestamp missing for file: {}",
297+
hStoreFile.getFileInfo().getActiveFileName());
298+
continue;
299+
}
300+
long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
301+
long fileAge = currentTimestamp - maxTimestamp.getAsLong();
302+
if (fileAge > hotDataAge) {
303+
// Values do not matter.
304+
coldFiles.put(hFileName, null);
305+
}
306+
}
307+
}
308+
}
309+
return coldFiles;
310+
}
273311
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.regionserver;
1919

2020
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
21+
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertTrue;
2324
import static org.junit.Assert.fail;
@@ -51,7 +52,9 @@
5152
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
5253
import org.apache.hadoop.hbase.io.hfile.BlockType;
5354
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
55+
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
5456
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
57+
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
5558
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
5659
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
5760
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -260,6 +263,181 @@ public void testColdDataFiles() {
260263
}
261264
}
262265

266+
@Test
267+
public void testPickColdDataFiles() {
268+
Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
269+
assertEquals(1, coldDataFiles.size());
270+
// hStoreFiles[3] is the cold file.
271+
assert (coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
272+
}
273+
274+
/*
275+
* Verify that two cold blocks(both) are evicted when bucket reaches its capacity. The hot file
276+
* remains in the cache.
277+
*/
278+
@Test
279+
public void testBlockEvictions() throws Exception {
280+
long capacitySize = 40 * 1024;
281+
int writeThreads = 3;
282+
int writerQLen = 64;
283+
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
284+
285+
// Setup: Create a bucket cache with lower capacity
286+
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
287+
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
288+
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
289+
290+
// Create three Cache keys with cold data files and a block with hot data.
291+
// hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
292+
Set<BlockCacheKey> cacheKeys = new HashSet<>();
293+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
294+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
295+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
296+
297+
// Create dummy data to be cached and fill the cache completely.
298+
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
299+
300+
int blocksIter = 0;
301+
for (BlockCacheKey key : cacheKeys) {
302+
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
303+
// Ensure that the block is persisted to the file.
304+
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
305+
}
306+
307+
// Verify that the bucket cache contains 3 blocks.
308+
assertEquals(3, bucketCache.getBackingMap().keySet().size());
309+
310+
// Add an additional block into cache with hot data which should trigger the eviction
311+
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
312+
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
313+
314+
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
315+
Waiter.waitFor(defaultConf, 10000, 100,
316+
() -> (bucketCache.getBackingMap().containsKey(newKey)));
317+
318+
// Verify that the bucket cache now contains 2 hot blocks blocks only.
319+
// Both cold blocks of 8KB will be evicted to make room for 1 block of 8KB + an additional
320+
// space.
321+
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
322+
}
323+
324+
/*
325+
* Verify that two cold blocks(both) are evicted when bucket reaches its capacity, but one cold
326+
* block remains in the cache since the required space is freed.
327+
*/
328+
@Test
329+
public void testBlockEvictionsAllColdBlocks() throws Exception {
330+
long capacitySize = 40 * 1024;
331+
int writeThreads = 3;
332+
int writerQLen = 64;
333+
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
334+
335+
// Setup: Create a bucket cache with lower capacity
336+
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
337+
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
338+
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
339+
340+
// Create three Cache keys with three cold data blocks.
341+
// hStoreFiles.get(3) is a cold data file.
342+
Set<BlockCacheKey> cacheKeys = new HashSet<>();
343+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
344+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
345+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));
346+
347+
// Create dummy data to be cached and fill the cache completely.
348+
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
349+
350+
int blocksIter = 0;
351+
for (BlockCacheKey key : cacheKeys) {
352+
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
353+
// Ensure that the block is persisted to the file.
354+
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
355+
}
356+
357+
// Verify that the bucket cache contains 3 blocks.
358+
assertEquals(3, bucketCache.getBackingMap().keySet().size());
359+
360+
// Add an additional block into cache with hot data which should trigger the eviction
361+
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
362+
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
363+
364+
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
365+
Waiter.waitFor(defaultConf, 10000, 100,
366+
() -> (bucketCache.getBackingMap().containsKey(newKey)));
367+
368+
// Verify that the bucket cache now contains 1 cold block and a newly added hot block.
369+
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
370+
}
371+
372+
/*
373+
* Verify that a hot block evicted along with a cold block when bucket reaches its capacity.
374+
*/
375+
@Test
376+
public void testBlockEvictionsHotBlocks() throws Exception {
377+
long capacitySize = 40 * 1024;
378+
int writeThreads = 3;
379+
int writerQLen = 64;
380+
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
381+
382+
// Setup: Create a bucket cache with lower capacity
383+
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
384+
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
385+
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
386+
387+
// Create three Cache keys with two hot data blocks and one cold data block
388+
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
389+
Set<BlockCacheKey> cacheKeys = new HashSet<>();
390+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
391+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
392+
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
393+
394+
// Create dummy data to be cached and fill the cache completely.
395+
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
396+
397+
int blocksIter = 0;
398+
for (BlockCacheKey key : cacheKeys) {
399+
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
400+
// Ensure that the block is persisted to the file.
401+
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
402+
}
403+
404+
// Verify that the bucket cache contains 3 blocks.
405+
assertEquals(3, bucketCache.getBackingMap().keySet().size());
406+
407+
// Add an additional block which should evict the only cold block with an additional hot block.
408+
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
409+
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
410+
411+
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
412+
Waiter.waitFor(defaultConf, 10000, 100,
413+
() -> (bucketCache.getBackingMap().containsKey(newKey)));
414+
415+
// Verify that the bucket cache now contains 2 hot blocks.
416+
// Only one of the older hot blocks is retained and other one is the newly added hot block.
417+
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
418+
}
419+
420+
private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
421+
int expectedColdBlocks) {
422+
int numHotBlocks = 0, numColdBlocks = 0;
423+
424+
assertEquals(expectedTotalKeys, keys.size());
425+
int iter = 0;
426+
for (BlockCacheKey key : keys) {
427+
try {
428+
if (dataTieringManager.isHotData(key)) {
429+
numHotBlocks++;
430+
} else {
431+
numColdBlocks++;
432+
}
433+
} catch (Exception e) {
434+
fail("Unexpected exception!");
435+
}
436+
}
437+
assertEquals(expectedHotBlocks, numHotBlocks);
438+
assertEquals(expectedColdBlocks, numColdBlocks);
439+
}
440+
263441
private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
264442
boolean expectedResult, DataTieringException exception) {
265443
try {

0 commit comments

Comments
 (0)