Skip to content

Commit b74429a

Browse files
committed
HBASE-27999 Implement cache prefetch aware load balancer (#5527)
this commit is part of the rebase of HBASE-28186 Signed-off-by: Wellington Chevreuil <[email protected]> Signed-off-by: Tak Lon (Stephen) Wu <[email protected]> Co-authored-by: Rahul Agarkar <[email protected]> (cherry picked from commit e799ee0)
1 parent 27dbcc4 commit b74429a

File tree

16 files changed

+1464
-40
lines changed

16 files changed

+1464
-40
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLoad.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,16 @@ public CompactionState getCompactionState() {
389389
return metrics.getCompactionState();
390390
}
391391

392+
@Override
393+
public Size getRegionSizeMB() {
394+
return metrics.getRegionSizeMB();
395+
}
396+
397+
@Override
398+
public float getCurrentRegionCachedRatio() {
399+
return metrics.getCurrentRegionCachedRatio();
400+
}
401+
392402
/**
393403
* @see java.lang.Object#toString()
394404
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,11 @@ public List<ServerTask> getTasks() {
430430
return metrics.getTasks();
431431
}
432432

433+
@Override
434+
public Map<String, Integer> getRegionCachedInfo() {
435+
return metrics.getRegionCachedInfo();
436+
}
437+
433438
/**
434439
* Originally, this method factored in the effect of requests going to the server as well.
435440
* However, this does not interact very well with the current region rebalancing code, which only

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1428,6 +1428,18 @@ public enum OperationStatusCode {
14281428
*/
14291429
public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
14301430

1431+
/**
1432+
* If the chosen ioengine can persist its state across restarts, the path to the file to persist
1433+
* to. This file is NOT the data file. It is a file into which we will serialize the map of what
1434+
* is in the data file. For example, if you pass the following argument as
1435+
* BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
1436+
* <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
1437+
* <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
1438+
* is an in-memory map that needs to be persisted across restarts. Where to store this in-memory
1439+
* state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
1440+
*/
1441+
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path";
1442+
14311443
/**
14321444
* HConstants for fast fail on the client side follow
14331445
*/

hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ private ServerMetrics mockServerMetricsWithReadRequests(ServerName server,
8686
when(rl.getWriteRequestCount()).thenReturn(0L);
8787
when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
8888
when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
89+
when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
90+
when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
8991
regionLoadMap.put(info.getRegionName(), rl);
9092
}
9193
when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.io.hfile;
1919

2020
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
21+
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
2122
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
2223

2324
import java.io.IOException;
@@ -47,18 +48,6 @@ public final class BlockCacheFactory {
4748
public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy";
4849
public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU";
4950

50-
/**
51-
* If the chosen ioengine can persist its state across restarts, the path to the file to persist
52-
* to. This file is NOT the data file. It is a file into which we will serialize the map of what
53-
* is in the data file. For example, if you pass the following argument as
54-
* BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
55-
* <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
56-
* <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
57-
* is an in-memory map that needs to be persisted across restarts. Where to store this in-memory
58-
* state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
59-
*/
60-
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path";
61-
6251
public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
6352

6453
public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength";

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
337337
fullyCachedFiles.clear();
338338
backingMapValidated.set(true);
339339
bucketAllocator = new BucketAllocator(capacity, bucketSizes);
340+
regionCachedSizeMap.clear();
340341
}
341342
} else {
342343
bucketAllocator = new BucketAllocator(capacity, bucketSizes);
@@ -1503,6 +1504,7 @@ private void disableCache() {
15031504
this.backingMap.clear();
15041505
this.blocksByHFile.clear();
15051506
this.fullyCachedFiles.clear();
1507+
this.regionCachedSizeMap.clear();
15061508
}
15071509
}
15081510

@@ -2024,9 +2026,9 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
20242026
final MutableInt count = new MutableInt();
20252027
LOG.debug("iterating over {} entries in the backing map", backingMap.size());
20262028
backingMap.entrySet().stream().forEach(entry -> {
2027-
if (entry.getKey().getHfileName().equals(fileName)) {
2029+
if (entry.getKey().getHfileName().equals(fileName.getName())) {
20282030
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
2029-
fileName, entry.getKey().getOffset());
2031+
fileName.getName(), entry.getKey().getOffset());
20302032
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
20312033
lock.readLock().lock();
20322034
locks.add(lock);
@@ -2037,23 +2039,23 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
20372039
});
20382040
// We may either place only data blocks on the BucketCache or all type of blocks
20392041
if (dataBlockCount == count.getValue() || totalBlockCount == count.getValue()) {
2040-
LOG.debug("File {} has now been fully cached.", fileName);
2042+
LOG.debug("File {} has now been fully cached.", fileName.getName());
20412043
fileCacheCompleted(fileName, size);
20422044
} else {
20432045
LOG.debug(
20442046
"Prefetch executor completed for {}, but only {} blocks were cached. "
20452047
+ "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
2046-
fileName, count.getValue(), dataBlockCount);
2048+
fileName.getName(), count.getValue(), dataBlockCount);
20472049
if (ramCache.hasBlocksForFile(fileName.getName())) {
20482050
LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
2049-
+ "and try the verification again.", fileName);
2051+
+ "and try the verification again.", fileName.getName());
20502052
Thread.sleep(100);
20512053
notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
20522054
} else {
20532055
LOG.info(
20542056
"We found only {} blocks cached from a total of {} for file {}, "
20552057
+ "but no blocks pending caching. Maybe cache is full?",
2056-
count, dataBlockCount, fileName);
2058+
count, dataBlockCount, fileName.getName());
20572059
}
20582060
}
20592061
} catch (InterruptedException e) {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.security.MessageDigest;
2323
import java.security.NoSuchAlgorithmException;
24+
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
2425
import org.apache.hadoop.hbase.util.Bytes;
2526
import org.apache.hadoop.util.Shell;
2627
import org.apache.yetus.audience.InterfaceAudience;
@@ -91,7 +92,8 @@ protected byte[] calculateChecksum(String algorithm) {
9192
private static long getFileSize(String filePath) throws IOException {
9293
DU.setExecCommand(filePath);
9394
DU.execute();
94-
return Long.parseLong(DU.getOutput().split("\t")[0]);
95+
String size = DU.getOutput().split("\t")[0];
96+
return StringUtils.isEmpty(size.trim()) ? 0 : Long.parseLong(size);
9597
}
9698

9799
private static class DuFileCommand extends Shell.ShellCommandExecutor {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java

Lines changed: 154 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
3535
import org.apache.hadoop.hbase.master.RackManager;
3636
import org.apache.hadoop.hbase.net.Address;
37+
import org.apache.hadoop.hbase.util.Pair;
3738
import org.apache.yetus.audience.InterfaceAudience;
3839
import org.slf4j.Logger;
3940
import org.slf4j.LoggerFactory;
@@ -114,6 +115,12 @@ class BalancerClusterState {
114115
private float[][] rackLocalities;
115116
// Maps localityType -> region -> [server|rack]Index with highest locality
116117
private int[][] regionsToMostLocalEntities;
118+
// Maps (regionIndex, serverIndex) pair -> cache ratio of that region on that server
119+
private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
120+
// Maps regionIndex -> serverIndex with best region cache ratio
121+
private int[] regionServerIndexWithBestRegionCachedRatio;
122+
// Maps region encoded name -> (old ServerName, cache ratio of the region on that old server)
123+
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
117124

118125
static class DefaultRackManager extends RackManager {
119126
@Override
@@ -125,13 +132,20 @@ public String getRack(ServerName server) {
125132
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
126133
Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
127134
RackManager rackManager) {
128-
this(null, clusterState, loads, regionFinder, rackManager);
135+
this(null, clusterState, loads, regionFinder, rackManager, null);
136+
}
137+
138+
protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
139+
Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
140+
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
141+
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
129142
}
130143

131144
@SuppressWarnings("unchecked")
132145
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
133146
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
134-
RegionLocationFinder regionFinder, RackManager rackManager) {
147+
RegionLocationFinder regionFinder, RackManager rackManager,
148+
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
135149
if (unassignedRegions == null) {
136150
unassignedRegions = Collections.emptyList();
137151
}
@@ -145,6 +159,8 @@ public String getRack(ServerName server) {
145159
tables = new ArrayList<>();
146160
this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
147161

162+
this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
163+
148164
numRegions = 0;
149165

150166
List<List<Integer>> serversPerHostList = new ArrayList<>();
@@ -527,6 +543,142 @@ private void computeCachedLocalities() {
527543

528544
}
529545

546+
/**
547+
* Returns the size of hFiles (in MB) from the most recent RegionLoad for the region, or 0 if none
548+
*/
549+
public int getTotalRegionHFileSizeMB(int region) {
550+
Deque<BalancerRegionLoad> load = regionLoads[region];
551+
if (load == null) {
552+
// This means that the region has no actual data on disk
553+
return 0;
554+
}
555+
return regionLoads[region].getLast().getRegionSizeMB();
556+
}
557+
558+
/**
559+
* Returns the cache ratio of a region on the given region server, weighted by the region size in MB
560+
*/
561+
public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
562+
return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
563+
}
564+
565+
/**
566+
* Returns the amount by which a region is cached on a given region server. If the region is not
567+
* currently hosted on the given region server, then find out if it was previously hosted there
568+
* and return the old cache ratio.
569+
*/
570+
protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
571+
float regionCacheRatio = 0.0f;
572+
573+
// Get the current region cache ratio if the region is hosted on the server regionServerIndex
574+
for (int regionIndex : regionsPerServer[regionServerIndex]) {
575+
if (region != regionIndex) {
576+
continue;
577+
}
578+
579+
Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
580+
581+
// The region is currently hosted on this region server. Get the region cache ratio for this
582+
// region on this server
583+
regionCacheRatio =
584+
regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
585+
586+
return regionCacheRatio;
587+
}
588+
589+
// Region is not currently hosted on this server. Check if the region was cached on this
590+
// server earlier. This can happen when the server was shutdown and the cache was persisted.
591+
// Search using the region name and server name and not the index id and server id as these ids
592+
// may change when a server is marked as dead or a new server is added.
593+
String regionEncodedName = regions[region].getEncodedName();
594+
ServerName serverName = servers[regionServerIndex];
595+
if (
596+
regionCacheRatioOnOldServerMap != null
597+
&& regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
598+
) {
599+
Pair<ServerName, Float> cacheRatioOfRegionOnServer =
600+
regionCacheRatioOnOldServerMap.get(regionEncodedName);
601+
if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
602+
regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
603+
if (LOG.isDebugEnabled()) {
604+
LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
605+
serverName, regionCacheRatio);
606+
}
607+
}
608+
}
609+
return regionCacheRatio;
610+
}
611+
612+
/**
613+
* Populate the maps containing information about how much a region is cached on a region server.
614+
*/
615+
private void computeRegionServerRegionCacheRatio() {
616+
regionIndexServerIndexRegionCachedRatio = new HashMap<>();
617+
regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
618+
619+
for (int region = 0; region < numRegions; region++) {
620+
float bestRegionCacheRatio = 0.0f;
621+
int serverWithBestRegionCacheRatio = 0;
622+
for (int server = 0; server < numServers; server++) {
623+
float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
624+
if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
625+
// A cache ratio of 0 on a server carries no information, so record the ratio only when it
626+
// is greater than 0 or when the server is the one currently hosting the region.
627+
Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
628+
regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
629+
}
630+
if (regionCacheRatio > bestRegionCacheRatio) {
631+
serverWithBestRegionCacheRatio = server;
632+
// This server has a strictly higher cache ratio than every server checked so far, so it
633+
// becomes the best candidate. Equal ratios are handled by the else-if branch.
634+
bestRegionCacheRatio = regionCacheRatio;
635+
} else if (
636+
regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
637+
) {
638+
// If two servers have the same region cache ratio, then the server currently
639+
// hosting the region
640+
// should retain the region
641+
serverWithBestRegionCacheRatio = server;
642+
}
643+
}
644+
regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
645+
Pair<Integer, Integer> regionServerPair =
646+
new Pair<>(region, regionIndexToServerIndex[region]);
647+
float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
648+
if (tempRegionCacheRatio > bestRegionCacheRatio) {
649+
LOG.warn(
650+
"INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
651+
+ "best region cache ratio {} on server {}",
652+
regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
653+
tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
654+
}
655+
}
656+
}
657+
658+
protected float getOrComputeRegionCacheRatio(int region, int server) {
659+
if (
660+
regionServerIndexWithBestRegionCachedRatio == null
661+
|| regionIndexServerIndexRegionCachedRatio.isEmpty()
662+
) {
663+
computeRegionServerRegionCacheRatio();
664+
}
665+
666+
Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
667+
return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
668+
? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
669+
: 0.0f;
670+
}
671+
672+
public int[] getOrComputeServerWithBestRegionCachedRatio() {
673+
if (
674+
regionServerIndexWithBestRegionCachedRatio == null
675+
|| regionIndexServerIndexRegionCachedRatio.isEmpty()
676+
) {
677+
computeRegionServerRegionCacheRatio();
678+
}
679+
return regionServerIndexWithBestRegionCachedRatio;
680+
}
681+
530682
/**
531683
* Maps region index to rack index
532684
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,16 @@ class BalancerRegionLoad {
3333
private final long writeRequestsCount;
3434
private final int memStoreSizeMB;
3535
private final int storefileSizeMB;
36+
private final int regionSizeMB;
37+
private final float currentRegionPrefetchRatio;
3638

3739
BalancerRegionLoad(RegionMetrics regionMetrics) {
3840
readRequestsCount = regionMetrics.getReadRequestCount();
3941
writeRequestsCount = regionMetrics.getWriteRequestCount();
4042
memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
4143
storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
44+
regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
45+
currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
4246
}
4347

4448
public long getReadRequestsCount() {
@@ -56,4 +60,12 @@ public int getMemStoreSizeMB() {
5660
public int getStorefileSizeMB() {
5761
return storefileSizeMB;
5862
}
63+
64+
public int getRegionSizeMB() {
65+
return regionSizeMB;
66+
}
67+
68+
public float getCurrentRegionCacheRatio() {
69+
return currentRegionPrefetchRatio;
70+
}
5971
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
365365
clusterState.put(server, Collections.emptyList());
366366
}
367367
}
368-
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
368+
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
369+
null);
369370
}
370371

371372
private List<ServerName> findIdleServers(List<ServerName> servers) {

0 commit comments

Comments
 (0)