@@ -179,6 +179,7 @@ public String getRack(ServerName server) {
serversPerHostList.get(hostIndex).add(serverIndex);

String rack = this.rackManager.getRack(sn);

if (!racksToIndex.containsKey(rack)) {
racksToIndex.put(rack, numRacks++);
serversPerRackList.add(new ArrayList<>());
@@ -187,6 +188,7 @@ public String getRack(ServerName server) {
serversPerRackList.get(rackIndex).add(serverIndex);
}

LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
// Count how many regions there are.
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
@@ -285,6 +287,7 @@ public String getRack(ServerName server) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
serversPerHost[i][j] = serversPerHostList.get(i).get(j);
LOG.debug("server {} is on host {}",serversPerHostList.get(i).get(j), i);
}
if (serversPerHost[i].length > 1) {
multiServersPerHost = true;
@@ -295,6 +298,7 @@ public String getRack(ServerName server) {
serversPerRack[i] = new int[serversPerRackList.get(i).size()];
for (int j = 0; j < serversPerRack[i].length; j++) {
serversPerRack[i][j] = serversPerRackList.get(i).get(j);
LOG.info("server {} is on rack {}",serversPerRackList.get(i).get(j), i);
}
}

@@ -792,6 +796,10 @@ boolean contains(int[] arr, int val) {

private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);

public Comparator<Integer> getNumRegionsComparator() {
return numRegionsComparator;
}

int getLowestLocalityRegionOnServer(int serverIndex) {
if (regionFinder != null) {
float lowestLocality = 1.0f;
@@ -18,6 +18,7 @@

package org.apache.hadoop.hbase.master.balancer;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
@@ -34,27 +35,53 @@ BalanceAction generate(BalancerClusterState cluster) {
private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;

int index = 0;
while (servers[index] == null || servers[index] == thisServer) {
index++;
if (index == servers.length) {
return -1;
int selectedIndex = -1;
double currentLargestRandom = -1;
for (int i = 0; i < servers.length; i++) {
if (servers[i] == null || servers[i] == thisServer) {
continue;
}
if (selectedIndex != -1
&& cluster.getNumRegionsComparator().compare(servers[i], servers[selectedIndex]) != 0) {
// Exhausted servers of the same region count
break;
}
// We don't know how many servers have the same region count, so we randomly select one
// using a simplified inline reservoir sampling: assign a random number to each candidate
// and keep the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html)
double currentRandom = ThreadLocalRandom.current().nextDouble();
if (currentRandom > currentLargestRandom) {
selectedIndex = i;
currentLargestRandom = currentRandom;
}
}
return servers[index];
return selectedIndex == -1 ? -1 : servers[selectedIndex];
}

private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;

int index = servers.length - 1;
while (servers[index] == null || servers[index] == thisServer) {
index--;
if (index < 0) {
return -1;
int selectedIndex = -1;
double currentLargestRandom = -1;
for (int i = servers.length - 1; i >= 0; i--) {
if (servers[i] == null || servers[i] == thisServer) {
continue;
}
if (selectedIndex != -1 && cluster.getNumRegionsComparator().compare(servers[i],
servers[selectedIndex]) != 0) {
// Exhausted servers of the same region count
break;
}
// We don't know how many servers have the same region count, so we randomly select one
// using a simplified inline reservoir sampling: assign a random number to each candidate
// and keep the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html)
double currentRandom = ThreadLocalRandom.current().nextDouble();
if (currentRandom > currentLargestRandom) {
selectedIndex = i;
currentLargestRandom = currentRandom;
}
}
return servers[index];
return selectedIndex == -1 ? -1 : servers[selectedIndex];
}

}
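
Reviewer note: the tie-breaking shared by pickLeastLoadedServer and pickMostLoadedServer is easier to see in isolation. Below is a minimal, hypothetical sketch (the class and method names are illustrative only, not part of this PR) of the same simplified reservoir-sampling trick: each tied candidate draws an independent uniform random key and the candidate with the largest key wins, which selects uniformly even though the number of tied candidates is not known up front.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public final class TieBreakDemo {

  /**
   * Picks one element uniformly at random from the candidates without knowing
   * their count in advance: every candidate draws an independent uniform key
   * and the largest key wins (simplified inline reservoir sampling).
   */
  static int pickUniformly(List<Integer> candidates) {
    int selected = -1;
    double currentLargestRandom = -1;
    for (int candidate : candidates) {
      double currentRandom = ThreadLocalRandom.current().nextDouble();
      if (currentRandom > currentLargestRandom) {
        selected = candidate;
        currentLargestRandom = currentRandom;
      }
    }
    return selected; // -1 when candidates is empty
  }

  public static void main(String[] args) {
    // Three servers tied on region count; each is chosen roughly 1/3 of the time.
    List<Integer> tiedServers = Arrays.asList(4, 7, 9);
    System.out.println(pickUniformly(tiedServers));
  }
}
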
@@ -345,8 +345,6 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
}

if (idleRegionServerExist(cluster)){
LOG.info("Running balancer because at least one server hosts replicas of the same region." +
"regionReplicaRackCostFunction={}", regionReplicaRackCostFunction.cost());
LOG.info("Running balancer because cluster has idle server(s)."+
" function cost={}", functionCost());
return true;
@@ -510,9 +508,9 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
LOG.info("Finished computing new moving plan. Computation took {} ms" +
" to try {} different iterations. Found a solution that moves " +
"{} regions; Going from a computed imbalance of {}" +
" to a new imbalance of {}. ",
" to a new imbalance of {}. funtionCost={}",
endTime - startTime, step, plans.size(),
initCost / sumMultiplier, currentCost / sumMultiplier);
initCost / sumMultiplier, currentCost / sumMultiplier, functionCost());
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
return plans;
}
@@ -80,7 +80,7 @@ protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);

// Print out the cluster loads to make debugging easier.
LOG.info("Mock Balance : " + printMock(balancedCluster));
LOG.info("Mock after Balance : " + printMock(balancedCluster));

if (assertFullyBalanced) {
assertClusterAsBalanced(balancedCluster);
@@ -95,4 +95,40 @@ protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
}
}
}

protected void testWithClusterWithIteration(Map<ServerName, List<RegionInfo>> serverMap,
RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
List<ServerAndLoad> list = convertToList(serverMap);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));

loadBalancer.setRackManager(rackManager);
// Run the balancer.
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
assertNotNull("Initial cluster balance should produce plans.", plans);

List<ServerAndLoad> balancedCluster = null;
// Iterate until the balancer produces no more plans; otherwise the test would be killed by its timeout.
while (plans != null && (assertFullyBalanced || assertFullyBalancedForReplicas)) {
// Apply the plan to the mock cluster.
balancedCluster = reconcile(list, plans, serverMap);

// Print out the cluster loads to make debugging easier.
LOG.info("Mock after balance: " + printMock(balancedCluster));

LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
plans = loadBalancer.balanceCluster(LoadOfAllTable);
}

// Print out the cluster loads to make debugging easier.
LOG.info("Mock Final balance: " + printMock(balancedCluster));

if (assertFullyBalanced) {
assertNull("Given a requirement to be fully balanced, second attempt at plans should " +
"produce none.", plans);
}
if (assertFullyBalancedForReplicas) {
assertRegionReplicaPlacement(serverMap, rackManager);
}
}
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -37,25 +38,36 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
HBaseClassTestRule.forClass(TestStochasticLoadBalancerRegionReplicaWithRacks.class);

private static class ForTestRackManager extends RackManager {

int numRacks;
Map<String, Integer> serverIndexes = new HashMap<String, Integer>();
int numServers = 0;

public ForTestRackManager(int numRacks) {
this.numRacks = numRacks;
}


@Override
public String getRack(ServerName server) {
return "rack_" + (server.hashCode() % numRacks);
String key = server.getServerName();
if (!serverIndexes.containsKey(key)) {
serverIndexes.put(key, numServers++);
}
return "rack_" + serverIndexes.get(key) % numRacks;
}
}

@Test
public void testRegionReplicationOnMidClusterWithRacks() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
// for full balance
// conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.001f);
loadBalancer.onConfigurationChange(conf);
int numNodes = 4;
int numNodes = 5;
int numRegions = numNodes * 1;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 1;
@@ -65,6 +77,26 @@ public void testRegionReplicationOnMidClusterWithRacks() {
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
RackManager rm = new ForTestRackManager(numRacks);

testWithCluster(serverMap, rm, false, true);
testWithClusterWithIteration(serverMap, rm, true, true);
}

@Test
public void testRegionReplicationOnLargeClusterWithRacks() {
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec
loadBalancer.onConfigurationChange(conf);
int numNodes = 100;
int numRegions = numNodes * 30;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 28;
int numTables = 1;
int numRacks = 4; // all replicas should be on a different rack
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
RackManager rm = new ForTestRackManager(numRacks);

testWithClusterWithIteration(serverMap, rm, true, true);
}
}
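
For reference, the reworked ForTestRackManager hands out racks round-robin in the order servers are first seen, wrapping around once every rack has been used; the old hashCode-based mapping could put an arbitrary number of servers on the same rack (and a negative hashCode even yields a negative rack suffix), which made rack-aware replica placement hard to assert on. A small hypothetical standalone driver (not part of this PR, and using plain strings instead of ServerName) showing the new behaviour:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public final class RackAssignmentDemo {

  public static void main(String[] args) {
    int numRacks = 4;
    Map<String, Integer> serverIndexes = new HashMap<>();
    int numServers = 0;
    // Mirrors ForTestRackManager.getRack(): first-seen order decides the rack,
    // wrapping around once every rack has been used.
    for (String server : Arrays.asList("server1", "server2", "server3", "server4", "server5")) {
      if (!serverIndexes.containsKey(server)) {
        serverIndexes.put(server, numServers++);
      }
      System.out.println(server + " -> rack_" + serverIndexes.get(server) % numRacks);
    }
    // Prints rack_0, rack_1, rack_2, rack_3, rack_0 in order.
  }
}
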