4646import java .util .concurrent .ExecutorService ;
4747import java .util .concurrent .Executors ;
4848import java .util .concurrent .Future ;
49+ import java .util .concurrent .LinkedBlockingQueue ;
50+ import java .util .concurrent .ThreadPoolExecutor ;
4951import java .util .concurrent .TimeUnit ;
5052import java .util .concurrent .TimeoutException ;
5153import java .util .concurrent .atomic .AtomicBoolean ;
@@ -329,8 +331,12 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
329331
330332 protected final AtomicBoolean rollRequested = new AtomicBoolean (false );
331333
332- private final ExecutorService logArchiveOrShutdownExecutor = Executors .newSingleThreadExecutor (
333- new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Archive-Or-Shutdown-%d" ).build ());
334+ // Use CallerRunsPolicy so that a rejected task never throws RejectedExecutionException, which would
335+ // abort the region server: it runs in the caller, or is dropped if the executor is already shut down.
336+ private final ExecutorService logArchiveExecutor =
337+ new ThreadPoolExecutor (1 , 1 , 1L , TimeUnit .MINUTES , new LinkedBlockingQueue <Runnable >(),
338+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Archive-%d" ).build (),
339+ new ThreadPoolExecutor .CallerRunsPolicy ());
334340
335341 private final int archiveRetries ;
336342
@@ -696,7 +702,7 @@ private void cleanOldLogs() throws IOException {
696702 final List <Pair <Path , Long >> localLogsToArchive = logsToArchive ;
697703 // make it async
698704 for (Pair <Path , Long > log : localLogsToArchive ) {
699- logArchiveOrShutdownExecutor .execute (() -> {
705+ logArchiveExecutor .execute (() -> {
700706 archive (log );
701707 });
702708 this .walFile2Props .remove (log .getFirst ());
@@ -903,7 +909,10 @@ public void shutdown() throws IOException {
903909 }
904910 }
905911
906- Future <Void > future = logArchiveOrShutdownExecutor .submit (new Callable <Void >() {
912+ ExecutorService shutdownExecutor = Executors .newSingleThreadExecutor (
913+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Shutdown-%d" ).build ());
914+
915+ Future <Void > future = shutdownExecutor .submit (new Callable <Void >() {
907916 @ Override
908917 public Void call () throws Exception {
909918 if (rollWriterLock .tryLock (walShutdownTimeout , TimeUnit .SECONDS )) {
@@ -921,7 +930,7 @@ public Void call() throws Exception {
921930 return null ;
922931 }
923932 });
924- logArchiveOrShutdownExecutor .shutdown ();
933+ shutdownExecutor .shutdown ();
925934
926935 try {
927936 future .get (walShutdownTimeout , TimeUnit .MILLISECONDS );
@@ -938,6 +947,12 @@ public Void call() throws Exception {
938947 } else {
939948 throw new IOException (e .getCause ());
940949 }
950+ } finally {
951+ // During shutdown we may still call cleanOldLogs, so shut down this executor last.
952+ // In the sync replication implementation we may shut down a WAL without shutting down the whole
953+ // region server; if we shut down this executor earlier we may get a RejectedExecutionException
954+ // and abort the region server.
955+ logArchiveExecutor .shutdown ();
941956 }
942957 }
943958
0 commit comments