Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ public interface ProcedureExecutorListener {
*/
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

/**
* Use a dedicated thread for executing WorkerMonitor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more comments here to explain why we need a dedicated thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done.

*/
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

private int corePoolSize;
private int maxPoolSize;

Expand Down Expand Up @@ -560,7 +565,8 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
corePoolSize, maxPoolSize);

this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

// Create the workers
workerId.set(0);
Expand Down Expand Up @@ -604,12 +610,13 @@ public void startWorkers() throws IOException {
// Start the executors. Here we must have the lastProcId set.
LOG.trace("Start workers {}", workerThreads.size());
timeoutExecutor.start();
workerMonitorExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
}

// Internal chores
timeoutExecutor.add(new WorkerMonitor());
workerMonitorExecutor.add(new WorkerMonitor());

// Add completed cleaner chore
addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
Expand All @@ -624,6 +631,7 @@ public void stop() {
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
workerMonitorExecutor.sendStopSignal();
}

@VisibleForTesting
Expand All @@ -632,6 +640,8 @@ public void join() {

// stop the timeout executor
timeoutExecutor.awaitTermination();
// stop the work monitor executor
workerMonitorExecutor.awaitTermination();

// stop the worker threads
for (WorkerThread worker: workerThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {

private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group,
String name) {
super(group, name);
setDaemon(true);
this.executor = executor;
}
Expand Down