@@ -373,8 +373,8 @@ private void tryStartNewShipper(String walGroupId) {
373373 Threads .setDaemonThreadRunning (
374374 walReader , Thread .currentThread ().getName () + ".replicationSource.wal-reader."
375375 + walGroupId + "," + queueId ,
376- (t , e ) -> this .uncaughtException (t , e , this .manager , this .getPeerId ()));
377- worker .startup ((t , e ) -> this .uncaughtException (t , e , this .manager , this .getPeerId ()));
376+ (t , e ) -> this .retryRefreshing (t , e , this .manager , this .getPeerId ()));
377+ worker .startup ((t , e ) -> this .retryRefreshing (t , e , this .manager , this .getPeerId ()));
378378 return worker ;
379379 }
380380 });
@@ -448,22 +448,26 @@ WALEntryFilter getWalEntryFilter() {
448448 return walEntryFilter ;
449449 }
450450
451- private void uncaughtException (Thread t , Throwable e , ReplicationSourceManager manager ,
451+ private void retryRefreshing (Thread t , Throwable error , ReplicationSourceManager manager ,
452452 String peerId ) {
453- OOMEChecker .exitIfOOME (e , getClass ().getSimpleName ());
454- LOG .error ("Unexpected exception in {} currentPath={}" , t .getName (), getCurrentPath (), e );
453+ OOMEChecker .exitIfOOME (error , getClass ().getSimpleName ());
454+ LOG .error ("Unexpected exception in {} currentPath={}" , t .getName (), getCurrentPath (), error );
455455 if (abortOnError ) {
456- server .abort ("Unexpected exception in " + t .getName (), e );
456+ server .abort ("Unexpected exception in " + t .getName (), error );
457457 }
458458 if (manager != null ) {
459459 while (true ) {
460+ if (server .isAborted () || server .isStopped () || server .isStopping ()) {
461+ LOG .warn ("Server is shutting down, give up refreshing source for peer {}" , peerId );
462+ return ;
463+ }
460464 try {
461465 LOG .info ("Refreshing replication sources now due to previous error on thread: {}" ,
462466 t .getName ());
463467 manager .refreshSources (peerId );
464468 break ;
465- } catch (IOException | ReplicationException e1 ) {
466- LOG .error ("Replication sources refresh failed." , e1 );
469+ } catch (Exception e ) {
470+ LOG .error ("Replication sources refresh failed." , e );
467471 sleepForRetries ("Sleeping before try refreshing sources again" , maxRetriesMultiplier );
468472 }
469473 }
@@ -630,7 +634,7 @@ public ReplicationSourceInterface startup() {
630634 // keep looping in this thread until initialize eventually succeeds,
631635 // while the server main startup one can go on with its work.
632636 sourceRunning = false ;
633- uncaughtException (t , e , null , null );
637+ retryRefreshing (t , e , null , null );
634638 retryStartup .set (!this .abortOnError );
635639 do {
636640 if (retryStartup .get ()) {
@@ -641,7 +645,7 @@ public ReplicationSourceInterface startup() {
641645 initialize ();
642646 } catch (Throwable error ) {
643647 setSourceStartupStatus (false );
644- uncaughtException (t , error , null , null );
648+ retryRefreshing (t , error , null , null );
645649 retryStartup .set (!this .abortOnError );
646650 }
647651 }
0 commit comments