@@ -68,12 +68,13 @@ public synchronized void loadConf(Configuration configuration) {
}

@Override
protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
new HashMap<>(2);
candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
new CacheAwareSkewnessCandidateGenerator());
candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
new CacheAwareCandidateGenerator());
candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
return candidateGenerators;
}

@@ -409,8 +410,9 @@ protected void regionMoved(int region, int oldServer, int newServer) {
});
}

public final void updateWeight(double[] weights) {
weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
@Override
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
}
}

@@ -478,8 +480,8 @@ private int getServerWithBestCacheRatioForRegion(int region) {
}

@Override
public final void updateWeight(double[] weights) {
weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
}
}
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -91,8 +92,8 @@ protected void regionMoved(int region, int oldServer, int newServer) {
* Called once per init or after postAction.
* @param weights the weights for every generator.
*/
public void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost();
public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(RandomCandidateGenerator.class, cost(), Double::sum);
}

/**
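The updateWeight implementation above, like its overrides elsewhere in this diff, relies on Map.merge to accumulate each cost function's contribution under the generator class that benefits from it. A minimal standalone sketch of that accumulation pattern, using illustrative String keys in place of the real CandidateGenerator classes:

import java.util.HashMap;
import java.util.Map;

// Demonstrates the Map.merge accumulation used by updateWeight; keys and values are illustrative.
public class MergeWeightSketch {
  public static void main(String[] args) {
    Map<String, Double> weights = new HashMap<>();
    weights.put("LoadCandidateGenerator", 0.0);

    // Two cost functions that both benefit the load generator add their costs to one entry.
    weights.merge("LoadCandidateGenerator", 1.0, Double::sum); // 0.0 + 1.0
    weights.merge("LoadCandidateGenerator", 2.0, Double::sum); // 1.0 + 2.0

    // A key that is absent is simply inserted with the given value.
    weights.merge("RandomCandidateGenerator", 0.5, Double::sum);

    System.out.println(weights.get("LoadCandidateGenerator"));   // 3.0
    System.out.println(weights.get("RandomCandidateGenerator")); // 0.5
  }
}

Unlike the old fixed-size double[] indexed by enum ordinal, absent keys are handled gracefully, so subclasses can register weights for generators the base class does not know about.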
@@ -80,17 +80,20 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) {
}

@Override
protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> fnPickers = new ArrayList<>(2);
fnPickers.add(new FavoredNodeLoadPicker());
fnPickers.add(new FavoredNodeLocalityPicker());
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers = new HashMap<>(2);
fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
fnPickers.put(FavoredNodeLocalityPicker.class, new FavoredNodeLocalityPicker());
return fnPickers;
}

/** Returns any candidate generator in random */
@Override
protected CandidateGenerator getRandomGenerator() {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
Class<? extends CandidateGenerator> clazz = shuffledGeneratorClasses.get()
.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.get(clazz);
}

/**
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.yetus.audience.InterfaceAudience;
@@ -89,7 +90,7 @@ private double getWeightedLocality(int region, int entity) {
}

@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOCALITY.ordinal()] += cost();
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LocalityBasedCandidateGenerator.class, cost(), Double::sum);
}
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

@@ -61,7 +62,7 @@ protected void regionMoved(int region, int oldServer, int newServer) {
}

@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOAD.ordinal()] += cost();
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
}
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
@@ -73,8 +74,8 @@ protected double cost() {
}

@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RACK.ordinal()] += cost();
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(RegionReplicaRackCandidateGenerator.class, cost(), Double::sum);
}

/**
@@ -22,11 +22,14 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -48,6 +51,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;

/**
* <p>
* This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will randomly try and
@@ -147,7 +153,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private double curOverallCost = 0d;
private double[] tempFunctionCosts;
private double[] curFunctionCosts;
private double[] weightsOfGenerators;

// Keep locality based picker and cost function to alert them
// when new services are offered
@@ -157,14 +162,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

protected List<CandidateGenerator> candidateGenerators;

public enum GeneratorType {
RANDOM,
LOAD,
LOCALITY,
RACK
}
private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
new HashMap<>();
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
Suppliers.memoizeWithExpiration(() -> {
List<Class<? extends CandidateGenerator>> shuffled =
new ArrayList<>(candidateGenerators.keySet());
Collections.shuffle(shuffled);
return shuffled;
}, 5, TimeUnit.SECONDS);
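The Suppliers.memoizeWithExpiration call above caches a shuffled copy of the generator key set for five seconds, so the iteration order is re-randomized periodically without paying for a shuffle on every call, presumably so that ties in the weighted selection and the zero-weight fallback do not always favor the same map iteration order. A standalone sketch of the same pattern, using plain Guava rather than the hbase-thirdparty shaded copy and String keys instead of generator classes:

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Re-shuffles at most once per 5 seconds; callers in between see the same cached ordering.
public class MemoizedShuffleSketch {
  private static final List<String> KEYS = Arrays.asList("random", "load", "locality", "rack");

  private static final Supplier<List<String>> SHUFFLED = Suppliers.memoizeWithExpiration(() -> {
    List<String> shuffled = new ArrayList<>(KEYS);
    Collections.shuffle(shuffled);
    return shuffled;
  }, 5, TimeUnit.SECONDS);

  public static void main(String[] args) {
    System.out.println(SHUFFLED.get()); // e.g. [load, rack, random, locality]
    System.out.println(SHUFFLED.get()); // same ordering until the expiry window elapses
  }
}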

/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
@@ -213,16 +220,20 @@ private void loadCustomCostFunctions(Configuration conf) {

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
List<CandidateGenerator> getCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
return this.candidateGenerators;
}

protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
candidateGenerators.add(GeneratorType.RACK.ordinal(),
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
new HashMap<>(5);
candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
candidateGenerators.put(RegionReplicaCandidateGenerator.class,
new RegionReplicaCandidateGenerator());
candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
new RegionReplicaRackCandidateGenerator());
return candidateGenerators;
}
@@ -409,34 +420,54 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
BalanceAction nextAction(BalancerClusterState cluster) {
return getRandomGenerator().generate(cluster);
Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
CandidateGenerator generator = getRandomGenerator();
return Pair.newPair(generator, generator.generate(cluster));
}

/**
* Select the candidate generator to use based on the cost of cost functions. The chance of
* selecting a candidate generator is propotional to the share of cost of all cost functions among
* all cost functions that benefit from it.
* selecting a candidate generator is proportional to the share of cost of all cost functions
* among all cost functions that benefit from it.
*/
protected CandidateGenerator getRandomGenerator() {
double sum = 0;
for (int i = 0; i < weightsOfGenerators.length; i++) {
sum += weightsOfGenerators[i];
weightsOfGenerators[i] = sum;
}
if (sum == 0) {
return candidateGenerators.get(0);
Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate generators available.");
List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
List<Double> partialSums = new ArrayList<>(generatorClasses.size());
double sum = 0.0;
for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
sum += weight;
partialSums.add(sum);
}
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] /= sum;

// If the sum of all weights is zero, fall back to any generator
if (sum == 0.0) {
return pickAnyGenerator(generatorClasses);
}

double rand = ThreadLocalRandom.current().nextDouble();
for (int i = 0; i < weightsOfGenerators.length; i++) {
if (rand <= weightsOfGenerators[i]) {
return candidateGenerators.get(i);
// Normalize partial sums so that the last one should be exactly 1.0
for (int i = 0; i < partialSums.size(); i++) {
partialSums.set(i, partialSums.get(i) / sum);
}

// Generate a random number and pick the first generator whose partial sum is >= rand
for (int i = 0; i < partialSums.size(); i++) {
if (rand <= partialSums.get(i)) {
return candidateGenerators.get(generatorClasses.get(i));
}
}
return candidateGenerators.get(candidateGenerators.size() - 1);

// Fallback: if for some reason we didn't return above, return any generator
return pickAnyGenerator(generatorClasses);
}

private CandidateGenerator
pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
Class<? extends CandidateGenerator> randomClass =
generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.get(randomClass);
}
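Taken together, getRandomGenerator and pickAnyGenerator implement a weighted random pick over the generator classes: partial sums of the weights are normalized so the last bucket reaches 1.0, and a uniform random number selects the first bucket whose partial sum covers it. A self-contained sketch of the same technique, with String keys and hard-coded weights standing in for generator classes and cost-derived weights:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Weighted random selection via normalized partial sums, mirroring getRandomGenerator above.
public class WeightedPickSketch {
  static String pick(Map<String, Double> weights) {
    List<String> keys = new ArrayList<>(weights.keySet());
    List<Double> partialSums = new ArrayList<>(keys.size());
    double sum = 0.0;
    for (String key : keys) {
      sum += weights.getOrDefault(key, 0.0);
      partialSums.add(sum);
    }
    if (sum == 0.0) {
      // All weights are zero: fall back to a uniformly random key.
      return keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
    }
    double rand = ThreadLocalRandom.current().nextDouble();
    for (int i = 0; i < partialSums.size(); i++) {
      // Dividing by sum normalizes the partial sums so the last entry is exactly 1.0.
      if (rand <= partialSums.get(i) / sum) {
        return keys.get(i);
      }
    }
    return keys.get(keys.size() - 1); // defensive fallback; not normally reached
  }

  public static void main(String[] args) {
    Map<String, Double> weights = new LinkedHashMap<>();
    weights.put("load", 3.0);     // chosen roughly 75% of the time
    weights.put("locality", 1.0); // chosen roughly 25% of the time
    weights.put("random", 0.0);   // never chosen while its weight stays zero
    System.out.println(pick(weights));
  }
}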

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down Expand Up @@ -521,22 +552,27 @@ protected List<RegionPlan> balanceTable(TableName tableName,
final String initFunctionTotalCosts = totalCostsPerFunc();
// Perform a stochastic walk to see if we can get a good fit.
long step;

Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
for (step = 0; step < computedMaxSteps; step++) {
BalanceAction action = nextAction(cluster);
Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
CandidateGenerator generator = nextAction.getFirst();
BalanceAction action = nextAction.getSecond();

if (action.getType() == BalanceAction.Type.NULL) {
continue;
}

cluster.doAction(action);
updateCostsAndWeightsWithAction(cluster, action);
generatorToStepCount.merge(generator.getClass(), 1L, Long::sum);

newCost = computeCost(cluster, currentCost);

// Should this be kept?
if (newCost < currentCost) {
currentCost = newCost;
generatorToApprovedActionCount.merge(generator.getClass(), 1L, Long::sum);

// save for JMX
curOverallCost = currentCost;
@@ -555,6 +591,15 @@
}
long endTime = EnvironmentEdgeManager.currentTime();

StringJoiner joiner = new StringJoiner("\n");
joiner.add("CandidateGenerator activity summary:");
generatorToStepCount.forEach((generator, count) -> {
long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
approvals));
});
LOG.debug(joiner.toString());

metricsBalancer.balanceCluster(endTime - startTime);

if (initCost > currentCost) {
@@ -747,8 +792,10 @@ private void updateRegionLoad() {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(BalancerClusterState cluster) {
// Initialize the weights of generator every time
weightsOfGenerators = new double[this.candidateGenerators.size()];
weightsOfGenerators.clear();
for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
weightsOfGenerators.put(clazz, 0.0);
}
for (CostFunction c : costFunctions) {
c.prepare(cluster);
c.updateWeight(weightsOfGenerators);
@@ -762,8 +809,8 @@ void initCosts(BalancerClusterState cluster) {
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
// Reset all the weights to 0
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] = 0;
for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
weightsOfGenerators.put(clazz, 0.0);
}
for (CostFunction c : costFunctions) {
if (c.isNeeded()) {
@@ -488,7 +488,7 @@ public void testCostAfterUndoAction() {
loadBalancer.initCosts(cluster);
for (int i = 0; i != runs; ++i) {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
BalanceAction action = loadBalancer.nextAction(cluster);
BalanceAction action = loadBalancer.nextAction(cluster).getSecond();
cluster.doAction(action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
BalanceAction undoAction = action.undoAction();
@@ -52,7 +52,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer
*/
@Test
public void testBalanceCluster() throws Exception {
setMaxRunTime(Duration.ofMillis(1500));
setMaxRunTime(Duration.ofMillis(2500));
loadBalancer.onConfigurationChange(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
@@ -80,7 +80,7 @@ public void testRegionReplicationOnMidClusterWithRacks() {
public void testRegionReplicationOnLargeClusterWithRacks() {
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
setMaxRunTime(Duration.ofSeconds(5));
setMaxRunTime(Duration.ofSeconds(10));
loadBalancer.onConfigurationChange(conf);
int numNodes = 100;
int numRegions = numNodes * 30;