Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hbase.rsgroup;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -70,7 +71,6 @@
public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);

private ClusterMetrics clusterStatus;
private MasterServices masterServices;
private volatile RSGroupInfoManager rsGroupInfoManager;
private LoadBalancer internalBalancer;
Expand All @@ -92,24 +92,29 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer() {}

// must be called after calling initialize
@Override
public void setClusterMetrics(ClusterMetrics sm) {
this.clusterStatus = sm;
if (internalBalancer != null) {
internalBalancer.setClusterMetrics(sm);
}
public synchronized void updateClusterMetrics(ClusterMetrics sm) {
assert internalBalancer != null;
internalBalancer.updateClusterMetrics(sm);
}

@Override
public void setMasterServices(MasterServices masterServices) {
this.masterServices = masterServices;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
this.rsGroupInfoManager = rsGroupInfoManager;
}

/**
* Balance by RSGroup.
*/
@Override
public List<RegionPlan> balanceCluster(
public synchronized List<RegionPlan> balanceCluster(
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
if (!isOnline()) {
throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
Expand Down Expand Up @@ -328,7 +333,7 @@ public void initialize() throws HBaseIOException {
throw new HBaseIOException(msg);
}
rsGroupInfoManager = cps.get(0).getGroupInfoManager();
if(rsGroupInfoManager == null){
if (rsGroupInfoManager == null) {
String msg = "RSGroupInfoManager hasn't been initialized";
LOG.error(msg);
throw new HBaseIOException(msg);
Expand All @@ -342,17 +347,14 @@ public void initialize() throws HBaseIOException {
Configuration conf = masterServices.getConfiguration();
// Create the balancer
Class<? extends LoadBalancer> balancerClass = conf.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
StochasticLoadBalancer.class, LoadBalancer.class);
StochasticLoadBalancer.class, LoadBalancer.class);
if (this.getClass().isAssignableFrom(balancerClass)) {
LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, " +
"falling back to the default LoadBalancer class");
"falling back to the default LoadBalancer class");
balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
}
internalBalancer = ReflectionUtils.newInstance(balancerClass);
internalBalancer.setMasterServices(masterServices);
if (clusterStatus != null) {
internalBalancer.setClusterMetrics(clusterStatus);
}
internalBalancer.initialize();
// init fallback groups
this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
Expand All @@ -379,7 +381,7 @@ public void regionOffline(RegionInfo regionInfo) {
}

@Override
public void onConfigurationChange(Configuration conf) {
public synchronized void onConfigurationChange(Configuration conf) {
boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
if (fallbackEnabled != newFallbackEnabled) {
LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY,
Expand All @@ -391,19 +393,16 @@ public void onConfigurationChange(Configuration conf) {

@Override
public void stop(String why) {
internalBalancer.stop(why);
}

@Override
public boolean isStopped() {
return false;
}

public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
this.rsGroupInfoManager = rsGroupInfoManager;
return internalBalancer.isStopped();
}

@Override
public void postMasterStartupInitialize() {
public synchronized void postMasterStartupInitialize() {
this.internalBalancer.postMasterStartupInitialize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -67,6 +68,7 @@ public static void beforeAllTests() throws Exception {
tableDescs = constructTableDesc(true);
conf.set("hbase.regions.slop", "0");
conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
loadBalancer = new RSGroupBasedLoadBalancer();
loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager());
loadBalancer.setMasterServices(getMockedMaster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -67,7 +68,8 @@ public static void beforeAllTests() throws Exception {
conf.set("hbase.regions.slop", "0");
conf.setFloat("hbase.master.balancer.stochastic.readRequestCost", 10000f);
conf.set("hbase.rsgroup.grouploadbalancer.class",
StochasticLoadBalancer.class.getCanonicalName());
StochasticLoadBalancer.class.getCanonicalName());
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
loadBalancer = new RSGroupBasedLoadBalancer();
loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager());
loadBalancer.setMasterServices(getMockedMaster());
Expand Down Expand Up @@ -113,7 +115,7 @@ public void testBalanceCluster() throws IOException {
serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0));
ClusterMetrics clusterStatus = mock(ClusterMetrics.class);
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.setClusterMetrics(clusterStatus);
loadBalancer.updateClusterMetrics(clusterStatus);

// ReadRequestCostFunction are Rate based, So doing setClusterMetrics again
// this time, regions on serverA with more readRequestCount load
Expand All @@ -128,7 +130,7 @@ public void testBalanceCluster() throws IOException {
serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0));
clusterStatus = mock(ClusterMetrics.class);
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.setClusterMetrics(clusterStatus);
loadBalancer.updateClusterMetrics(clusterStatus);

Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
Expand Down Expand Up @@ -68,14 +67,12 @@
public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);

private RackManager rackManager;
private FavoredNodesManager fnm;

@Override
public synchronized void initialize() throws HBaseIOException {
public void initialize() {
super.initialize();
this.fnm = services.getFavoredNodesManager();
this.rackManager = new RackManager(getConf());
}

@Override
Expand Down Expand Up @@ -316,7 +313,7 @@ private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFa
regionsOnServer.add(region);
}

public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
return this.fnm.getFavoredNodes(regionInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,8 +919,8 @@ private void finishActiveMasterInitialization(MonitoredTask status)

// initialize load balancer
this.balancer.setMasterServices(this);
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.initialize();
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

// start up all service threads.
status.setStatus("Initializing master service threads");
Expand Down Expand Up @@ -1003,7 +1003,7 @@ private void finishActiveMasterInitialization(MonitoredTask status)
}

// set cluster status again after user regions are assigned
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

// Start balancer and meta catalog janitor after meta and regions have been assigned.
status.setStatus("Starting balancer and catalog janitor");
Expand Down Expand Up @@ -1718,7 +1718,7 @@ public boolean balance(boolean force) throws IOException {
}

//Give the balancer the current cluster state.
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

List<RegionPlan> plans = this.balancer.balanceCluster(assignments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ public interface LoadBalancer extends Stoppable, ConfigurationObserver {
ServerName BOGUS_SERVER_NAME = ServerName.valueOf("localhost,1,1");

/**
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
* @param st
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
*/
void setClusterMetrics(ClusterMetrics st);
void updateClusterMetrics(ClusterMetrics metrics);

/**
* Set the master service.
Expand Down
Loading