Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,47 @@
@InterfaceAudience.Private
abstract class CostFromRegionLoadFunction extends CostFunction {

private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();

@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
private double computeCostForRegionServer(int regionServerIndex) {
// Cost this server has from RegionLoad
double cost = 0;

// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[regionServerIndex]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];

// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
cost += getRegionLoadCost(regionLoadList);
}
}
return cost;
}

@Override
protected final double cost() {
for (int i = 0; i < stats.length; i++) {
// Cost this server has from RegionLoad
double cost = 0;

// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[i]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];

// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
cost += getRegionLoadCost(regionLoadList);
}
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < costs.length; i++) {
costs[i] = computeCostForRegionServer(i);
}
});
}

// Add the total cost to the stats.
stats[i] = cost;
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
// recompute the stat for the given two region servers
cost.setCosts(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
});
}

// Now return the scaled cost from data held in the stats object.
return costFromArray(stats);
@Override
protected final double cost() {
return cost.cost();
}

protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,65 +81,14 @@ protected void regionMoved(int region, int oldServer, int newServer) {

protected abstract double cost();

@SuppressWarnings("checkstyle:linelength")
/**
* Function to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
* of the elements in one region server and the rest having 0.
* @param stats the costs
* @return a scaled set of costs.
*/
protected final double costFromArray(double[] stats) {
double totalCost = 0;
double total = getSum(stats);

double count = stats.length;
double mean = total / count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}

double scaled = scale(min, max, totalCost);
return scaled;
}

private double getSum(double[] stats) {
double total = 0;
for (double s : stats) {
total += s;
}
return total;
}

/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
protected final double scale(double min, double max, double value) {
protected static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.function.Consumer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A helper class to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
* of the elements in one region server and the rest having 0.
*/
@InterfaceAudience.Private
final class DoubleArrayCost {

private double[] costs;

// computeCost call is expensive so we use this flag to indicate whether we need to recalculate
// the cost by calling computeCost
private boolean costsChanged;

private double cost;

void prepare(int length) {
if (costs == null || costs.length != length) {
costs = new double[length];
}
}

void setCosts(Consumer<double[]> consumer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: having a setFoo method that is not a simple POJO field assignment is surprising!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any suggestion on the naming? fillCosts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not off the top of my head, no. This interface for mutability by an external actor is a little strange. Maybe applyCostsConsumer ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let file another issue to land this naming change.

consumer.accept(costs);
costsChanged = true;
}

double cost() {
if (costsChanged) {
cost = computeCost(costs);
costsChanged = false;
}
return cost;
}

private static double computeCost(double[] stats) {
double totalCost = 0;
double total = getSum(stats);

double count = stats.length;
double mean = total / count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}

double scaled = CostFunction.scale(min, max, totalCost);
return scaled;
}

private static double getSum(double[] stats) {
double total = 0;
for (double s : stats) {
total += s;
}
return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.primaryRegionCountCost";
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;

private final float primaryRegionCountCost;
private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();

PrimaryRegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
primaryRegionCountCost =
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST);
this.setMultiplier(primaryRegionCountCost);
this.setMultiplier(
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
}

private double computeCostForRegionServer(int regionServerIndex) {
int cost = 0;
for (int regionIdx : cluster.regionsPerServer[regionServerIndex]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
cost++;
}
}
return cost;
}

@Override
Expand All @@ -47,9 +55,20 @@ void prepare(BalancerClusterState cluster) {
if (!isNeeded()) {
return;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < costs.length; i++) {
costs[i] = computeCostForRegionServer(i);
}
});
}

@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.setCosts(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
});
}

@Override
Expand All @@ -59,15 +78,6 @@ boolean isNeeded() {

@Override
protected double cost() {
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
stats[i]++;
}
}
}

return costFromArray(stats);
return cost.cost();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;

private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();

RegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as it is the most general way to balance data.
Expand All @@ -44,9 +44,12 @@ class RegionCountSkewCostFunction extends CostFunction {
@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < cluster.numServers; i++) {
costs[i] = cluster.regionsPerServer[i].length;
}
});
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
Expand All @@ -59,9 +62,14 @@ void prepare(BalancerClusterState cluster) {

@Override
protected double cost() {
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
}
return costFromArray(stats);
return cost.cost();
}

@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.setCosts(costs -> {
costs[oldServer] = cluster.regionsPerServer[oldServer].length;
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}
}
Loading