diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index 18dfc7d493bf..7497877e67c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; @@ -153,25 +152,19 @@ List createSplitWALProcedures(List splittingWALs, */ public ServerName acquireSplitWALWorker(Procedure procedure) throws ProcedureSuspendedException { - Optional worker = splitWorkerAssigner.acquire(); - if (worker.isPresent()) { - LOG.debug("Acquired split WAL worker={}", worker.get()); - return worker.get(); - } - splitWorkerAssigner.suspend(procedure); - throw new ProcedureSuspendedException(); + ServerName worker = splitWorkerAssigner.acquire(procedure); + LOG.debug("Acquired split WAL worker={}", worker); + return worker; } /** * After the worker finished the split WAL task, it will release the worker, and wake up all the * suspend procedures in the ProcedureEvent * @param worker worker which is about to release - * @param scheduler scheduler which is to wake up the procedure event */ - public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + public void releaseSplitWALWorker(ServerName worker) { LOG.debug("Release split WAL worker={}", worker); splitWorkerAssigner.release(worker); - splitWorkerAssigner.wake(scheduler); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java index b6df41acee23..b1f2558045d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -23,9 +23,9 @@ import java.util.Map; import java.util.Optional; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,7 +51,7 @@ public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent eve } } - public synchronized Optional acquire() { + public synchronized ServerName acquire(Procedure proc) throws ProcedureSuspendedException { List serverList = master.getServerManager().getOnlineServersList(); Collections.shuffle(serverList); Optional worker = serverList.stream() @@ -60,27 +60,30 @@ public synchronized Optional acquire() { .findAny(); worker.ifPresent(name -> currentWorkers.compute(name, (serverName, availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1)); - return worker; + if (worker.isPresent()) { + ServerName sn = worker.get(); + currentWorkers.compute(sn, (serverName, + availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1); + return sn; + } else { + event.suspend(); + event.suspendIfNotReady(proc); + throw new ProcedureSuspendedException(); + } } public synchronized void release(ServerName serverName) { currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); - } - - public void suspend(Procedure proc) { - event.suspend(); - event.suspendIfNotReady(proc); - } - - public void wake(MasterProcedureScheduler scheduler) { if (!event.isReady()) { - event.wake(scheduler); + event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); } } @Override - public void serverAdded(ServerName worker) { - this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + public synchronized void serverAdded(ServerName worker) { + if (!event.isReady()) { + event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } } public synchronized void addUsedWorker(ServerName worker) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index 651822ff5b2a..8ec261d768c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -109,8 +109,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) { setFailure("verify-snapshot", e); } finally { // release the worker - env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer, - env.getProcedureScheduler()); + env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java index 699834f9c1d7..98c2c0ec6930 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -90,7 +90,7 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp skipPersistence(); throw new ProcedureSuspendedException(); } - splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(worker); if (!finished) { LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker); setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 1ddcd2e3408e..677baff1b6bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; @@ -1419,20 +1418,14 @@ public boolean snapshotProcedureEnabled() { public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure) throws ProcedureSuspendedException { - Optional worker = verifyWorkerAssigner.acquire(); - if (worker.isPresent()) { - LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get()); - return worker.get(); - } - verifyWorkerAssigner.suspend(procedure); - throw new ProcedureSuspendedException(); + ServerName worker = verifyWorkerAssigner.acquire(procedure); + LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker); + return worker; } - public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker, - MasterProcedureScheduler scheduler) { + public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) { LOG.debug("{} Release verify snapshot worker={}", procedure, worker); verifyWorkerAssigner.release(worker); - verifyWorkerAssigner.wake(scheduler); } private void restoreWorkers() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java index ea92f7922794..4665b9c16f8a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java @@ -115,8 +115,7 @@ public void testAcquireAndRelease() throws Exception { Assert.assertNotNull(e); Assert.assertTrue(e instanceof ProcedureSuspendedException); - splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster() - .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(server); Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); } @@ -348,7 +347,7 @@ protected Flow executeFromState(MasterProcedureEnv env, setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); return Flow.HAS_MORE_STATE; case RELEASE_SPLIT_WORKER: - splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(worker); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state);