Skip to content

Commit 70ab0dc

Browse files
authored
HBASE-24743 Reject to add a peer which replicate to itself earlier (apache#2124)
Signed-off-by: Wellington Chevreuil <[email protected]> Signed-off-by: Bharath Vissapragada <[email protected]>
1 parent ce4e692 commit 70ab0dc

File tree

11 files changed

+115
-101
lines changed

11 files changed

+115
-101
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ protected void initializeZKBasedSystemTrackers()
801801
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
802802
this.splitOrMergeTracker.start();
803803

804-
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
804+
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
805805

806806
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
807807
this.drainingServerTracker.start();

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.commons.lang3.StringUtils;
3232
import org.apache.hadoop.conf.Configuration;
3333
import org.apache.hadoop.hbase.DoNotRetryIOException;
34+
import org.apache.hadoop.hbase.HBaseConfiguration;
3435
import org.apache.hadoop.hbase.ServerName;
3536
import org.apache.hadoop.hbase.TableName;
3637
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -45,9 +46,11 @@
4546
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
4647
import org.apache.hadoop.hbase.replication.ReplicationUtils;
4748
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
49+
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
4850
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
4951
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
5052
import org.apache.yetus.audience.InterfaceAudience;
53+
import org.apache.zookeeper.KeeperException;
5154

5255
/**
5356
* Manages and performs all replication admin operations.
@@ -63,11 +66,17 @@ public class ReplicationPeerManager {
6366

6467
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
6568

69+
private final String clusterId;
70+
71+
private final Configuration conf;
72+
6673
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
67-
ConcurrentMap<String, ReplicationPeerDescription> peers) {
74+
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
6875
this.peerStorage = peerStorage;
6976
this.queueStorage = queueStorage;
7077
this.peers = peers;
78+
this.conf = conf;
79+
this.clusterId = clusterId;
7180
}
7281

7382
private void checkQueuesDeleted(String peerId)
@@ -245,26 +254,26 @@ void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
245254

246255
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
247256
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
248-
boolean checkClusterKey = true;
257+
ReplicationEndpoint endpoint = null;
249258
if (!StringUtils.isBlank(replicationEndpointImpl)) {
250-
// try creating a instance
251-
ReplicationEndpoint endpoint;
252259
try {
260+
// try creating a instance
253261
endpoint = Class.forName(replicationEndpointImpl)
254262
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
255263
} catch (Throwable e) {
256264
throw new DoNotRetryIOException(
257265
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
258266
e);
259267
}
260-
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
261-
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
262-
checkClusterKey = false;
263-
}
264268
}
265-
if (checkClusterKey) {
269+
// Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
270+
if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
266271
checkClusterKey(peerConfig.getClusterKey());
267272
}
273+
// Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
274+
if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
275+
checkClusterId(peerConfig.getClusterKey());
276+
}
268277

269278
if (peerConfig.replicateAllUserTables()) {
270279
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
@@ -357,6 +366,25 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
357366
}
358367
}
359368

369+
private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
370+
String peerClusterId = "";
371+
try {
372+
// Create the peer cluster config for get peer cluster id
373+
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
374+
try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
375+
peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
376+
}
377+
} catch (IOException | KeeperException e) {
378+
throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
379+
}
380+
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
381+
// peerClusterId value, which is the same as the source clusterId
382+
if (clusterId.equals(peerClusterId)) {
383+
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
384+
+ ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
385+
}
386+
}
387+
360388
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
361389
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
362390
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
@@ -367,7 +395,7 @@ public ReplicationQueueStorage getQueueStorage() {
367395
return queueStorage;
368396
}
369397

370-
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
398+
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
371399
throws ReplicationException {
372400
ReplicationPeerStorage peerStorage =
373401
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
@@ -378,7 +406,7 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
378406
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
379407
}
380408
return new ReplicationPeerManager(peerStorage,
381-
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
409+
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
382410
}
383411

384412
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -524,16 +524,6 @@ private void initialize() {
524524
if (!this.isSourceActive()) {
525525
return;
526526
}
527-
528-
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
529-
// peerClusterId value, which is the same as the source clusterId
530-
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
531-
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
532-
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
533-
+ replicationEndpoint.getClass().getName(), null, false);
534-
this.manager.removeSource(this);
535-
return;
536-
}
537527
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
538528
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
539529

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
7171
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
7272

7373
private final String ID_ONE = "1";
74-
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
74+
private static String KEY_ONE;
7575
private final String ID_TWO = "2";
76-
private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
76+
private static String KEY_TWO;
7777

7878
@BeforeClass
7979
public static void setUpBeforeClass() throws Exception {
@@ -82,6 +82,8 @@ public static void setUpBeforeClass() throws Exception {
8282
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
8383
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
8484
TEST_UTIL.startMiniCluster();
85+
KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
86+
KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
8587
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
8688
}
8789

hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ public class TestReplicationAdmin {
7878
new HBaseTestingUtility();
7979

8080
private final String ID_ONE = "1";
81-
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
81+
private static String KEY_ONE;
8282
private final String ID_SECOND = "2";
83-
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
83+
private static String KEY_SECOND;
8484

8585
private static ReplicationAdmin admin;
8686
private static Admin hbaseAdmin;
@@ -97,6 +97,8 @@ public static void setUpBeforeClass() throws Exception {
9797
TEST_UTIL.startMiniCluster();
9898
admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
9999
hbaseAdmin = TEST_UTIL.getAdmin();
100+
KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
101+
KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2";
100102
}
101103

102104
@AfterClass

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ protected void doStart() {
113113
protected void doStop() {
114114
notifyStopped();
115115
}
116+
117+
@Override
118+
public boolean canReplicateToSameCluster() {
119+
return true;
120+
}
116121
}
117122

118123
@BeforeClass

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.io.Closeable;
2626
import java.io.IOException;
2727
import java.util.Arrays;
28-
import java.util.EnumSet;
2928
import java.util.List;
3029
import java.util.Optional;
3130
import java.util.Random;
@@ -34,17 +33,14 @@
3433
import org.apache.hadoop.fs.FileSystem;
3534
import org.apache.hadoop.fs.Path;
3635
import org.apache.hadoop.hbase.Cell;
37-
import org.apache.hadoop.hbase.ClusterMetrics;
36+
import org.apache.hadoop.hbase.DoNotRetryIOException;
3837
import org.apache.hadoop.hbase.HBaseClassTestRule;
3938
import org.apache.hadoop.hbase.HBaseConfiguration;
4039
import org.apache.hadoop.hbase.HBaseTestingUtility;
4140
import org.apache.hadoop.hbase.HConstants;
4241
import org.apache.hadoop.hbase.KeyValue;
4342
import org.apache.hadoop.hbase.MiniHBaseCluster;
44-
import org.apache.hadoop.hbase.ServerMetrics;
45-
import org.apache.hadoop.hbase.ServerName;
4643
import org.apache.hadoop.hbase.TableName;
47-
import org.apache.hadoop.hbase.Waiter;
4844
import org.apache.hadoop.hbase.client.Admin;
4945
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
5046
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -72,9 +68,7 @@
7268
import org.apache.hadoop.hbase.util.HFileTestUtil;
7369
import org.apache.hadoop.hbase.wal.WALEdit;
7470
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
75-
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
7671
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
77-
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
7872
import org.junit.After;
7973
import org.junit.Before;
8074
import org.junit.ClassRule;
@@ -176,40 +170,16 @@ public void testCyclicReplication1() throws Exception {
176170

177171
/**
178172
* Tests the replication scenario 0 -> 0. By default
179-
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
180-
* ReplicationSource should terminate, and no further logs should get enqueued
173+
* {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
174+
* the replication peer should not be added.
181175
*/
182-
@Test
183-
public void testLoopedReplication() throws Exception {
176+
@Test(expected = DoNotRetryIOException.class)
177+
public void testLoopedReplication()
178+
throws Exception {
184179
LOG.info("testLoopedReplication");
185180
startMiniClusters(1);
186181
createTableOnClusters(table);
187182
addPeer("1", 0, 0);
188-
Thread.sleep(SLEEP_TIME);
189-
190-
// wait for source to terminate
191-
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
192-
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
193-
@Override
194-
public boolean evaluate() throws Exception {
195-
ClusterMetrics clusterStatus = utilities[0].getAdmin()
196-
.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
197-
ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
198-
List<ReplicationLoadSource> replicationLoadSourceList =
199-
serverLoad.getReplicationLoadSourceList();
200-
return replicationLoadSourceList.isEmpty();
201-
}
202-
});
203-
204-
Table[] htables = getHTablesOnClusters(tableName);
205-
putAndWait(row, famName, htables[0], htables[0]);
206-
rollWALAndWait(utilities[0], table.getTableName(), row);
207-
ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
208-
String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
209-
ZNodePaths.joinZNode("replication", "rs"));
210-
List<String> listChildrenNoWatch =
211-
ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
212-
assertEquals(0, listChildrenNoWatch.size());
213183
}
214184

215185
/**

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,11 @@ protected void doStop() {
500500
stoppedCount.incrementAndGet();
501501
notifyStopped();
502502
}
503+
504+
@Override
505+
public boolean canReplicateToSameCluster() {
506+
return true;
507+
}
503508
}
504509

505510
public static class InterClusterReplicationEndpointForTest

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ protected void doStart() {
127127
protected void doStop() {
128128
notifyStopped();
129129
}
130+
131+
@Override
132+
public boolean canReplicateToSameCluster() {
133+
return true;
134+
}
130135
}
131136

132137
@BeforeClass

hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ public void testCleanReplicationBarrierWithExistTable() throws Exception {
174174
}
175175

176176
public static void createPeer() throws IOException {
177-
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
178-
.setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
177+
ReplicationPeerConfig rpc =
178+
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
179+
.setSerial(true).build();
179180
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
180181
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
181182
}

0 commit comments

Comments
 (0)