implements Maste
private TaskGroup startupTaskGroup;
+ /**
+ * Store whether we allow replication peer modification operations.
+ */
+ private ReplicationPeerModificationStateStore replicationPeerModificationStateStore;
+
/**
* Initializes the HMaster. The steps are as follows:
*
@@ -785,6 +791,8 @@ private void initializeZKBasedSystemTrackers()
this.replicationPeerManager =
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
+ this.replicationPeerModificationStateStore =
+ new ReplicationPeerModificationStateStore(masterRegion);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
@@ -3787,6 +3795,9 @@ public FavoredNodesManager getFavoredNodesManager() {
}
private long executePeerProcedure(AbstractPeerProcedure> procedure) throws IOException {
+ if (!isReplicationPeerModificationEnabled()) {
+ throw new IOException("Replication peer modification disabled");
+ }
long procId = procedureExecutor.submitProcedure(procedure);
procedure.getLatch().await();
return procId;
@@ -3866,6 +3877,16 @@ public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplic
return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
}
+ @Override
+ public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
+ return replicationPeerModificationStateStore.set(on);
+ }
+
+ @Override
+ public boolean isReplicationPeerModificationEnabled() {
+ return replicationPeerModificationStateStore.get();
+ }
+
/**
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
@@ -4290,5 +4311,4 @@ private void initializeCoprocessorHost(Configuration conf) {
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 5f629b45c21d..a2d5e8a16ecd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -77,6 +77,7 @@
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.AbstractPeerNoLockProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
@@ -421,12 +422,18 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
@@ -2170,6 +2177,56 @@ public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController co
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
}
+ @Override
+ public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
+ RpcController controller, ReplicationPeerModificationSwitchRequest request)
+ throws ServiceException {
+ try {
+ server.checkInitialized();
+ boolean prevValue = server.replicationPeerModificationSwitch(request.getOn());
+ return ReplicationPeerModificationSwitchResponse.newBuilder().setPreviousValue(prevValue)
+ .build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ @Override
+ public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures(
+ RpcController controller, GetReplicationPeerModificationProceduresRequest request)
+ throws ServiceException {
+ try {
+ server.checkInitialized();
+ GetReplicationPeerModificationProceduresResponse.Builder builder =
+ GetReplicationPeerModificationProceduresResponse.newBuilder();
+ for (Procedure> proc : server.getProcedures()) {
+ if (proc.isFinished()) {
+ continue;
+ }
+ if (!(proc instanceof AbstractPeerNoLockProcedure)) {
+ continue;
+ }
+ builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
+ }
+ return builder.build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ @Override
+ public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
+ RpcController controller, IsReplicationPeerModificationEnabledRequest request)
+ throws ServiceException {
+ try {
+ server.checkInitialized();
+ return IsReplicationPeerModificationEnabledResponse.newBuilder()
+ .setEnabled(server.isReplicationPeerModificationEnabled()).build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
@Override
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
RpcController controller, ListDecommissionedRegionServersRequest request)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index f62d4bbfad60..1958e64767eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -390,6 +390,10 @@ List listReplicationPeers(String regex)
long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
throws ReplicationException, IOException;
+ boolean replicationPeerModificationSwitch(boolean on) throws IOException;
+
+ boolean isReplicationPeerModificationEnabled();
+
/** Returns {@link LockManager} to lock namespaces/tables/regions. */
LockManager getLockManager();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java
index b7aaa3e1b7b9..b18850083748 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java
@@ -86,6 +86,10 @@ private byte[] migrate(ZKWatcher watcher, String zkPath) throws KeeperException,
}
private void tryMigrate(ZKWatcher watcher, String zkPath) throws IOException, KeeperException {
+ if (zkPath == null) {
+ // this means we do not store this state in zk, skip migrating
+ return;
+ }
Result result = get();
if (result.isEmpty()) {
// migrate
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index dc2e7de3bffb..cd4cca0f9186 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -164,4 +164,10 @@ protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, Tab
queueStorage);
}
}
+
+ protected final void checkPeerModificationEnabled(MasterProcedureEnv env) throws IOException {
+ if (!env.getMasterServices().isReplicationPeerModificationEnabled()) {
+ throw new IOException("Replication peer modification disabled");
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 67d70a166bee..3af902e1d8a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -158,6 +158,7 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
switch (state) {
case PRE_PEER_MODIFICATION:
try {
+ checkPeerModificationEnabled(env);
prePeerModification(env);
} catch (IOException e) {
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java
new file mode 100644
index 000000000000..f68d5e41d5ba
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.BooleanStateStore;
+import org.apache.hadoop.hbase.master.region.MasterRegion;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
+/**
+ * Store the peer modification state.
+ */
+@InterfaceAudience.Private
+public class ReplicationPeerModificationStateStore extends BooleanStateStore {
+
+ public static final String STATE_NAME = "replication_peer_modification_on";
+
+ public ReplicationPeerModificationStateStore(MasterRegion masterRegion)
+ throws DeserializationException, IOException, KeeperException {
+ super(masterRegion, STATE_NAME, null, null);
+ }
+
+ @Override
+ protected byte[] toByteArray(boolean on) {
+ ReplicationProtos.ReplicationPeerModificationState.Builder builder =
+ ReplicationProtos.ReplicationPeerModificationState.newBuilder();
+ builder.setOn(on);
+ return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+ }
+
+ @Override
+ protected boolean parseFrom(byte[] bytes) throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ ReplicationProtos.ReplicationPeerModificationState.Builder builder =
+ ReplicationProtos.ReplicationPeerModificationState.newBuilder();
+ try {
+ int magicLen = ProtobufUtil.lengthOfPBMagic();
+ ProtobufUtil.mergeFrom(builder, bytes, magicLen, bytes.length - magicLen);
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ return builder.build().getOn();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 2de10cb2778c..ed0760c69924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -236,6 +236,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
switch (state) {
case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
+ checkPeerModificationEnabled(env);
preTransit(env);
} catch (IOException e) {
LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java
index 76321f26abff..e52d8ee92c3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java
@@ -17,12 +17,20 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.*;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -61,4 +69,24 @@ public void testDecommissionAndStopRegionServers() throws Exception {
assertEquals(-1, ZKUtil.checkExists(zkw,
ZNodePaths.joinZNode(zkw.getZNodePaths().drainingZNode, serverName.getServerName())));
}
+
+ @Test
+ public void testReplicationPeerModificationSwitch() throws Exception {
+ assertTrue(ADMIN.isReplicationPeerModificationEnabled());
+ try {
+ // disable modification, should returns true as it is enabled by default and the above
+ // assertion has confirmed it
+ assertTrue(ADMIN.replicationPeerModificationSwitch(false));
+ IOException error =
+ assertThrows(IOException.class, () -> ADMIN.addReplicationPeer("peer", ReplicationPeerConfig
+ .newBuilder().setClusterKey(TEST_UTIL.getClusterKey() + "-test").build()));
+ assertThat(error.getCause().getMessage(),
+ containsString("Replication peer modification disabled"));
+ // enable again, and the previous value should be false
+ assertFalse(ADMIN.replicationPeerModificationSwitch(true));
+ } finally {
+ // always reset to avoid mess up other tests
+ ADMIN.replicationPeerModificationSwitch(true);
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index e50b14aa3ff0..f942f4ed99db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -18,13 +18,15 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -106,6 +108,7 @@ public void clearPeerAndQueues() throws IOException, ReplicationException {
queueStorage.removeQueue(serverName, queue);
}
}
+ admin.replicationPeerModificationSwitch(true).join();
}
@Test
@@ -509,7 +512,7 @@ public void testSetReplicationEndpoint() throws InterruptedException, ExecutionE
}
}
- /*
+ /**
* Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
*/
@Test
@@ -522,4 +525,19 @@ public void testReplicationPeerNotFoundException() throws InterruptedException {
assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
}
}
+
+ @Test
+ public void testReplicationPeerModificationSwitch() throws Exception {
+ assertTrue(admin.isReplicationPeerModificationEnabled().get());
+ // disable modification, should returns true as it is enabled by default and the above
+ // assertion has confirmed it
+ assertTrue(admin.replicationPeerModificationSwitch(false).get());
+ ExecutionException error = assertThrows(ExecutionException.class, () -> admin
+ .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build())
+ .get());
+ assertThat(error.getCause().getMessage(),
+ containsString("Replication peer modification disabled"));
+ // enable again, and the previous value should be false
+ assertFalse(admin.replicationPeerModificationSwitch(true).get());
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index bc3969ffd518..4ff69f419fd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -518,4 +518,14 @@ public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, Str
long nonceGroup, long nonce) throws IOException {
return -1;
}
+
+ @Override
+ public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean isReplicationPeerModificationEnabled() {
+ return false;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java
new file mode 100644
index 000000000000..2c038b6798ba
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.FSReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, LargeTests.class })
+public class TestDisablePeerModification {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDisablePeerModification.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static CountDownLatch ARRIVE = new CountDownLatch(1);
+
+ private static CountDownLatch RESUME = new CountDownLatch(1);
+
+ public static final class MockPeerStorage extends FSReplicationPeerStorage {
+
+ public MockPeerStorage(FileSystem fs, Configuration conf) throws IOException {
+ super(fs, conf);
+ }
+
+ @Override
+ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
+ SyncReplicationState syncReplicationState) throws ReplicationException {
+ ARRIVE.countDown();
+ try {
+ RESUME.await();
+ } catch (InterruptedException e) {
+ throw new ReplicationException(e);
+ }
+ super.addPeer(peerId, peerConfig, enabled, syncReplicationState);
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ MockPeerStorage.class, ReplicationPeerStorage.class);
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testDrainProcs() throws Exception {
+ AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
+ ReplicationPeerConfig rpc =
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
+ .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
+ CompletableFuture addFuture = admin.addReplicationPeer("test_peer", rpc);
+ ARRIVE.await();
+
+ // we have a pending add peer procedure which has already passed the first state, let's issue a
+ // peer modification switch request to disable peer modification and set drainProcs to true
+ CompletableFuture switchFuture = admin.replicationPeerModificationSwitch(false, true);
+
+ // sleep a while, the switchFuture should not finish yet
+ // the sleep is necessary as we can not join on the switchFuture, so there is no stable way to
+ // make sure we have already changed the flag at master side, sleep a while is the most suitable
+ // way here
+ Thread.sleep(5000);
+ assertFalse(switchFuture.isDone());
+
+ // also verify that we can not schedule a new peer modification procedure
+ AddPeerProcedure proc = new AddPeerProcedure("failure", rpc, true);
+ UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc);
+ UTIL.waitFor(15000, () -> proc.isFinished());
+ // make sure the procedure is failed because of peer modification disabled
+ assertTrue(proc.isFailed());
+ assertThat(proc.getException().getCause().getMessage(),
+ containsString("Replication peer modification disabled"));
+
+ // sleep a while and check again, make sure the switchFuture is still not done
+ Thread.sleep(5000);
+ assertFalse(switchFuture.isDone());
+
+ // resume the add peer procedure and wait it done
+ RESUME.countDown();
+ addFuture.get();
+
+ // this time the switchFuture should be able to finish
+ assertTrue(switchFuture.get());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index fcc45c4be9db..1c94affd1dd1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -954,4 +954,15 @@ public Future modifyTableStoreFileTrackerAsync(TableName tableName, String
public void flushMasterStore() throws IOException {
admin.flushMasterStore();
}
+
+ @Override
+ public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
+ throws IOException {
+ return admin.replicationPeerModificationSwitch(on, drainProcedures);
+ }
+
+ @Override
+ public boolean isReplicationPeerModificationEnabled() throws IOException {
+ return admin.isReplicationPeerModificationEnabled();
+ }
}
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index ce9f4c731cd2..f2f277008de1 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -479,5 +479,21 @@ def update_peer_config(id, args = {})
@admin.updateReplicationPeerConfig(id, builder.build)
end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable replication peer modification
+ # Returns previous switch setting.
+ def peer_modification_switch(enable_or_disable, drain_procs)
+ @admin.replicationPeerModificationSwitch(
+ java.lang.Boolean.valueOf(enable_or_disable), java.lang.Boolean.valueOf(drain_procs)
+ )
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query whether replication peer modification is enabled.
+ # Returns whether replication peer modification is enabled (true is enabled).
+ def peer_modification_enabled?
+ @admin.isReplicationPeerModificationEnabled
+ end
end
end
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index eeac0ba95f7e..1d579319a5ca 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -526,6 +526,8 @@ def self.exception_handler(hide_traceback)
list_peer_configs
update_peer_config
transit_peer_sync_replication_state
+ peer_modification_enabled
+ peer_modification_switch
]
)
diff --git a/hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb b/hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb
new file mode 100644
index 000000000000..dd12b5ba9ac7
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb
@@ -0,0 +1,40 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# frozen_string_literal: true
+
+module Shell
+ module Commands
+ # Prints whether peer modification operations are enabled
+ class PeerModificationEnabled < Command
+ def help
+ <<~EOF
+Query whether peer modification operations are enabled
+Examples:
+
+ hbase> peer_modification_enabled
+EOF
+ end
+
+ def command
+ state = replication_admin.peer_modification_enabled?
+ formatter.row([state.to_s])
+ state
+ end
+ end
+ end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb b/hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb
new file mode 100644
index 000000000000..45ae167c9655
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb
@@ -0,0 +1,46 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# frozen_string_literal: true
+
+module Shell
+ module Commands
+ # Enable or disable peer modification operations
+ class PeerModificationSwitch < Command
+ def help
+ <<~EOF
+Enable/Disable peer modification. Returns previous state.
+Examples:
+
+ hbase> peer_modification_switch true
+ hbase> peer_modification_switch false, true
+
+The second boolean parameter means whether you want to wait until all remaining peer modification
+finished, before the command returns.
+EOF
+ end
+
+ def command(enable_or_disable, drain_procs = false)
+ prev_state = !!replication_admin.peer_modification_switch(enable_or_disable, drain_procs)
+ formatter.row(["Previous peer modification state : #{prev_state}"])
+ prev_state
+ end
+ end
+ end
+end
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index c6ed817ad4ea..2b19ecb59a40 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -606,7 +606,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
assert_equal(0, command(:list_peers).length)
end
- define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
+ define_test 'set_peer_bandwidth: works with peer bandwidth upper limit' do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@@ -621,7 +621,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
command(:remove_peer, @peer_id)
end
- define_test "transit_peer_sync_replication_state: test" do
+ define_test 'transit_peer_sync_replication_state: test' do
cluster_key = "server1.cie.com:2181:/hbase"
remote_wal_dir = "hdfs://srv1:9999/hbase"
table_cfs = { "ns3:table1" => [], "ns3:table2" => [],
@@ -652,7 +652,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
command(:remove_peer, @peer_id)
end
- define_test "get_peer_config: works with simple clusterKey peer" do
+ define_test 'get_peer_config: works with simple clusterKey peer' do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@@ -696,7 +696,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
command(:remove_peer, peer_id_second)
end
- define_test "update_peer_config: can update peer config and data" do
+ define_test 'update_peer_config: can update peer config and data' do
config_params = { "config1" => "value1", "config2" => "value2" }
data_params = {"data1" => "value1", "data2" => "value2"}
args = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params, DATA => data_params}
@@ -717,7 +717,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
end
- define_test "append_peer_exclude_namespaces: works with namespaces array" do
+ define_test 'append_peer_exclude_namespaces: works with namespaces array' do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@@ -753,7 +753,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
command(:remove_peer, @peer_id)
end
- define_test "remove_peer_exclude_namespaces: works with namespaces array" do
+ define_test 'remove_peer_exclude_namespaces: works with namespaces array' do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@@ -791,6 +791,20 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map)
command(:remove_peer, @peer_id)
end
+ define_test 'peer_modification_switch' do
+ command(:peer_modification_switch, true)
+ output = capture_stdout { command(:peer_modification_enabled) }
+ assert(output.include?('true'))
+
+ output = capture_stdout { command(:peer_modification_switch, false, true) }
+ assert(output.include?('true'))
+ output = capture_stdout { command(:peer_modification_enabled) }
+ assert(output.include?('false'))
+
+ output = capture_stdout { command(:peer_modification_switch, true) }
+ assert(output.include?('false'))
+ end
+
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
# define_test "add_peer: adding a second peer with same id should error" do
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 0089da4a9a3b..1b3c29ebe665 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -1329,4 +1329,17 @@ public Future modifyTableStoreFileTrackerAsync(TableName tableName, String
public void flushMasterStore() throws IOException {
throw new NotImplementedException("flushMasterStore not supported in ThriftAdmin");
}
+
+ @Override
+ public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
+ throws IOException {
+ throw new NotImplementedException(
+ "replicationPeerModificationSwitch not supported in ThriftAdmin");
+ }
+
+ @Override
+ public boolean isReplicationPeerModificationEnabled() throws IOException {
+ throw new NotImplementedException(
+ "isReplicationPeerModificationEnabled not supported in ThriftAdmin");
+ }
}