Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -48,6 +50,9 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
// requires concurrent access from caller threads and syncing thread.
private final BlockingQueue<Edit> editPendingQ;

// Thread pool for executing logSyncNotify
private final ExecutorService logSyncNotifyExecutor;

// only accessed by syncing thread so no synchronization required.
// queue is unbounded because it's effectively limited by the size
// of the edit log buffer - ie. a sync will eventually be forced.
Expand All @@ -63,6 +68,9 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
DFS_NAMENODE_EDITS_ASYNC_LOGGING_PENDING_QUEUE_SIZE_DEFAULT);

editPendingQ = new ArrayBlockingQueue<>(editPendingQSize);

// the thread pool size should be configurable later, and justified with a rationale
logSyncNotifyExecutor = Executors.newFixedThreadPool(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to make this improvement be more configurable to use. Can we make thread pool size be configurable in this PR? if the pool size is configured as 0, that means this improvements is disabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. What should be the default value? Many users use the default value, so probably we shouldn't set it as 0 by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@functioner , we could make 10 as the default pool size.

}

private boolean isSyncThreadAlive() {
Expand Down Expand Up @@ -117,6 +125,7 @@ void openForWrite(int layoutVersion) throws IOException {
public void close() {
super.close();
stopSyncThread();
logSyncNotifyExecutor.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could meet issue when transition active to standby then transition back here. Because executor has been shutdown and no response will be sent. Moreover I suspect that namenode process could fatal in this case. In my opinion, we do not need to shutdown this executor. FYI. Thanks.

}

@Override
Expand Down Expand Up @@ -251,7 +260,9 @@ public void run() {
syncEx = ex;
}
while ((edit = syncWaitQ.poll()) != null) {
edit.logSyncNotify(syncEx);
final Edit notifyEdit = edit;
final RuntimeException ex = syncEx;
logSyncNotifyExecutor.submit(() -> notifyEdit.logSyncNotify(ex));
}
}
}
Expand Down