diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 1634b13ec7e8..d564ff087d43 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -60,9 +61,13 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ZNodePaths znodePaths; + private final boolean consumeMasterProxyPort; + ZKConnectionRegistry(Configuration conf) { this.znodePaths = new ZNodePaths(conf); this.zk = new ReadOnlyZKClient(conf); + consumeMasterProxyPort = conf.getBoolean(HConstants.CONSUME_MASTER_PROXY_PORT, + HConstants.CONSUME_MASTER_PROXY_PORT_DEFAULT); } private interface Converter { @@ -229,8 +234,13 @@ public CompletableFuture getActiveMaster() { return null; } HBaseProtos.ServerName snProto = proto.getMaster(); - return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), - snProto.getStartCode()); + if (consumeMasterProxyPort && proto.hasProxyPort()) { + return ServerName.valueOf(snProto.getHostName(), proto.getProxyPort(), + snProto.getStartCode()); + } else { + return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), + snProto.getStartCode()); + } }), "ZKConnectionRegistry.getActiveMaster"); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f4d43a2da291..c42a2b7b2404 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1567,6 +1567,10 @@ public enum OperationStatusCode { */ public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; + public static final String CONSUME_MASTER_PROXY_PORT = + "hbase.client.consume.master.proxy.port"; + public static final boolean CONSUME_MASTER_PROXY_PORT_DEFAULT = false; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/zookeeper/ZooKeeper.proto b/hbase-protocol-shaded/src/main/protobuf/server/zookeeper/ZooKeeper.proto index 17fa31ffbe69..4f9af4b68859 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/zookeeper/ZooKeeper.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/zookeeper/ZooKeeper.proto @@ -55,6 +55,7 @@ message Master { // Major RPC version so that clients can know what version the master can accept. optional uint32 rpc_version = 2; optional uint32 info_port = 3; + optional uint32 proxy_port = 4; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index 9a03db6cc45b..f86453238b30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -97,7 +97,7 @@ CompletableFuture bulkLoad(TableName tableName, List> getLiveRegionServers(MasterAddressTracker masterAddrTracker, - int count); + int count, boolean consumeMasterProxyPort); /** * Get the bootstrap node list of another region server. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index 1dda6c32ca04..bcada10f5bfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -130,10 +130,14 @@ CleanupBulkLoadResponse, Void> call(controller, loc, stub, bulkToken, (rn, bt) - @Override public CompletableFuture> - getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) { + getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count, + boolean consumeMasterProxyPort) { CompletableFuture> future = new CompletableFuture<>(); + ServerName masterServerName = consumeMasterProxyPort ? + masterAddrTracker.getMasterAddressWithProxyPortIfAvailable(false) : + masterAddrTracker.getMasterAddress(); RegionServerStatusService.Interface stub = RegionServerStatusService - .newStub(rpcClient.createRpcChannel(masterAddrTracker.getMasterAddress(), user, rpcTimeout)); + .newStub(rpcClient.createRpcChannel(masterServerName, user, rpcTimeout)); HBaseRpcController controller = rpcControllerFactory.newController(); stub.getLiveRegionServers(controller, GetLiveRegionServersRequest.newBuilder().setCount(count).build(), resp -> { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index 50628f8717b7..c6765e3f7cf7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -67,6 +67,8 @@ public class ActiveMasterManager extends ZKListener { final ServerName sn; final Server master; + private final int masterProxyPort; + // Active master's server name. Invalidated anytime active master changes (based on ZK // notifications) and lazily fetched on-demand. // ServerName is immutable, so we don't need heavy synchronization around it. @@ -80,13 +82,14 @@ public class ActiveMasterManager extends ZKListener { * @param sn ServerName * @param master In an instance of a Master. */ - ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) + ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master, int masterProxyPort) throws InterruptedIOException { super(watcher); watcher.registerListener(this); this.sn = sn; this.master = master; updateBackupMasters(); + this.masterProxyPort = masterProxyPort; } // will be set after jetty server is started @@ -231,10 +234,8 @@ boolean blockUntilBecomingActiveMaster(int checkInterval, MonitoredTask startupS // Try to become the active master, watch if there is another master. // Write out our ServerName as versioned bytes. try { - if ( - MasterAddressTracker.setMasterAddress(this.watcher, - this.watcher.getZNodePaths().masterAddressZNode, this.sn, infoPort) - ) { + if (MasterAddressTracker.setMasterAddress(this.watcher, + this.watcher.getZNodePaths().masterAddressZNode, this.sn, infoPort, masterProxyPort)) { // If we were a backup master before, delete our ZNode from the backup // master directory since we are the active now) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2b818d9cc238..095695109721 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -462,6 +462,11 @@ public class HMaster extends HBaseServerBase implements Maste public static final String WARMUP_BEFORE_MOVE = "hbase.master.warmup.before.move"; private static final boolean DEFAULT_WARMUP_BEFORE_MOVE = true; + static final String MASTER_PROXY_PORT_EXPOSE = "hbase.master.expose.proxy.port"; + private static final int MASTER_PROXY_PORT_EXPOSE_DEFAULT = -1; + + private final int masterProxyPort; + /** * Initializes the HMaster. The steps are as follows: *

@@ -533,7 +538,9 @@ public HMaster(final Configuration conf) throws IOException { getChoreService().scheduleChore(clusterStatusPublisherChore); } } - this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this); + masterProxyPort = conf.getInt(MASTER_PROXY_PORT_EXPOSE, MASTER_PROXY_PORT_EXPOSE_DEFAULT); + this.activeMasterManager = + createActiveMasterManager(zooKeeper, serverName, this, masterProxyPort); cachedClusterId = new CachedClusterId(this, conf); this.regionServerTracker = new RegionServerTracker(zooKeeper, this); this.rpcServices.start(zooKeeper); @@ -554,8 +561,8 @@ public HMaster(final Configuration conf) throws IOException { * implementation. */ protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn, - org.apache.hadoop.hbase.Server server) throws InterruptedIOException { - return new ActiveMasterManager(zk, sn, server); + org.apache.hadoop.hbase.Server server, int proxyPort) throws InterruptedIOException { + return new ActiveMasterManager(zk, sn, server, proxyPort); } @Override @@ -2394,7 +2401,8 @@ private void startActiveMasterManager(int infoPort) throws KeeperException { * delete this node for us since it is ephemeral. */ LOG.info("Adding backup master ZNode " + backupZNode); - if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) { + if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort, + masterProxyPort)) { LOG.warn("Failed create of " + backupZNode + " by " + serverName); } this.activeMasterManager.setInfoPort(infoPort); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java index cbc635f6e977..6e97f39d29bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java @@ -107,7 +107,10 @@ public class BootstrapNodeManager { private long lastRequestMasterTime; - public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker) { + private final boolean consumeMasterProxyPort; + + public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker, + boolean consumeMasterProxyPort) { this.conn = conn; this.masterAddrTracker = masterAddrTracker; Configuration conf = conn.getConfiguration(); @@ -125,6 +128,7 @@ public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker ma .setTimeUnit(TimeUnit.SECONDS)); executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs), TimeUnit.SECONDS); + this.consumeMasterProxyPort = consumeMasterProxyPort; } private long getDelay(long delay) { @@ -136,8 +140,8 @@ private void getFromMaster() { List liveRegionServers; try { // get 2 times number of node - liveRegionServers = - FutureUtils.get(conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2)); + liveRegionServers = FutureUtils.get( + conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2, consumeMasterProxyPort)); } catch (IOException e) { LOG.warn("failed to get live region servers from master", e); if (retryCounter == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e79f4bec612a..1fc67d40499c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -470,6 +470,8 @@ public class HRegionServer extends HBaseServerBase private RegionReplicationBufferManager regionReplicationBufferManager; + private final boolean consumeMasterProxyPort; + /** * Starts a HRegionServer at the default location. *

@@ -487,6 +489,9 @@ public HRegionServer(final Configuration conf) throws IOException { checkCodecs(this.conf); FSUtils.setupShortCircuitRead(this.conf); + consumeMasterProxyPort = + this.conf.getBoolean(HConstants.CONSUME_MASTER_PROXY_PORT, HConstants.CONSUME_MASTER_PROXY_PORT_DEFAULT); + // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); // Config'ed params @@ -661,7 +666,8 @@ private void preRegistrationInitialization() { try (Scope ignored = span.makeCurrent()) { initializeZooKeeper(); setupClusterConnection(); - bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker); + bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker, + this.consumeMasterProxyPort); regionReplicationBufferManager = new RegionReplicationBufferManager(this); // Setup RPC client for master communication this.rpcClient = asyncClusterConnection.getRpcClient(); @@ -2446,7 +2452,9 @@ private synchronized ServerName createRegionServerStatusStub() { @InterfaceAudience.Private protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { if (rssStub != null) { - return masterAddressTracker.getMasterAddress(); + return this.consumeMasterProxyPort ? + masterAddressTracker.getMasterAddressWithProxyPortIfAvailable(false) : + masterAddressTracker.getMasterAddress(); } ServerName sn = null; long previousLogTime = 0; @@ -2455,7 +2463,9 @@ protected synchronized ServerName createRegionServerStatusStub(boolean refresh) boolean interrupted = false; try { while (keepLooping()) { - sn = this.masterAddressTracker.getMasterAddress(refresh); + sn = this.consumeMasterProxyPort ? + this.masterAddressTracker.getMasterAddressWithProxyPortIfAvailable(refresh) : + this.masterAddressTracker.getMasterAddress(refresh); if (sn == null) { if (!keepLooping()) { // give up with no connection. @@ -3518,7 +3528,9 @@ public long getRetryPauseTime() { @Override public Optional getActiveMaster() { - return Optional.ofNullable(masterAddressTracker.getMasterAddress()); + return Optional.ofNullable(this.consumeMasterProxyPort ? + masterAddressTracker.getMasterAddressWithProxyPortIfAvailable(false) : + masterAddressTracker.getMasterAddress()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java index cb54e6e72634..3f1e626c8124 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java @@ -155,7 +155,8 @@ public Connection toConnection() { @Override public CompletableFuture> - getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) { + getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count, + boolean consumeMasterProxyPort) { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java index 2bef48a79556..3956a6645be8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java @@ -45,9 +45,9 @@ public class AlwaysStandByHMaster extends HMaster { private static class AlwaysStandByMasterManager extends ActiveMasterManager { private static final Logger LOG = LoggerFactory.getLogger(AlwaysStandByMasterManager.class); - AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master) + AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master, int proxyPort) throws InterruptedIOException { - super(watcher, sn, master); + super(watcher, sn, master, proxyPort); } /** @@ -94,7 +94,7 @@ public AlwaysStandByHMaster(Configuration conf) throws IOException { } protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn, - org.apache.hadoop.hbase.Server server) throws InterruptedIOException { - return new AlwaysStandByMasterManager(zk, sn, server); + org.apache.hadoop.hbase.Server server, int proxyPort) throws InterruptedIOException { + return new AlwaysStandByMasterManager(zk, sn, server, proxyPort); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java index 60fb712daffe..86ec9b4d4dc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java @@ -220,7 +220,7 @@ public void testBackupMasterUpdates() throws Exception { String backupZn = ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupSn.toString()); backupZNodes.add(backupZn); - MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234); + MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234, -1); TEST_UTIL.waitFor(10000, () -> activeMasterManager.getBackupMasters().size() == backupZNodes.size()); } @@ -305,7 +305,7 @@ public DummyMaster(ZKWatcher zk, ServerName master) throws InterruptedIOExceptio this.clusterStatusTracker = new ClusterStatusTracker(zk, this); clusterStatusTracker.start(); - this.activeMasterManager = new ActiveMasterManager(zk, master, this); + this.activeMasterManager = new ActiveMasterManager(zk, master, this, -1); zk.registerListener(activeMasterManager); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterProxyPort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterProxyPort.java new file mode 100644 index 000000000000..040bf42f23db --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterProxyPort.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestMasterProxyPort { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMasterProxyPort.class); + + private static HBaseTestingUtil TEST_UTIL1; + private static HBaseTestingUtil TEST_UTIL2; + + @Before + public void setUp() throws Exception { + TEST_UTIL1 = new HBaseTestingUtil(); + TEST_UTIL2 = new HBaseTestingUtil(); + } + + @Test + public void testProxyPortPublishByMaster() throws Exception { + TEST_UTIL1.startMiniCluster(); + Optional activeMaster1 = + TEST_UTIL1.getMiniHBaseCluster().getRegionServer(0).getActiveMaster(); + Configuration conf = TEST_UTIL2.getConfiguration(); + conf.setInt(HMaster.MASTER_PROXY_PORT_EXPOSE, activeMaster1.get().getPort()); + TEST_UTIL2.startMiniCluster(); + Optional activeMaster2 = + TEST_UTIL2.getMiniHBaseCluster().getRegionServer(0).getActiveMaster(); + Assert.assertNotEquals(activeMaster1.get().getPort(), activeMaster2.get().getPort()); + TEST_UTIL1.shutdownMiniCluster(); + TEST_UTIL2.shutdownMiniCluster(); + } + + @Test + public void testProxyPortConsume() throws Exception { + TEST_UTIL1.startMiniCluster(); + Optional activeMaster1 = + TEST_UTIL1.getMiniHBaseCluster().getRegionServer(0).getActiveMaster(); + Configuration conf = TEST_UTIL2.getConfiguration(); + conf.setBoolean(HConstants.CONSUME_MASTER_PROXY_PORT, true); + TEST_UTIL2.startMiniCluster(); + Optional activeMaster2 = + TEST_UTIL2.getMiniHBaseCluster().getRegionServer(0).getActiveMaster(); + Assert.assertNotEquals(activeMaster1.get().getPort(), activeMaster2.get().getPort()); + TEST_UTIL1.shutdownMiniCluster(); + TEST_UTIL2.shutdownMiniCluster(); + } + + @Test + public void testProxyPortPublishAndConsume() throws Exception { + TEST_UTIL1.startMiniCluster(); + Optional activeMaster1 = + TEST_UTIL1.getMiniHBaseCluster().getRegionServer(0).getActiveMaster(); + Configuration conf = TEST_UTIL2.getConfiguration(); + conf.setInt(HMaster.MASTER_PROXY_PORT_EXPOSE, activeMaster1.get().getPort()); + conf.setBoolean(HConstants.CONSUME_MASTER_PROXY_PORT, true); + ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("testProxyPortPublishAndConsume-%d").setDaemon(true) + .build()); + executorService.submit(() -> TEST_UTIL2.startMiniCluster()); + for (int i = 0; i < 25; i++) { + Thread.sleep(1000); + // Cluster2 is not going to come up because Cluster2 RS is trying to connect to + // Cluster1 master port. + Assert.assertNull(TEST_UTIL2.getMiniHBaseCluster()); + } + TEST_UTIL2.shutdownMiniCluster(); + executorService.shutdown(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java index 359de3a3e627..e49873c7fcd2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; @@ -99,13 +100,13 @@ public void testNormal() throws Exception { ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()), ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()), ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime())); - when(conn.getLiveRegionServers(any(), anyInt())) + when(conn.getLiveRegionServers(any(), anyInt(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(regionServers)); when(conn.getAllBootstrapNodes(any())) .thenReturn(CompletableFuture.completedFuture(regionServers)); - manager = new BootstrapNodeManager(conn, tracker); + manager = new BootstrapNodeManager(conn, tracker, false); Thread.sleep(3000); - verify(conn, times(1)).getLiveRegionServers(any(), anyInt()); + verify(conn, times(1)).getLiveRegionServers(any(), anyInt(), anyBoolean()); verify(conn, atLeastOnce()).getAllBootstrapNodes(any()); assertListEquals(regionServers, manager.getBootstrapNodes()); } @@ -115,13 +116,13 @@ public void testNormal() throws Exception { public void testOnlyMaster() throws Exception { List regionServers = Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime())); - when(conn.getLiveRegionServers(any(), anyInt())) + when(conn.getLiveRegionServers(any(), anyInt(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(regionServers)); when(conn.getAllBootstrapNodes(any())) .thenReturn(CompletableFuture.completedFuture(regionServers)); - manager = new BootstrapNodeManager(conn, tracker); + manager = new BootstrapNodeManager(conn, tracker, false); Thread.sleep(3000); - verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt()); + verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt(), anyBoolean()); verify(conn, never()).getAllBootstrapNodes(any()); assertListEquals(regionServers, manager.getBootstrapNodes()); } @@ -136,7 +137,7 @@ public void testRegionServerError() throws Exception { List newRegionServers = Arrays.asList(ServerName.valueOf("server5", 12345, EnvironmentEdgeManager.currentTime()), ServerName.valueOf("server6", 12345, EnvironmentEdgeManager.currentTime())); - when(conn.getLiveRegionServers(any(), anyInt())) + when(conn.getLiveRegionServers(any(), anyInt(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(regionServers)); when(conn.getAllBootstrapNodes(any())).thenAnswer(invocation -> { if (invocation.getArgument(0, ServerName.class).getHostname().equals("server4")) { @@ -145,11 +146,11 @@ public void testRegionServerError() throws Exception { return CompletableFuture.completedFuture(regionServers.subList(0, 3)); } }); - manager = new BootstrapNodeManager(conn, tracker); + manager = new BootstrapNodeManager(conn, tracker, false); // we should remove server4 from the list Waiter.waitFor(conf, 30000, () -> manager.getBootstrapNodes().size() == 3); assertListEquals(regionServers.subList(0, 3), manager.getBootstrapNodes()); - when(conn.getLiveRegionServers(any(), anyInt())) + when(conn.getLiveRegionServers(any(), anyInt(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(newRegionServers)); doAnswer(invocation -> { String hostname = invocation.getArgument(0, ServerName.class).getHostname(); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java index 840ee2d215f6..0273eda350ce 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java @@ -156,6 +156,45 @@ public ServerName getMasterAddress(final boolean refresh) { } } + /** + * If master has exposed proxy port, return it with ServerName object. Else, return + * ServerName object with master's original binding port. + * + * @param refresh whether to refresh the data by calling ZK directly. + * @return Server name or null if timed out. + */ + public ServerName getMasterAddressWithProxyPortIfAvailable(final boolean refresh) { + try { + ServerName serverName = ProtobufUtil.parseServerNameFrom(super.getData(refresh)); + if (serverName == null) { + return null; + } + int masterProxyPort = getMasterProxyPortToConnect(); + if (masterProxyPort >= 0) { + return ServerName.valueOf(serverName.getHostname(), masterProxyPort, + serverName.getStartcode()); + } else { + return serverName; + } + } catch (DeserializationException e) { + LOG.warn("Failed parse master znode data", e); + return null; + } + } + + private int getMasterProxyPortToConnect() { + try { + final ZooKeeperProtos.Master master = parse(this.getData(false)); + if (master == null || !master.hasProxyPort()) { + return -1; + } + return master.getProxyPort(); + } catch (DeserializationException e) { + LOG.warn("Failed parse master zk node data", e); + return -1; + } + } + /** * Get master address. Use this instead of {@link #getMasterAddress()} if you do not have an * instance of this tracker in your context. @@ -251,16 +290,20 @@ public static int getBackupMasterInfoPort(ZKWatcher zkw, final ServerName sn) /** * Set master address into the master znode or into the backup subdirectory of backup * masters; switch off the passed in znode path. + * * @param zkw The ZKWatcher to use. * @param znode Where to create the znode; could be at the top level or it could be under backup * masters * @param master ServerName of the current master must not be null. + * @param infoPort Server info port. + * @param proxyPort Optional proxy port exposed by Server for clients to make connection to + * for security reasons if required. Value < 0 would be ignored. * @return true if node created, false if not; a watch is set in both cases * @throws KeeperException if a ZooKeeper operation fails */ public static boolean setMasterAddress(final ZKWatcher zkw, final String znode, - final ServerName master, int infoPort) throws KeeperException { - return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master, infoPort)); + final ServerName master, int infoPort, int proxyPort) throws KeeperException { + return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master, infoPort, proxyPort)); } /** @@ -272,10 +315,14 @@ public boolean hasMaster() { } /** + * Serialize Server info for znode. + * * @param sn must not be null + * @param infoPort Server info port. + * @param proxyPort Server proxy port. Valid value should be positive. * @return Content of the master znode as a serialized pb with the pb magic as prefix. */ - static byte[] toByteArray(final ServerName sn, int infoPort) { + static byte[] toByteArray(final ServerName sn, int infoPort, int proxyPort) { ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder(); HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder(); snbuilder.setHostName(sn.getHostname()); @@ -284,6 +331,9 @@ static byte[] toByteArray(final ServerName sn, int infoPort) { mbuilder.setMaster(snbuilder.build()); mbuilder.setRpcVersion(HConstants.RPC_CURRENT_VERSION); mbuilder.setInfoPort(infoPort); + if (proxyPort >= 0) { + mbuilder.setProxyPort(proxyPort); + } return ProtobufUtil.prependPBMagic(mbuilder.build().toByteArray()); } diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestMasterAddressTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestMasterAddressTracker.java index e743fa76a959..fe6c11dbf5e9 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestMasterAddressTracker.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestMasterAddressTracker.java @@ -114,7 +114,7 @@ private MasterAddressTracker setupMasterTracker(final ServerName sn, final int i if (sn != null) { LOG.info("Creating master node"); MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode, sn, - infoPort); + infoPort, -1); // Wait for the node to be created LOG.info("Waiting for master address manager to be notified"); @@ -192,8 +192,8 @@ public void testBackupMasters() throws Exception { String backupZNode2 = ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString()); // Add backup masters - MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222); - MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333); + MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222, -1); + MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333, -1); TEST_UTIL.waitFor(30000, () -> addressTracker.getBackupMasters().size() == 2); backupMasters = addressTracker.getBackupMasters(); assertEquals(2, backupMasters.size()); diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java index ac1c0bac68ca..5ca65463e455 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java @@ -321,12 +321,12 @@ public void testCleanZNode() throws Exception { assertNotNull(ZKUtil.getData(zkw, nodeName)); // Check that we don't delete if we're not supposed to - ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); + ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0, -1)); MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString()); assertNotNull(ZKUtil.getData(zkw, nodeName)); // Check that we delete when we're supposed to - ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); + ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0, -1)); MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); assertNull(ZKUtil.getData(zkw, nodeName));