@@ -19,28 +19,25 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -56,12 +53,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

private ZKWatcher zkw = null;
private final Object zkwLock = new Object();

protected Configuration conf;

private AsyncClusterConnection conn;
private final Object connLock = new Object();

private volatile AsyncClusterConnection conn;

/**
* Default maximum number of times a replication sink can be reported as bad before it will no
@@ -106,36 +102,15 @@ public void init(Context context) throws IOException {
this.badReportCounts = Maps.newHashMap();
}

protected void disconnect() {
synchronized (zkwLock) {
if (zkw != null) {
zkw.close();
}
}
if (this.conn != null) {
try {
this.conn.close();
this.conn = null;
} catch (IOException e) {
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
}
}
}

/**
* A private method used to re-establish a zookeeper session with a peer cluster.
*/
private void reconnect(KeeperException ke) {
if (
ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException
) {
String clusterKey = ctx.getPeerConfig().getClusterKey();
LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
try {
reloadZkWatcher();
} catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
private void disconnect() {
synchronized (connLock) {
if (this.conn != null) {
try {
this.conn.close();
this.conn = null;
} catch (IOException e) {
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
}
}
}
}
@@ -152,13 +127,7 @@ public void stop() {

@Override
protected void doStart() {
try {
reloadZkWatcher();
connectPeerCluster();
notifyStarted();
} catch (IOException e) {
notifyFailed(e);
}
notifyStarted();
}

@Override
@@ -168,44 +137,40 @@ protected void doStop() {
}

@Override
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
public UUID getPeerUUID() {
UUID peerUUID = null;
try {
synchronized (zkwLock) {
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
}
} catch (KeeperException ke) {
reconnect(ke);
AsyncClusterConnection conn = connect();
String clusterId = FutureUtils
.get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
.getClusterId();
return UUID.fromString(clusterId);
} catch (IOException e) {
LOG.warn("Failed to get cluster id for cluster", e);
return null;
}
return peerUUID;
}

/**
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
*/
private void reloadZkWatcher() throws IOException {
synchronized (zkwLock) {
if (zkw != null) {
zkw.close();
}
zkw =
new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
zkw.registerListener(new PeerRegionServerListener(this));
// do not call this method in doStart, only initialize the connection to the remote cluster
// when you actually want to make use of it. The problem here is that starting the replication
// endpoint is part of the region server initialization work, so if the peer cluster is fully
// down and we can not connect to it, we will cause the initialization to fail and crash the
// region server, as we need the cluster id while setting up the AsyncClusterConnection, which
// needs to at least connect to zookeeper or some other servers in the peer cluster, depending
// on the connection registry implementation
private AsyncClusterConnection connect() throws IOException {
AsyncClusterConnection c = this.conn;
if (c != null) {
return c;
}
}

private void connectPeerCluster() throws IOException {
try {
conn = createConnection(this.conf);
} catch (IOException ioe) {
LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
throw ioe;
synchronized (connLock) {
c = this.conn;
if (c != null) {
return c;
}
c = createConnection(this.conf);
conn = c;
}
return c;
}

@Override
@@ -224,36 +189,27 @@ public boolean isAborted() {
* Get the list of all the region servers from the specified peer
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() {
List<String> children = null;
// will be overridden in tests so protected
protected Collection<ServerName> fetchPeerAddresses() {
try {
synchronized (zkwLock) {
children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
}
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
}
reconnect(ke);
}
if (children == null) {
return FutureUtils.get(connect().getAdmin().getRegionServers(true));
} catch (IOException e) {
LOG.debug("Fetch peer addresses failed", e);
return Collections.emptyList();
}
List<ServerName> addresses = new ArrayList<>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
}
return addresses;
}

protected synchronized void chooseSinks() {
Contributor:
You have removed PeerRegionServerListener and I don't see anything replacing it. How will the replication endpoint notice if region servers are changed?

Contributor Author:
There is lazy refresh logic in our code. Once we fail to connect to a region server in the remote peer cluster, we increase that region server's failure count; there is a Map in this class for tracking the failure counts. If too many region servers have failed, we refresh the region server list of the remote peer cluster.
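Roughly, that refresh path looks like the sketch below. It is only an illustration of the mechanism described above, not the literal patch contents: the reportBadSink signature and the badSinkThreshold field are assumed here, while badReportCounts, sinkServers and chooseSinks() are the members already visible in this class.

// Sketch of the lazy refresh: callers report a sink as bad after a failed
// replication attempt; once a sink has failed too often it is dropped, and
// when no usable sinks remain the region server list is re-fetched from the
// peer via chooseSinks().
protected synchronized void reportBadSink(SinkPeer sinkPeer) {
  ServerName serverName = sinkPeer.getServerName();
  // bump the failure count for this sink in the tracking map
  int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
  if (badReportCount > badSinkThreshold) {
    // too many failures: stop using this sink
    sinkServers.remove(serverName);
    if (sinkServers.isEmpty()) {
      // no sinks left, fetch a fresh region server list from the peer and re-sample
      chooseSinks();
    }
  }
}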

List<ServerName> slaveAddresses = fetchSlavesAddresses();
Collection<ServerName> slaveAddresses = fetchPeerAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
this.sinkServers = Collections.emptyList();
} else {
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
sample.add(slaveAddresses.iterator());
this.sinkServers = sample.getSamplingResult();
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
this.sinkServers = slaveAddresses.subList(0, numSinks);
badReportCounts.clear();
}

@@ -275,7 +231,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
}
ServerName serverName =
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
}

/**
Expand Down Expand Up @@ -307,29 +263,6 @@ List<ServerName> getSinkServers() {
return sinkServers;
}

/**
* Tracks changes to the list of region servers in a peer's cluster.
*/
public static class PeerRegionServerListener extends ZKListener {

private final HBaseReplicationEndpoint replicationEndpoint;
private final String regionServerListNode;

public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
super(endpoint.zkw);
this.replicationEndpoint = endpoint;
this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
}

@Override
public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.chooseSinks();
}
}
}

/**
* Wraps a replication region server sink to provide the ability to identify it.
*/
@@ -106,7 +106,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
private boolean isSerial = false;
// Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
private volatile boolean stopping = false;

@Override
public void init(Context context) throws IOException {
@@ -449,7 +448,7 @@ public boolean replicate(ReplicateContext replicateContext) {
}

List<List<Entry>> batches = createBatches(replicateContext.getEntries());
while (this.isRunning() && !this.stopping) {
while (this.isRunning()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
@@ -514,14 +513,6 @@ protected boolean isPeerEnabled() {
return ctx.getReplicationPeer().isPeerEnabled();
}

@Override
protected void doStop() {
// Allow currently running replication tasks to finish
this.stopping = true;
disconnect(); // don't call super.doStop()
notifyStopped();
}

protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
int timeout) {
int entriesHashCode = System.identityHashCode(entries);
@@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -166,6 +167,9 @@ public void testReportBadSinkDownToZeroSinks() {
ServerName serverNameA = endpoint.getSinkServers().get(0);
ServerName serverNameB = endpoint.getSinkServers().get(1);

serverNames.remove(serverNameA);
serverNames.remove(serverNameB);

SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));

@@ -191,7 +195,7 @@ public void setRegionServers(List<ServerName> regionServers) {
}

@Override
public List<ServerName> fetchSlavesAddresses() {
protected Collection<ServerName> fetchPeerAddresses() {
return regionServers;
}

@@ -25,12 +25,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -369,6 +371,14 @@ protected static void runSmallBatchTest() throws IOException, InterruptedException
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}

protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
for (ServerName rs : rses) {
util.getMiniHBaseCluster().stopRegionServer(rs);
}
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
if (htable2 != null) {
@@ -44,7 +44,11 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli

@Test
public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
UTIL2.shutdownMiniHBaseCluster();
// stop all region servers but keep the master up, as the assertions below need to get the
// cluster id from the remote cluster; if the master is also down, we can not get any
// information from the remote cluster after the source cluster restarts
stopAllRegionServers(UTIL2);

// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@@ -45,7 +45,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe

@Test
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
UTIL2.shutdownMiniHBaseCluster();
// stop all region servers but keep the master up, as the assertions below need to get the
// cluster id from the remote cluster; if the master is also down, we can not get any
// information from the remote cluster after the source cluster restarts
stopAllRegionServers(UTIL2);
restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin();
// add some values to source cluster
@@ -42,7 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe

@Test
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
UTIL2.shutdownMiniHBaseCluster();
// stop all region servers but keep the master up, as the assertions below need to get the
// cluster id from the remote cluster; if the master is also down, we can not get any
// information from the remote cluster after the source cluster restarts
stopAllRegionServers(UTIL2);
restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();