diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8cb40cb58803..1c77e8dfaafa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -229,6 +229,7 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo; @@ -364,6 +365,9 @@ public class HMaster extends HBaseServerBase implements Maste private RSGroupInfoManager rsGroupInfoManager; + private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier = + new ReplicationLogCleanerBarrier(); + // manager of replication private ReplicationPeerManager replicationPeerManager; @@ -4106,6 +4110,11 @@ public ReplicationPeerManager getReplicationPeerManager() { return replicationPeerManager; } + @Override + public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() { + return replicationLogCleanerBarrier; + } + public HashMap>> getReplicationLoad(ServerName[] serverNames) { List peerList = this.getReplicationPeerManager().listPeers(null); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 1958e64767eb..d450fbb45ac0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -361,6 +362,12 @@ ReplicationPeerConfig getReplicationPeerConfig(String peerId) */ ReplicationPeerManager getReplicationPeerManager(); + /** + * Returns the {@link ReplicationLogCleanerBarrier}. It will be used at multiple places so we put + * it in MasterServices directly. + */ + ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier(); + /** * Returns the {@link SyncReplicationReplayWALManager}. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index 1d02fab5f194..c469896d3e7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -86,7 +86,7 @@ protected ReplicationPeerConfig getNewPeerConfig() { @Override protected void releaseLatch(MasterProcedureEnv env) { if (cleanerDisabled) { - env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable(); + env.getMasterServices().getReplicationLogCleanerBarrier().enable(); } if (peerConfig.isSyncReplication()) { env.getReplicationPeerManager().releaseSyncReplicationPeerLock(); @@ -97,7 +97,7 @@ protected void releaseLatch(MasterProcedureEnv env) { @Override protected void prePeerModification(MasterProcedureEnv env) throws IOException, ReplicationException, ProcedureSuspendedException { - if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) { throw suspend(env.getMasterConfiguration(), backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs", peerId, backoff / 1000)); @@ -142,7 +142,7 @@ protected void afterReplay(MasterProcedureEnv env) { // when executing the procedure we will try to disable and acquire. return; } - if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) { throw new IllegalStateException("can not disable log cleaner, this should not happen"); } cleanerDisabled = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index b7c4e33ef858..c88d613e5260 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -115,7 +115,7 @@ private void shutdownExecutorService() { private void disableReplicationLogCleaner(MasterProcedureEnv env) throws ProcedureSuspendedException { - if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) { // it is not likely that we can reach here as we will schedule this procedure immediately // after master restarting, where ReplicationLogCleaner should have not started its first run // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since @@ -130,7 +130,7 @@ private void disableReplicationLogCleaner(MasterProcedureEnv env) } private void enableReplicationLogCleaner(MasterProcedureEnv env) { - env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable(); + env.getMasterServices().getReplicationLogCleanerBarrier().enable(); } private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException { @@ -224,7 +224,7 @@ protected Flow executeFromState(MasterProcedureEnv env, lockEntry = procLock.getLockEntry(getProcId()); } catch (IOException ioe) { LOG.error("Error while acquiring execution lock for procedure {}" - + " when trying to wake it up, aborting...", ioe); + + " when trying to wake it up, aborting...", this, ioe); env.getMasterServices().abort("Can not acquire procedure execution lock", e); return; } @@ -304,7 +304,7 @@ protected void afterReplay(MasterProcedureEnv env) { // when executing the procedure we will try to disable and acquire. return; } - if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) { throw new IllegalStateException("can not disable log cleaner, this should not happen"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 8b01225e553e..53a7a6f00146 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; -import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -115,9 +114,6 @@ public class ReplicationPeerManager implements ConfigurationObserver { // Only allow to add one sync replication peer concurrently private final Semaphore syncReplicationPeerLock = new Semaphore(1); - private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier = - new ReplicationLogCleanerBarrier(); - private final String clusterId; private volatile Configuration conf; @@ -725,10 +721,6 @@ public void releaseSyncReplicationPeerLock() { syncReplicationPeerLock.release(); } - public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() { - return replicationLogCleanerBarrier; - } - @Override public void onConfigurationChange(Configuration conf) { this.conf = conf; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 3ab52da6158e..6ebcac7e453a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -65,6 +65,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { // queue for a given peer, that why we can use a String peerId as key instead of // ReplicationQueueId. private Map>> replicationOffsets; + private ReplicationLogCleanerBarrier barrier; private ReplicationPeerManager rpm; private Supplier> getNotFullyDeadServers; @@ -84,7 +85,7 @@ public void preClean() { LOG.error("Error occurred while executing queueStorage.hasData()", e); return; } - canFilter = rpm.getReplicationLogCleanerBarrier().start(); + canFilter = barrier.start(); if (canFilter) { notFullyDeadServers = getNotFullyDeadServers.get(); peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId) @@ -98,7 +99,7 @@ public void preClean() { allQueueData = rpm.getQueueStorage().listAllQueues(); } catch (ReplicationException e) { LOG.error("Can not list all replication queues, give up cleaning", e); - rpm.getReplicationLogCleanerBarrier().stop(); + barrier.stop(); canFilter = false; notFullyDeadServers = null; peerIds = null; @@ -122,7 +123,7 @@ public void preClean() { @Override public void postClean() { if (canFilter) { - rpm.getReplicationLogCleanerBarrier().stop(); + barrier.stop(); canFilter = false; // release memory notFullyDeadServers = null; @@ -244,6 +245,7 @@ public void init(Map params) { Object master = params.get(HMaster.MASTER); if (master != null && master instanceof MasterServices) { MasterServices m = (MasterServices) master; + barrier = m.getReplicationLogCleanerBarrier(); rpm = m.getReplicationPeerManager(); getNotFullyDeadServers = () -> getNotFullyDeadServers(m); return; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 26f6ac512f2f..d526358ceb4e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -524,4 +525,9 @@ public boolean replicationPeerModificationSwitch(boolean on) throws IOException public boolean isReplicationPeerModificationEnabled() { return false; } + + @Override + public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 5d474bc21640..699d9f963da7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -132,10 +132,11 @@ public void beforeTest() throws Exception { masterServices = mock(MasterServices.class); when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection()); + when(masterServices.getReplicationLogCleanerBarrier()) + .thenReturn(new ReplicationLogCleanerBarrier()); ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); when(masterServices.getReplicationPeerManager()).thenReturn(rpm); when(rpm.getQueueStorage()).thenReturn(queueStorage); - when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); ServerManager sm = mock(ServerManager.class); when(masterServices.getServerManager()).thenReturn(sm); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java index cb795edcd623..a2709548bc3e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java @@ -214,8 +214,8 @@ public void testDisablePeerAndWaitStates() throws Exception { EXTRA_REGION_SERVERS .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics); - ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster() - .getReplicationPeerManager().getReplicationLogCleanerBarrier(); + ReplicationLogCleanerBarrier barrier = + UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier(); assertTrue(barrier.start()); ProcedureExecutor procExec = getMasterProcedureExecutor(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java index 7edadae03b14..a1850b68eba5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java @@ -80,8 +80,8 @@ public class TestReplicationLogCleaner { @Before public void setUp() throws ReplicationException { services = mock(MasterServices.class); + when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); - when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); when(services.getReplicationPeerManager()).thenReturn(rpm); when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); @@ -157,7 +157,7 @@ public void testNoConf() { @Test public void testCanNotFilter() { - assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()); + assertTrue(services.getReplicationLogCleanerBarrier().disable()); List files = Arrays.asList(new FileStatus()); assertSame(Collections.emptyList(), runCleaner(cleaner, files)); }