Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
() -> {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
this.terminate("Finished recovering queue");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we just need to call replicationEndpoint.stop? The terminate method is designed for terminate a replication source when removing a peer or refreshing a source when there are still data need to be replicated in the source, so it contains a lot of unnecessary logics when we just want to remove a finished recovered source.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, calling replicationEndpoint.stop can solve this bug. Don't the workerThreads created during the initialization of RecoveredReplicationSource need to be stopped? I think the context simply removes it from the map.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplicationSourceShipper is just a thread, it will quit after we returning from the run method, so we do not need to stop it again. And even if you call terminate method here, there is no way to close these threads as there is a workerThreads.isEmpty check above...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thanks for your help. It is more reasonable to call replicationEndpoint.stop here without call terminate.

manager.finishRecoveredSource(this);
}
});
Expand Down