Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -35,7 +34,6 @@
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
Expand Down Expand Up @@ -153,25 +151,19 @@ List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
*/
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
throws ProcedureSuspendedException {
Optional<ServerName> worker = splitWorkerAssigner.acquire();
if (worker.isPresent()) {
LOG.debug("Acquired split WAL worker={}", worker.get());
return worker.get();
}
splitWorkerAssigner.suspend(procedure);
throw new ProcedureSuspendedException();
ServerName worker = splitWorkerAssigner.acquire(procedure);
LOG.debug("Acquired split WAL worker={}", worker);
return worker;
}

/**
* After the worker finished the split WAL task, it will release the worker, and wake up all the
* suspend procedures in the ProcedureEvent
* @param worker worker which is about to release
* @param scheduler scheduler which is to wake up the procedure event
* @param worker worker which is about to release
*/
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
public void releaseSplitWALWorker(ServerName worker) {
LOG.debug("Release split WAL worker={}", worker);
splitWorkerAssigner.release(worker);
splitWorkerAssigner.wake(scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -51,36 +51,37 @@ public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> eve
}
}

public synchronized Optional<ServerName> acquire() {
public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException {
List<ServerName> serverList = master.getServerManager().getOnlineServersList();
Collections.shuffle(serverList);
Optional<ServerName> worker = serverList.stream()
.filter(
serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
.findAny();
worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1));
return worker;
if (worker.isPresent()) {
ServerName sn = worker.get();
currentWorkers.compute(sn, (serverName,
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
return sn;
} else {
event.suspend();
event.suspendIfNotReady(proc);
throw new ProcedureSuspendedException();
}
}

public synchronized void release(ServerName serverName) {
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
}

public void suspend(Procedure<?> proc) {
event.suspend();
event.suspendIfNotReady(proc);
}

public void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
event.wake(scheduler);
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}

@Override
public void serverAdded(ServerName worker) {
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
public synchronized void serverAdded(ServerName worker) {
if (!event.isReady()) {
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}

public synchronized void addUsedWorker(ServerName worker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ protected synchronized boolean complete(MasterProcedureEnv env, Throwable error)
setFailure("verify-snapshot", e);
} finally {
// release the worker
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer);
}
return isProcedureCompleted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp
skipPersistence();
throw new ProcedureSuspendedException();
}
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(worker);
if (!finished) {
LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -66,7 +65,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
Expand Down Expand Up @@ -1474,20 +1472,14 @@ public boolean snapshotProcedureEnabled() {

public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
throws ProcedureSuspendedException {
Optional<ServerName> worker = verifyWorkerAssigner.acquire();
if (worker.isPresent()) {
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
return worker.get();
}
verifyWorkerAssigner.suspend(procedure);
throw new ProcedureSuspendedException();
ServerName worker = verifyWorkerAssigner.acquire(procedure);
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
return worker;
}

public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
MasterProcedureScheduler scheduler) {
public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
verifyWorkerAssigner.release(worker);
verifyWorkerAssigner.wake(scheduler);
}

private void restoreWorkers() {
Expand Down
Loading