Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,6 +115,12 @@ class BalancerClusterState {
private float[][] rackLocalities;
// Maps localityType -> region -> [server|rack]Index with highest locality
private int[][] regionsToMostLocalEntities;
// Maps region -> serverIndex -> prefetchRatio of a region on a server
private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexPrefetchRatio;
// Maps regionIndex -> serverIndex with best prefetch ratio
private int[] regionServerIndexWithBestPrefetchRatio;
// Maps regionName -> oldServerName -> oldPrefetchRatio
Map<String, Pair<ServerName, Float>> oldRegionServerPrefetchRatio;

static class DefaultRackManager extends RackManager {
@Override
Expand All @@ -125,13 +132,20 @@ public String getRack(ServerName server) {
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager);
this(null, clusterState, loads, regionFinder, rackManager, null);
}

protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerPrefetchRatio) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerPrefetchRatio);
}

@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
Map<String, Pair<ServerName, Float>> oldRegionServerPrefetchRatio) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
Expand All @@ -145,6 +159,8 @@ public String getRack(ServerName server) {
tables = new ArrayList<>();
this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

this.oldRegionServerPrefetchRatio = oldRegionServerPrefetchRatio;

numRegions = 0;

List<List<Integer>> serversPerHostList = new ArrayList<>();
Expand Down Expand Up @@ -541,6 +557,141 @@ private void computeCachedLocalities() {

}

/**
* Returns the size of hFiles from the most recent RegionLoad for region
*/
public int getTotalRegionHFileSizeMB(int region) {
Deque<BalancerRegionLoad> load = regionLoads[region];
if (load == null) {
// This means, that the region has no actual data on disk
return 0;
}
return regionLoads[region].getLast().getRegionSizeMB();
}

/**
* Returns the weighted prefetch ratio of a region on the given region server
*/
public float getOrComputeWeightedPrefetchRatio(int region, int server) {
return getTotalRegionHFileSizeMB(region) * getOrComputeRegionPrefetchRatio(region, server);
}

/**
* Returns the amount by which a region is prefetched on a given region server. If the region is
* not currently hosted on the given region server, then find out if it was previously hosted
* there and return the old prefetch ratio.
*/
protected float getRegionServerPrefetchRatio(int region, int regionServerIndex) {
float prefetchRatio = 0.0f;

// Get the current prefetch ratio if the region is hosted on the server regionServerIndex
for (int regionIndex : regionsPerServer[regionServerIndex]) {
if (region != regionIndex) {
continue;
}

Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];

// The region is currently hosted on this region server. Get the prefetch ratio for this
// region on this server
prefetchRatio =
regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionPrefetchRatio();

return prefetchRatio;
}

// Region is not currently hosted on this server. Check if the region was prefetched on this
// server earlier. This can happen when the server was shutdown and the cache was persisted.
// Search using the region name and server name and not the index id and server id as these ids
// may change when a server is marked as dead or a new server is added.
String regionEncodedName = regions[region].getEncodedName();
ServerName serverName = servers[regionServerIndex];
if (
oldRegionServerPrefetchRatio != null
&& oldRegionServerPrefetchRatio.containsKey(regionEncodedName)
) {
Pair<ServerName, Float> serverPrefetchRatio =
oldRegionServerPrefetchRatio.get(regionEncodedName);
if (ServerName.isSameAddress(serverPrefetchRatio.getFirst(), serverName)) {
prefetchRatio = serverPrefetchRatio.getSecond();
if (LOG.isDebugEnabled()) {
LOG.debug("Old prefetch ratio found for region {} on server {}: {}", regionEncodedName,
serverName, prefetchRatio);
}
}
}
return prefetchRatio;
}

/**
* Populate the maps containing information about how much a region is prefetched on a region
* server.
*/
private void computeRegionServerPrefetchRatio() {
regionIndexServerIndexPrefetchRatio = new HashMap<>();
regionServerIndexWithBestPrefetchRatio = new int[numRegions];

for (int region = 0; region < numRegions; region++) {
float bestPrefetchRatio = 0.0f;
int serverWithBestPrefetchRatio = 0;
for (int server = 0; server < numServers; server++) {
float prefetchRatio = getRegionServerPrefetchRatio(region, server);
if (prefetchRatio > 0.0f || server == regionIndexToServerIndex[region]) {
// A region with prefetch ratio 0 on a server means nothing. Hence, just make a note of
// prefetch only if the prefetch ratio is greater than 0.
Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
regionIndexServerIndexPrefetchRatio.put(regionServerPair, prefetchRatio);
}
if (prefetchRatio > bestPrefetchRatio) {
serverWithBestPrefetchRatio = server;
// If the server currently hosting the region has equal prefetch ratio to a historical
// server, consider the current server to keep hosting the region
bestPrefetchRatio = prefetchRatio;
} else
if (prefetchRatio == bestPrefetchRatio && server == regionIndexToServerIndex[region]) {
// If two servers have same prefetch ratio, then the server currently hosting the region
// should retain the region
serverWithBestPrefetchRatio = server;
}
}
regionServerIndexWithBestPrefetchRatio[region] = serverWithBestPrefetchRatio;
Pair<Integer, Integer> regionServerPair =
new Pair<>(region, regionIndexToServerIndex[region]);
float tempPrefetchRatio = regionIndexServerIndexPrefetchRatio.get(regionServerPair);
if (tempPrefetchRatio > bestPrefetchRatio) {
LOG.warn(
"INVALID CONDITION: region {} on server {} prefetch ratio {} is greater than the "
+ "best prefetch ratio {} on server {}",
regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
tempPrefetchRatio, bestPrefetchRatio, servers[serverWithBestPrefetchRatio]);
}
}
}

protected float getOrComputeRegionPrefetchRatio(int region, int server) {
if (
regionServerIndexWithBestPrefetchRatio == null
|| regionIndexServerIndexPrefetchRatio.isEmpty()
) {
computeRegionServerPrefetchRatio();
}

Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
return regionIndexServerIndexPrefetchRatio.containsKey(regionServerPair)
? regionIndexServerIndexPrefetchRatio.get(regionServerPair)
: 0.0f;
}

public int[] getOrComputeServerWithBestPrefetchRatio() {
if (
regionServerIndexWithBestPrefetchRatio == null
|| regionIndexServerIndexPrefetchRatio.isEmpty()
) {
computeRegionServerPrefetchRatio();
}
return regionServerIndexWithBestPrefetchRatio;
}

/**
* Maps region index to rack index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ class BalancerRegionLoad {
private final long writeRequestsCount;
private final int memStoreSizeMB;
private final int storefileSizeMB;
private final int regionSizeMB;
private final float currentRegionPrefetchRatio;

BalancerRegionLoad(RegionMetrics regionMetrics) {
readRequestsCount = regionMetrics.getReadRequestCount();
cpRequestsCount = regionMetrics.getCpRequestCount();
writeRequestsCount = regionMetrics.getWriteRequestCount();
memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
}

public long getReadRequestsCount() {
Expand All @@ -62,4 +66,12 @@ public int getMemStoreSizeMB() {
public int getStorefileSizeMB() {
return storefileSizeMB;
}

public int getRegionSizeMB() {
return regionSizeMB;
}

public float getCurrentRegionPrefetchRatio() {
return currentRegionPrefetchRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
null);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
Expand Down
Loading