diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 564f43324ccd..f0ea993a41ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -19,28 +19,25 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.ReservoirSample; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.AuthFailedException; -import org.apache.zookeeper.KeeperException.ConnectionLossException; -import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,12 +53,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); - private ZKWatcher zkw = null; - private final Object zkwLock = new Object(); - protected Configuration conf; - private AsyncClusterConnection conn; + private final Object connLock = new Object(); + + private volatile AsyncClusterConnection conn; /** * Default maximum number of times a replication sink can be reported as bad before it will no @@ -106,36 +102,15 @@ public void init(Context context) throws IOException { this.badReportCounts = Maps.newHashMap(); } - protected void disconnect() { - synchronized (zkwLock) { - if (zkw != null) { - zkw.close(); - } - } - if (this.conn != null) { - try { - this.conn.close(); - this.conn = null; - } catch (IOException e) { - LOG.warn("{} Failed to close the connection", ctx.getPeerId()); - } - } - } - - /** - * A private method used to re-establish a zookeeper session with a peer cluster. - */ - private void reconnect(KeeperException ke) { - if ( - ke instanceof ConnectionLossException || ke instanceof SessionExpiredException - || ke instanceof AuthFailedException - ) { - String clusterKey = ctx.getPeerConfig().getClusterKey(); - LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke); - try { - reloadZkWatcher(); - } catch (IOException io) { - LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io); + private void disconnect() { + synchronized (connLock) { + if (this.conn != null) { + try { + this.conn.close(); + this.conn = null; + } catch (IOException e) { + LOG.warn("{} Failed to close the connection", ctx.getPeerId()); + } } } } @@ -152,13 +127,7 @@ public void stop() { @Override protected void doStart() { - try { - reloadZkWatcher(); - connectPeerCluster(); - notifyStarted(); - } catch (IOException e) { - notifyFailed(e); - } + notifyStarted(); } @Override @@ -168,44 +137,40 @@ protected void doStop() { } @Override - // Synchronize peer cluster connection attempts to avoid races and rate - // limit connections when multiple replication sources try to connect to - // the peer cluster. If the peer cluster is down we can get out of control - // over time. public UUID getPeerUUID() { - UUID peerUUID = null; try { - synchronized (zkwLock) { - peerUUID = ZKClusterId.getUUIDForCluster(zkw); - } - } catch (KeeperException ke) { - reconnect(ke); + AsyncClusterConnection conn = connect(); + String clusterId = FutureUtils + .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID))) + .getClusterId(); + return UUID.fromString(clusterId); + } catch (IOException e) { + LOG.warn("Failed to get cluster id for cluster", e); + return null; } - return peerUUID; } - /** - * Closes the current ZKW (if not null) and creates a new one - * @throws IOException If anything goes wrong connecting - */ - private void reloadZkWatcher() throws IOException { - synchronized (zkwLock) { - if (zkw != null) { - zkw.close(); - } - zkw = - new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); - zkw.registerListener(new PeerRegionServerListener(this)); + // do not call this method in doStart method, only initialize the connection to remote cluster + // when you actually wants to make use of it. The problem here is that, starting the replication + // endpoint is part of the region server initialization work, so if the peer cluster is fully + // down and we can not connect to it, we will cause the initialization to fail and crash the + // region server, as we need the cluster id while setting up the AsyncClusterConnection, which + // needs to at least connect to zookeeper or some other servers in the peer cluster based on + // different connection registry implementation + private AsyncClusterConnection connect() throws IOException { + AsyncClusterConnection c = this.conn; + if (c != null) { + return c; } - } - - private void connectPeerCluster() throws IOException { - try { - conn = createConnection(this.conf); - } catch (IOException ioe) { - LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe); - throw ioe; + synchronized (connLock) { + c = this.conn; + if (c != null) { + return c; + } + c = createConnection(this.conf); + conn = c; } + return c; } @Override @@ -224,36 +189,27 @@ public boolean isAborted() { * Get the list of all the region servers from the specified peer * @return list of region server addresses or an empty list if the slave is unavailable */ - protected List fetchSlavesAddresses() { - List children = null; + // will be overrided in tests so protected + protected Collection fetchPeerAddresses() { try { - synchronized (zkwLock) { - children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode); - } - } catch (KeeperException ke) { - if (LOG.isDebugEnabled()) { - LOG.debug("Fetch slaves addresses failed", ke); - } - reconnect(ke); - } - if (children == null) { + return FutureUtils.get(connect().getAdmin().getRegionServers(true)); + } catch (IOException e) { + LOG.debug("Fetch peer addresses failed", e); return Collections.emptyList(); } - List addresses = new ArrayList<>(children.size()); - for (String child : children) { - addresses.add(ServerName.parseServerName(child)); - } - return addresses; } protected synchronized void chooseSinks() { - List slaveAddresses = fetchSlavesAddresses(); + Collection slaveAddresses = fetchPeerAddresses(); if (slaveAddresses.isEmpty()) { LOG.warn("No sinks available at peer. Will not be able to replicate"); + this.sinkServers = Collections.emptyList(); + } else { + int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); + ReservoirSample sample = new ReservoirSample<>(numSinks); + sample.add(slaveAddresses.iterator()); + this.sinkServers = sample.getSamplingResult(); } - Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); - int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); - this.sinkServers = slaveAddresses.subList(0, numSinks); badReportCounts.clear(); } @@ -275,7 +231,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException { } ServerName serverName = sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); - return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName)); } /** @@ -307,29 +263,6 @@ List getSinkServers() { return sinkServers; } - /** - * Tracks changes to the list of region servers in a peer's cluster. - */ - public static class PeerRegionServerListener extends ZKListener { - - private final HBaseReplicationEndpoint replicationEndpoint; - private final String regionServerListNode; - - public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) { - super(endpoint.zkw); - this.replicationEndpoint = endpoint; - this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode; - } - - @Override - public synchronized void nodeChildrenChanged(String path) { - if (path.equals(regionServerListNode)) { - LOG.info("Detected change to peer region servers, fetching updated list"); - replicationEndpoint.chooseSinks(); - } - } - } - /** * Wraps a replication region server sink to provide the ability to identify it. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index d895920a51a8..6bdc97732644 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -106,7 +106,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean isSerial = false; // Initialising as 0 to guarantee at least one logging message private long lastSinkFetchTime = 0; - private volatile boolean stopping = false; @Override public void init(Context context) throws IOException { @@ -449,7 +448,7 @@ public boolean replicate(ReplicateContext replicateContext) { } List> batches = createBatches(replicateContext.getEntries()); - while (this.isRunning() && !this.stopping) { + while (this.isRunning()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; @@ -514,14 +513,6 @@ protected boolean isPeerEnabled() { return ctx.getReplicationPeer().isPeerEnabled(); } - @Override - protected void doStop() { - // Allow currently running replication tasks to finish - this.stopping = true; - disconnect(); // don't call super.doStop() - notifyStopped(); - } - protected CompletableFuture replicateEntries(List entries, int batchIndex, int timeout) { int entriesHashCode = System.identityHashCode(entries); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 7e1df9d415aa..95adc8a365cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -166,6 +167,9 @@ public void testReportBadSinkDownToZeroSinks() { ServerName serverNameA = endpoint.getSinkServers().get(0); ServerName serverNameB = endpoint.getSinkServers().get(1); + serverNames.remove(serverNameA); + serverNames.remove(serverNameB); + SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); @@ -191,7 +195,7 @@ public void setRegionServers(List regionServers) { } @Override - public List fetchSlavesAddresses() { + protected Collection fetchPeerAddresses() { return regionServers; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 27477527277f..1429c3277371 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -25,12 +25,14 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -369,6 +371,14 @@ protected static void runSmallBatchTest() throws IOException, InterruptedExcepti waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); } + protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException { + List rses = util.getMiniHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList()); + for (ServerName rs : rses) { + util.getMiniHBaseCluster().stopRegionServer(rs); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { if (htable2 != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java index 161e3c848f78..de19d0f5f4a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java @@ -44,7 +44,11 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli @Test public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception { - UTIL2.shutdownMiniHBaseCluster(); + // stop all region servers, we need to keep the master up as the below assertions need to get + // cluster id from remote cluster, if master is also down, we can not get any information from + // the remote cluster after source cluster restarts + stopAllRegionServers(UTIL2); + // add some values to cluster 1 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java index 92688cb2575a..c9ef613a21f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java @@ -45,7 +45,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe @Test public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception { - UTIL2.shutdownMiniHBaseCluster(); + // stop all region servers, we need to keep the master up as the below assertions need to get + // cluster id from remote cluster, if master is also down, we can not get any information from + // the remote cluster after source cluster restarts + stopAllRegionServers(UTIL2); restartSourceCluster(1); Admin hbaseAdmin = UTIL1.getAdmin(); // add some values to source cluster diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java index 018bfb98c6e5..b3e52e858a7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java @@ -42,7 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe @Test public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception { - UTIL2.shutdownMiniHBaseCluster(); + // stop all region servers, we need to keep the master up as the below assertions need to get + // cluster id from remote cluster, if master is also down, we can not get any information from + // the remote cluster after source cluster restarts + stopAllRegionServers(UTIL2); restartSourceCluster(1); Admin hbaseAdmin = UTIL1.getAdmin(); ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java index 3b097cff970f..269fa1b38c70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java @@ -46,7 +46,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery @Test public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception { - UTIL2.shutdownMiniHBaseCluster(); + // stop all region servers, we need to keep the master up as the below assertions need to get + // cluster id from remote cluster, if master is also down, we can not get any information from + // the remote cluster after source cluster restarts + stopAllRegionServers(UTIL2); // add some values to cluster 1 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i));