Skip to content

Commit 475383b

Browse files
author
Ray Mattingly
committed
HBASE-28513 The StochasticLoadBalancer should support discrete evaluations
1 parent d9ff32a commit 475383b

20 files changed

+1911
-28
lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ enum Type {
2828
ASSIGN_REGION,
2929
MOVE_REGION,
3030
SWAP_REGIONS,
31+
MOVE_BATCH,
3132
NULL,
3233
}
3334

@@ -51,6 +52,10 @@ Type getType() {
5152
return type;
5253
}
5354

55+
long getStepCount() {
56+
return 1;
57+
}
58+
5459
@Override
5560
public String toString() {
5661
return type + ":";

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,26 @@
2626
import java.util.HashMap;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.concurrent.TimeUnit;
2931
import org.agrona.collections.Hashing;
3032
import org.agrona.collections.Int2IntCounterMap;
3133
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
3234
import org.apache.hadoop.hbase.ServerName;
3335
import org.apache.hadoop.hbase.client.RegionInfo;
3436
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
3537
import org.apache.hadoop.hbase.master.RackManager;
38+
import org.apache.hadoop.hbase.master.RegionPlan;
3639
import org.apache.hadoop.hbase.net.Address;
3740
import org.apache.hadoop.hbase.util.Pair;
3841
import org.apache.yetus.audience.InterfaceAudience;
3942
import org.slf4j.Logger;
4043
import org.slf4j.LoggerFactory;
4144

45+
import org.apache.hbase.thirdparty.com.google.common.base.Supplier;
46+
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
47+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
48+
4249
/**
4350
* An efficient array based implementation similar to ClusterState for keeping the status of the
4451
* cluster in terms of region assignment and distribution. LoadBalancers, such as
@@ -123,6 +130,14 @@ class BalancerClusterState {
123130
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
124131
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
125132

133+
private Supplier<List<Integer>> shuffledServerIndicesSupplier =
134+
Suppliers.memoizeWithExpiration(() -> {
135+
Collection<Integer> serverIndices = serversToIndex.values();
136+
List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
137+
Collections.shuffle(shuffledServerIndices);
138+
return shuffledServerIndices;
139+
}, 5, TimeUnit.SECONDS);
140+
126141
static class DefaultRackManager extends RackManager {
127142
@Override
128143
public String getRack(ServerName server) {
@@ -711,6 +726,44 @@ enum LocalityType {
711726
RACK
712727
}
713728

729+
public List<RegionPlan> convertActionToPlans(BalanceAction action) {
730+
switch (action.getType()) {
731+
case NULL:
732+
break;
733+
case ASSIGN_REGION:
734+
// FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
735+
assert action instanceof AssignRegionAction : action.getClass();
736+
AssignRegionAction ar = (AssignRegionAction) action;
737+
return ImmutableList
738+
.of(new RegionPlan(regions[ar.getRegion()], null, servers[ar.getServer()]));
739+
case MOVE_REGION:
740+
assert action instanceof MoveRegionAction : action.getClass();
741+
MoveRegionAction mra = (MoveRegionAction) action;
742+
return ImmutableList.of(new RegionPlan(regions[mra.getRegion()],
743+
servers[mra.getFromServer()], servers[mra.getToServer()]));
744+
case SWAP_REGIONS:
745+
assert action instanceof SwapRegionsAction : action.getClass();
746+
SwapRegionsAction a = (SwapRegionsAction) action;
747+
return ImmutableList.of(
748+
new RegionPlan(regions[a.getFromRegion()], servers[a.getFromServer()],
749+
servers[a.getToServer()]),
750+
new RegionPlan(regions[a.getToRegion()], servers[a.getToServer()],
751+
servers[a.getFromServer()]));
752+
case MOVE_BATCH:
753+
assert action instanceof MoveBatchAction : action.getClass();
754+
MoveBatchAction mba = (MoveBatchAction) action;
755+
List<RegionPlan> mbRegionPlans = new ArrayList<>();
756+
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
757+
mbRegionPlans.add(new RegionPlan(regions[moveRegionAction.getRegion()],
758+
servers[moveRegionAction.getFromServer()], servers[moveRegionAction.getToServer()]));
759+
}
760+
return mbRegionPlans;
761+
default:
762+
throw new RuntimeException("Unknown action:" + action.getType());
763+
}
764+
return Collections.emptyList();
765+
}
766+
714767
public void doAction(BalanceAction action) {
715768
switch (action.getType()) {
716769
case NULL:
@@ -742,8 +795,25 @@ public void doAction(BalanceAction action) {
742795
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
743796
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
744797
break;
798+
case MOVE_BATCH:
799+
assert action instanceof MoveBatchAction : action.getClass();
800+
MoveBatchAction mba = (MoveBatchAction) action;
801+
for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
802+
Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
803+
regionsPerServer[serverIndex] =
804+
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
805+
}
806+
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
807+
Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
808+
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
809+
}
810+
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
811+
regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(),
812+
moveRegionAction.getToServer());
813+
}
814+
break;
745815
default:
746-
throw new RuntimeException("Uknown action:" + action.getType());
816+
throw new RuntimeException("Unknown action:" + action.getType());
747817
}
748818
}
749819

@@ -905,6 +975,52 @@ int[] addRegion(int[] regions, int regionIndex) {
905975
return newRegions;
906976
}
907977

978+
int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
979+
// Calculate the size of the new regions array
980+
int newSize = regions.length - regionIndicesToRemove.size();
981+
if (newSize < 0) {
982+
throw new IllegalStateException(
983+
"Region indices mismatch: more regions to remove than in the regions array");
984+
}
985+
986+
int[] newRegions = new int[newSize];
987+
int newIndex = 0;
988+
989+
// Copy only the regions not in the removal set
990+
for (int region : regions) {
991+
if (!regionIndicesToRemove.contains(region)) {
992+
newRegions[newIndex++] = region;
993+
}
994+
}
995+
996+
// If the newIndex is smaller than newSize, some regions were missing from the input array
997+
if (newIndex != newSize) {
998+
throw new IllegalStateException("Region indices mismatch: some regions in the removal "
999+
+ "set were not found in the regions array");
1000+
}
1001+
1002+
return newRegions;
1003+
}
1004+
1005+
int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
1006+
int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];
1007+
1008+
// Copy the existing regions to the new array
1009+
System.arraycopy(regions, 0, newRegions, 0, regions.length);
1010+
1011+
// Add the new regions at the end of the array
1012+
int newIndex = regions.length;
1013+
for (int regionIndex : regionIndicesToAdd) {
1014+
newRegions[newIndex++] = regionIndex;
1015+
}
1016+
1017+
return newRegions;
1018+
}
1019+
1020+
List<Integer> getShuffledServerIndices() {
1021+
return shuffledServerIndicesSupplier.get();
1022+
}
1023+
9081024
int[] addRegionSorted(int[] regions, int regionIndex) {
9091025
int[] newRegions = new int[regions.length + 1];
9101026
int i = 0;
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.balancer;
19+
20+
import java.lang.reflect.Constructor;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Objects;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
import org.apache.hadoop.conf.Configurable;
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.hbase.master.RegionPlan;
30+
import org.apache.hadoop.hbase.util.ReflectionUtils;
31+
import org.apache.yetus.audience.InterfaceAudience;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
36+
37+
/**
38+
* Balancer conditionals supplement cost functions in the {@link StochasticLoadBalancer}. Cost
39+
* functions are insufficient and difficult to work with when making discrete decisions; this is
40+
* because they operate on a continuous scale, and each cost function's multiplier affects the
41+
* relative importance of every other cost function. So it is difficult to meaningfully and clearly
42+
* value many aspects of your region distribution via cost functions alone. Conditionals allow you
43+
* to very clearly define discrete rules that your balancer would ideally follow. To clarify, a
44+
* conditional violation will not block a region assignment because we would prefer to have uptime
45+
* than have perfectly intentional balance. But conditionals allow you to, for example, define that
46+
* a region's primary and secondary should not live on the same rack. Another example, conditionals
47+
* make it easy to define that system tables will ideally be isolated on their own RegionServer
48+
* (without needing to manage distinct RegionServer groups). Use of conditionals may cause an
49+
* extremely unbalanced cluster to exceed its max balancer runtime. This is necessary because
50+
* conditional candidate generation is quite expensive, and cutting it off early could prevent us
51+
* from finding a solution.
52+
*/
53+
@InterfaceAudience.Private
54+
final class BalancerConditionals implements Configurable {
55+
56+
private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionals.class);
57+
58+
static final BalancerConditionals INSTANCE = new BalancerConditionals();
59+
public static final String DISTRIBUTE_REPLICAS_KEY =
60+
"hbase.master.balancer.stochastic.conditionals.distributeReplicas";
61+
public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false;
62+
63+
public static final String ADDITIONAL_CONDITIONALS_KEY =
64+
"hbase.master.balancer.stochastic.additionalConditionals";
65+
66+
private Set<Class<? extends RegionPlanConditional>> conditionalClasses = Collections.emptySet();
67+
private Set<RegionPlanConditional> conditionals = Collections.emptySet();
68+
private Configuration conf;
69+
70+
private BalancerConditionals() {
71+
}
72+
73+
boolean shouldRunBalancer(BalancerClusterState cluster) {
74+
return isConditionalBalancingEnabled() && conditionals.stream()
75+
.map(RegionPlanConditional::getCandidateGenerators).flatMap(Collection::stream)
76+
.map(generator -> generator.getWeight(cluster)).anyMatch(weight -> weight > 0);
77+
}
78+
79+
Set<Class<? extends RegionPlanConditional>> getConditionalClasses() {
80+
return Set.copyOf(conditionalClasses);
81+
}
82+
83+
Collection<RegionPlanConditional> getConditionals() {
84+
return conditionals;
85+
}
86+
87+
boolean isReplicaDistributionEnabled() {
88+
return conditionalClasses.contains(DistributeReplicasConditional.class);
89+
}
90+
91+
boolean shouldSkipSloppyServerEvaluation() {
92+
return isConditionalBalancingEnabled();
93+
}
94+
95+
boolean isConditionalBalancingEnabled() {
96+
return !conditionalClasses.isEmpty();
97+
}
98+
99+
void clearConditionalWeightCaches() {
100+
conditionals.stream().map(RegionPlanConditional::getCandidateGenerators)
101+
.flatMap(Collection::stream)
102+
.forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache);
103+
}
104+
105+
void loadClusterState(BalancerClusterState cluster) {
106+
conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, conf, cluster))
107+
.filter(Objects::nonNull).collect(Collectors.toSet());
108+
}
109+
110+
/**
111+
* Indicates whether the action is good for our conditional compliance.
112+
* @param cluster The cluster state
113+
* @param action The proposed action
114+
* @return -1 if conditionals improve, 0 if neutral, 1 if conditionals degrade
115+
*/
116+
int getViolationCountChange(BalancerClusterState cluster, BalanceAction action) {
117+
boolean isViolatingPre = isViolating(cluster, action.undoAction());
118+
boolean isViolatingPost = isViolating(cluster, action);
119+
if (isViolatingPre && isViolatingPost) {
120+
return 0;
121+
} else if (!isViolatingPre && isViolatingPost) {
122+
return 1;
123+
} else {
124+
return -1;
125+
}
126+
}
127+
128+
/**
129+
* Check if the proposed action violates conditionals
130+
* @param cluster The cluster state
131+
* @param action The proposed action
132+
*/
133+
boolean isViolating(BalancerClusterState cluster, BalanceAction action) {
134+
conditionals.forEach(conditional -> conditional.refreshClusterState(cluster));
135+
if (conditionals.isEmpty()) {
136+
return false;
137+
}
138+
List<RegionPlan> regionPlans = cluster.convertActionToPlans(action);
139+
for (RegionPlan regionPlan : regionPlans) {
140+
if (isViolating(regionPlan)) {
141+
return true;
142+
}
143+
}
144+
return false;
145+
}
146+
147+
private boolean isViolating(RegionPlan regionPlan) {
148+
for (RegionPlanConditional conditional : conditionals) {
149+
if (conditional.isViolating(regionPlan)) {
150+
return true;
151+
}
152+
}
153+
return false;
154+
}
155+
156+
private RegionPlanConditional createConditional(Class<? extends RegionPlanConditional> clazz,
157+
Configuration conf, BalancerClusterState cluster) {
158+
if (conf == null) {
159+
conf = new Configuration();
160+
}
161+
if (cluster == null) {
162+
cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null);
163+
}
164+
try {
165+
Constructor<? extends RegionPlanConditional> ctor =
166+
clazz.getDeclaredConstructor(Configuration.class, BalancerClusterState.class);
167+
return ReflectionUtils.instantiate(clazz.getName(), ctor, conf, cluster);
168+
} catch (NoSuchMethodException e) {
169+
LOG.warn("Cannot find constructor with Configuration and "
170+
+ "BalancerClusterState parameters for class '{}': {}", clazz.getName(), e.getMessage());
171+
}
172+
return null;
173+
}
174+
175+
@Override
176+
public void setConf(Configuration conf) {
177+
this.conf = conf;
178+
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
179+
ImmutableSet.builder();
180+
181+
boolean distributeReplicas =
182+
conf.getBoolean(DISTRIBUTE_REPLICAS_KEY, DISTRIBUTE_REPLICAS_DEFAULT);
183+
if (distributeReplicas) {
184+
conditionalClasses.add(DistributeReplicasConditional.class);
185+
}
186+
187+
Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
188+
for (Class<?> clazz : classes) {
189+
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
190+
LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
191+
continue;
192+
}
193+
conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
194+
}
195+
this.conditionalClasses = conditionalClasses.build();
196+
loadClusterState(null);
197+
}
198+
199+
@Override
200+
public Configuration getConf() {
201+
return conf;
202+
}
203+
}

0 commit comments

Comments
 (0)