Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -60,9 +61,13 @@ class ZKConnectionRegistry implements ConnectionRegistry {

private final ZNodePaths znodePaths;

private final boolean consumeMasterProxyPort;

ZKConnectionRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
consumeMasterProxyPort = conf.getBoolean(HConstants.CONSUME_MASTER_PROXY_PORT,
HConstants.CONSUME_MASTER_PROXY_PORT_DEFAULT);
}

private interface Converter<T> {
Expand Down Expand Up @@ -229,8 +234,13 @@ public CompletableFuture<ServerName> getActiveMaster() {
return null;
}
HBaseProtos.ServerName snProto = proto.getMaster();
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode());
if (consumeMasterProxyPort && proto.hasProxyPort()) {
return ServerName.valueOf(snProto.getHostName(), proto.getProxyPort(),
snProto.getStartCode());
} else {
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode());
}
}),
"ZKConnectionRegistry.getActiveMaster");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,10 @@ public enum OperationStatusCode {
*/
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

public static final String CONSUME_MASTER_PROXY_PORT =
"hbase.client.consume.master.proxy.port";
public static final boolean CONSUME_MASTER_PROXY_PORT_DEFAULT = false;

private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message Master {
// Major RPC version so that clients can know what version the master can accept.
optional uint32 rpc_version = 2;
optional uint32 info_port = 3;
optional uint32 proxy_port = 4;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], Strin
* Get live region servers from masters.
*/
CompletableFuture<List<ServerName>> getLiveRegionServers(MasterAddressTracker masterAddrTracker,
int count);
int count, boolean consumeMasterProxyPort);

/**
* Get the bootstrap node list of another region server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ CleanupBulkLoadResponse, Void> call(controller, loc, stub, bulkToken, (rn, bt) -

@Override
public CompletableFuture<List<ServerName>>
getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) {
getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count,
boolean consumeMasterProxyPort) {
CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
ServerName masterServerName = consumeMasterProxyPort ?
masterAddrTracker.getMasterAddressWithProxyPortIfAvailable(false) :
masterAddrTracker.getMasterAddress();
RegionServerStatusService.Interface stub = RegionServerStatusService
.newStub(rpcClient.createRpcChannel(masterAddrTracker.getMasterAddress(), user, rpcTimeout));
.newStub(rpcClient.createRpcChannel(masterServerName, user, rpcTimeout));
HBaseRpcController controller = rpcControllerFactory.newController();
stub.getLiveRegionServers(controller,
GetLiveRegionServersRequest.newBuilder().setCount(count).build(), resp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class ActiveMasterManager extends ZKListener {
final ServerName sn;
final Server master;

private final int masterProxyPort;

// Active master's server name. Invalidated anytime active master changes (based on ZK
// notifications) and lazily fetched on-demand.
// ServerName is immutable, so we don't need heavy synchronization around it.
Expand All @@ -80,13 +82,14 @@ public class ActiveMasterManager extends ZKListener {
* @param sn ServerName
* @param master In an instance of a Master.
*/
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master)
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master, int masterProxyPort)
throws InterruptedIOException {
super(watcher);
watcher.registerListener(this);
this.sn = sn;
this.master = master;
updateBackupMasters();
this.masterProxyPort = masterProxyPort;
}

// will be set after jetty server is started
Expand Down Expand Up @@ -231,10 +234,8 @@ boolean blockUntilBecomingActiveMaster(int checkInterval, MonitoredTask startupS
// Try to become the active master, watch if there is another master.
// Write out our ServerName as versioned bytes.
try {
if (
MasterAddressTracker.setMasterAddress(this.watcher,
this.watcher.getZNodePaths().masterAddressZNode, this.sn, infoPort)
) {
if (MasterAddressTracker.setMasterAddress(this.watcher,
this.watcher.getZNodePaths().masterAddressZNode, this.sn, infoPort, masterProxyPort)) {

// If we were a backup master before, delete our ZNode from the backup
// master directory since we are the active now)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
public static final String WARMUP_BEFORE_MOVE = "hbase.master.warmup.before.move";
private static final boolean DEFAULT_WARMUP_BEFORE_MOVE = true;

static final String MASTER_PROXY_PORT_EXPOSE = "hbase.master.expose.proxy.port";
private static final int MASTER_PROXY_PORT_EXPOSE_DEFAULT = -1;

private final int masterProxyPort;

/**
* Initializes the HMaster. The steps are as follows:
* <p>
Expand Down Expand Up @@ -533,7 +538,9 @@ public HMaster(final Configuration conf) throws IOException {
getChoreService().scheduleChore(clusterStatusPublisherChore);
}
}
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
masterProxyPort = conf.getInt(MASTER_PROXY_PORT_EXPOSE, MASTER_PROXY_PORT_EXPOSE_DEFAULT);
this.activeMasterManager =
createActiveMasterManager(zooKeeper, serverName, this, masterProxyPort);
cachedClusterId = new CachedClusterId(this, conf);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
this.rpcServices.start(zooKeeper);
Expand All @@ -554,8 +561,8 @@ public HMaster(final Configuration conf) throws IOException {
* implementation.
*/
protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
return new ActiveMasterManager(zk, sn, server);
org.apache.hadoop.hbase.Server server, int proxyPort) throws InterruptedIOException {
return new ActiveMasterManager(zk, sn, server, proxyPort);
}

@Override
Expand Down Expand Up @@ -2394,7 +2401,8 @@ private void startActiveMasterManager(int infoPort) throws KeeperException {
* delete this node for us since it is ephemeral.
*/
LOG.info("Adding backup master ZNode " + backupZNode);
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort,
masterProxyPort)) {
LOG.warn("Failed create of " + backupZNode + " by " + serverName);
}
this.activeMasterManager.setInfoPort(infoPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ public class BootstrapNodeManager {

private long lastRequestMasterTime;

public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker) {
private final boolean consumeMasterProxyPort;

public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker,
boolean consumeMasterProxyPort) {
this.conn = conn;
this.masterAddrTracker = masterAddrTracker;
Configuration conf = conn.getConfiguration();
Expand All @@ -125,6 +128,7 @@ public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker ma
.setTimeUnit(TimeUnit.SECONDS));
executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
TimeUnit.SECONDS);
this.consumeMasterProxyPort = consumeMasterProxyPort;
}

private long getDelay(long delay) {
Expand All @@ -136,8 +140,8 @@ private void getFromMaster() {
List<ServerName> liveRegionServers;
try {
// get 2 times number of node
liveRegionServers =
FutureUtils.get(conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2));
liveRegionServers = FutureUtils.get(
conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2, consumeMasterProxyPort));
} catch (IOException e) {
LOG.warn("failed to get live region servers from master", e);
if (retryCounter == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>

private RegionReplicationBufferManager regionReplicationBufferManager;

private final boolean consumeMasterProxyPort;

/**
* Starts a HRegionServer at the default location.
* <p/>
Expand All @@ -487,6 +489,9 @@ public HRegionServer(final Configuration conf) throws IOException {
checkCodecs(this.conf);
FSUtils.setupShortCircuitRead(this.conf);

consumeMasterProxyPort =
this.conf.getBoolean(HConstants.CONSUME_MASTER_PROXY_PORT, HConstants.CONSUME_MASTER_PROXY_PORT_DEFAULT);

// Disable usage of meta replicas in the regionserver
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
// Config'ed params
Expand Down Expand Up @@ -661,7 +666,8 @@ private void preRegistrationInitialization() {
try (Scope ignored = span.makeCurrent()) {
initializeZooKeeper();
setupClusterConnection();
bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker,
this.consumeMasterProxyPort);
regionReplicationBufferManager = new RegionReplicationBufferManager(this);
// Setup RPC client for master communication
this.rpcClient = asyncClusterConnection.getRpcClient();
Expand Down Expand Up @@ -2446,7 +2452,9 @@ private synchronized ServerName createRegionServerStatusStub() {
@InterfaceAudience.Private
protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
if (rssStub != null) {
return masterAddressTracker.getMasterAddress();
return this.consumeMasterProxyPort ?
masterAddressTracker.getMasterAddressWithProxyPortIfAvailable(false) :
masterAddressTracker.getMasterAddress();
}
ServerName sn = null;
long previousLogTime = 0;
Expand All @@ -2455,7 +2463,9 @@ protected synchronized ServerName createRegionServerStatusStub(boolean refresh)
boolean interrupted = false;
try {
while (keepLooping()) {
sn = this.masterAddressTracker.getMasterAddress(refresh);
sn = this.consumeMasterProxyPort ?
this.masterAddressTracker.getMasterAddressWithProxyPortIfAvailable(refresh) :
this.masterAddressTracker.getMasterAddress(refresh);
if (sn == null) {
if (!keepLooping()) {
// give up with no connection.
Expand Down Expand Up @@ -3518,7 +3528,9 @@ public long getRetryPauseTime() {

@Override
public Optional<ServerName> getActiveMaster() {
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
return Optional.ofNullable(this.consumeMasterProxyPort ?
masterAddressTracker.getMasterAddressWithProxyPortIfAvailable(false) :
masterAddressTracker.getMasterAddress());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public Connection toConnection() {

@Override
public CompletableFuture<List<ServerName>>
getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) {
getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count,
boolean consumeMasterProxyPort) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class AlwaysStandByHMaster extends HMaster {
private static class AlwaysStandByMasterManager extends ActiveMasterManager {
private static final Logger LOG = LoggerFactory.getLogger(AlwaysStandByMasterManager.class);

AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master)
AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master, int proxyPort)
throws InterruptedIOException {
super(watcher, sn, master);
super(watcher, sn, master, proxyPort);
}

/**
Expand Down Expand Up @@ -94,7 +94,7 @@ public AlwaysStandByHMaster(Configuration conf) throws IOException {
}

protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
return new AlwaysStandByMasterManager(zk, sn, server);
org.apache.hadoop.hbase.Server server, int proxyPort) throws InterruptedIOException {
return new AlwaysStandByMasterManager(zk, sn, server, proxyPort);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void testBackupMasterUpdates() throws Exception {
String backupZn =
ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupSn.toString());
backupZNodes.add(backupZn);
MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234);
MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234, -1);
TEST_UTIL.waitFor(10000,
() -> activeMasterManager.getBackupMasters().size() == backupZNodes.size());
}
Expand Down Expand Up @@ -305,7 +305,7 @@ public DummyMaster(ZKWatcher zk, ServerName master) throws InterruptedIOExceptio
this.clusterStatusTracker = new ClusterStatusTracker(zk, this);
clusterStatusTracker.start();

this.activeMasterManager = new ActiveMasterManager(zk, master, this);
this.activeMasterManager = new ActiveMasterManager(zk, master, this, -1);
zk.registerListener(activeMasterManager);
}

Expand Down
Loading