9191 * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
9292 * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
9393 * operations.</li>
94- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
95- * {@link #addPeer(String)}, {@link #removePeer(String)},
96- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
97- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
98- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
99- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
100- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
101- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
102- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
103- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
104- * {@link #preLogRoll(Path)}.</li>
105- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
106- * modify it, {@link #removePeer(String)} ,
107- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
108- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
109- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
110- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
111- * {@link ReplicationSourceInterface} firstly, then remove the wals from
112- * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
113- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
114- * {@link ReplicationSourceInterface}. So there is no race here. For
115- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
116- * is already synchronized on {@link #oldsources}. So no need synchronized on
117- * {@link #walsByIdRecoveredQueues}.</li>
11894 * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
11995 * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
12096 * to-be-removed peer.</li>
@@ -135,15 +111,6 @@ public class ReplicationSourceManager implements ReplicationListener {
135111 // All about stopping
136112 private final Server server ;
137113
138- // All logs we are currently tracking
139- // Index structure of the map is: queue_id->logPrefix/logGroup->logs
140- // For normal replication source, the peer id is same with the queue id
141- private final ConcurrentMap <String , Map <String , NavigableSet <String >>> walsById ;
142- // Logs for recovered sources we are currently tracking
143- // the map is: queue_id->logPrefix/logGroup->logs
144- // For recovered source, the queue id's format is peer_id-servername-*
145- private final ConcurrentMap <String , Map <String , NavigableSet <String >>> walsByIdRecoveredQueues ;
146-
147114 private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager ;
148115
149116 private final Configuration conf ;
@@ -195,8 +162,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
195162 this .replicationPeers = replicationPeers ;
196163 this .replicationTracker = replicationTracker ;
197164 this .server = server ;
198- this .walsById = new ConcurrentHashMap <>();
199- this .walsByIdRecoveredQueues = new ConcurrentHashMap <>();
200165 this .oldsources = new ArrayList <>();
201166 this .conf = conf ;
202167 this .fs = fs ;
@@ -331,7 +296,6 @@ public void removePeer(String peerId) {
331296 // Delete queue from storage and memory and queue id is same with peer id for normal
332297 // source
333298 deleteQueue (peerId );
334- this .walsById .remove (peerId );
335299 }
336300 ReplicationPeerConfig peerConfig = peer .getPeerConfig ();
337301 if (peerConfig .isSyncReplication ()) {
@@ -372,15 +336,10 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
372336 // synchronized on latestPaths to avoid missing the new log
373337 synchronized (this .latestPaths ) {
374338 this .sources .put (peerId , src );
375- Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
376- this .walsById .put (peerId , walsByGroup );
377339 // Add the latest wal to that source's queue
378340 if (!latestPaths .isEmpty ()) {
379341 for (Map .Entry <String , Path > walPrefixAndPath : latestPaths .entrySet ()) {
380342 Path walPath = walPrefixAndPath .getValue ();
381- NavigableSet <String > wals = new TreeSet <>();
382- wals .add (walPath .getName ());
383- walsByGroup .put (walPrefixAndPath .getKey (), wals );
384343 // Abort RS and throw exception to make add peer failed
385344 abortAndThrowIOExceptionWhenFail (
386345 () -> this .queueStorage .addWAL (server .getServerName (), peerId , walPath .getName ()));
@@ -434,7 +393,10 @@ public void drainSources(String peerId) throws IOException, ReplicationException
434393 // map from walsById since later we may fail to delete them from the replication queue
435394 // storage, and when we retry next time, we can not know the wal files that need to be deleted
436395 // from the replication queue storage.
437- walsById .get (peerId ).forEach ((k , v ) -> wals .put (k , new TreeSet <>(v )));
396+ this .queueStorage .getWALsInQueue (this .server .getServerName (), peerId ).forEach (wal -> {
397+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
398+ wals .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
399+ });
438400 }
439401 LOG .info ("Startup replication source for " + src .getPeerId ());
440402 src .startup ();
@@ -443,15 +405,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
443405 queueStorage .removeWAL (server .getServerName (), peerId , wal );
444406 }
445407 }
446- synchronized (walsById ) {
447- Map <String , NavigableSet <String >> oldWals = walsById .get (peerId );
448- wals .forEach ((k , v ) -> {
449- NavigableSet <String > walsByGroup = oldWals .get (k );
450- if (walsByGroup != null ) {
451- walsByGroup .removeAll (v );
452- }
453- });
454- }
455408 // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
456409 // a background task, we will delete the file from replication queue storage under the lock to
457410 // simplify the logic.
@@ -463,7 +416,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
463416 oldSource .terminate (terminateMessage );
464417 oldSource .getSourceMetrics ().clear ();
465418 queueStorage .removeQueue (server .getServerName (), queueId );
466- walsByIdRecoveredQueues .remove (queueId );
467419 iter .remove ();
468420 }
469421 }
@@ -476,7 +428,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException
476428 * replication queue storage and only to enqueue all logs to the new replication source
477429 * @param peerId the id of the replication peer
478430 */
479- public void refreshSources (String peerId ) throws IOException {
431+ public void refreshSources (String peerId ) throws ReplicationException , IOException {
480432 String terminateMessage = "Peer " + peerId +
481433 " state or config changed. Will close the previous replication source and open a new one" ;
482434 ReplicationPeer peer = replicationPeers .getPeer (peerId );
@@ -489,9 +441,8 @@ public void refreshSources(String peerId) throws IOException {
489441 // Do not clear metrics
490442 toRemove .terminate (terminateMessage , null , false );
491443 }
492- for (NavigableSet <String > walsByGroup : walsById .get (peerId ).values ()) {
493- walsByGroup .forEach (wal -> src .enqueueLog (new Path (this .logDir , wal )));
494- }
444+ this .queueStorage .getWALsInQueue (this .server .getServerName (), peerId )
445+ .forEach (wal -> src .enqueueLog (new Path (this .logDir , wal )));
495446 }
496447 LOG .info ("Startup replication source for " + src .getPeerId ());
497448 src .startup ();
@@ -512,9 +463,8 @@ public void refreshSources(String peerId) throws IOException {
512463 for (String queueId : previousQueueIds ) {
513464 ReplicationSourceInterface replicationSource = createSource (queueId , peer );
514465 this .oldsources .add (replicationSource );
515- for (SortedSet <String > walsByGroup : walsByIdRecoveredQueues .get (queueId ).values ()) {
516- walsByGroup .forEach (wal -> src .enqueueLog (new Path (wal )));
517- }
466+ this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )
467+ .forEach (wal -> src .enqueueLog (new Path (wal )));
518468 toStartup .add (replicationSource );
519469 }
520470 }
@@ -534,7 +484,6 @@ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
534484 LOG .info ("Done with the recovered queue {}" , src .getQueueId ());
535485 // Delete queue from storage and memory
536486 deleteQueue (src .getQueueId ());
537- this .walsByIdRecoveredQueues .remove (src .getQueueId ());
538487 return true ;
539488 }
540489
@@ -557,8 +506,6 @@ void removeSource(ReplicationSourceInterface src) {
557506 this .sources .remove (src .getPeerId ());
558507 // Delete queue from storage and memory
559508 deleteQueue (src .getQueueId ());
560- this .walsById .remove (src .getQueueId ());
561-
562509 }
563510
564511 /**
@@ -644,42 +591,19 @@ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
644591 * @param source the replication source
645592 */
646593 @ VisibleForTesting
647- void cleanOldLogs (String log , boolean inclusive , ReplicationSourceInterface source ) {
648- String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
649- if (source .isRecovered ()) {
650- NavigableSet <String > wals = walsByIdRecoveredQueues .get (source .getQueueId ()).get (logPrefix );
651- if (wals != null ) {
652- NavigableSet <String > walsToRemove = wals .headSet (log , inclusive );
653- if (walsToRemove .isEmpty ()) {
654- return ;
655- }
656- cleanOldLogs (walsToRemove , source );
657- walsToRemove .clear ();
658- }
659- } else {
660- NavigableSet <String > wals ;
661- NavigableSet <String > walsToRemove ;
662- // synchronized on walsById to avoid race with preLogRoll
663- synchronized (this .walsById ) {
664- wals = walsById .get (source .getQueueId ()).get (logPrefix );
665- if (wals == null ) {
666- return ;
667- }
668- walsToRemove = wals .headSet (log , inclusive );
669- if (walsToRemove .isEmpty ()) {
670- return ;
671- }
672- walsToRemove = new TreeSet <>(walsToRemove );
673- }
674- // cleanOldLogs may spend some time, especially for sync replication where we may want to
675- // remove remote wals as the remote cluster may have already been down, so we do it outside
676- // the lock to avoid block preLogRoll
677- cleanOldLogs (walsToRemove , source );
678- // now let's remove the files in the set
679- synchronized (this .walsById ) {
680- wals .removeAll (walsToRemove );
681- }
594+ void cleanOldLogs (String log , boolean inclusive ,
595+ ReplicationSourceInterface source ) {
596+ NavigableSet <String > walsToRemove ;
597+ synchronized (this .latestPaths ) {
598+ walsToRemove = getWalsToRemove (source .getQueueId (), log , inclusive );
682599 }
600+ if (walsToRemove .isEmpty ()) {
601+ return ;
602+ }
603+ // cleanOldLogs may spend some time, especially for sync replication where we may want to
604+ // remove remote wals as the remote cluster may have already been down, so we do it outside
605+ // the lock to avoid block preLogRoll
606+ cleanOldLogs (walsToRemove , source );
683607 }
684608
685609 private void removeRemoteWALs (String peerId , String remoteWALDir , Collection <String > wals )
@@ -760,37 +684,6 @@ public void preLogRoll(Path newLog) throws IOException {
760684 abortAndThrowIOExceptionWhenFail (
761685 () -> this .queueStorage .addWAL (server .getServerName (), source .getQueueId (), logName ));
762686 }
763-
764- // synchronized on walsById to avoid race with cleanOldLogs
765- synchronized (this .walsById ) {
766- // Update walsById map
767- for (Map .Entry <String , Map <String , NavigableSet <String >>> entry : this .walsById
768- .entrySet ()) {
769- String peerId = entry .getKey ();
770- Map <String , NavigableSet <String >> walsByPrefix = entry .getValue ();
771- boolean existingPrefix = false ;
772- for (Map .Entry <String , NavigableSet <String >> walsEntry : walsByPrefix .entrySet ()) {
773- SortedSet <String > wals = walsEntry .getValue ();
774- if (this .sources .isEmpty ()) {
775- // If there's no slaves, don't need to keep the old wals since
776- // we only consider the last one when a new slave comes in
777- wals .clear ();
778- }
779- if (logPrefix .equals (walsEntry .getKey ())) {
780- wals .add (logName );
781- existingPrefix = true ;
782- }
783- }
784- if (!existingPrefix ) {
785- // The new log belongs to a new group, add it into this peer
786- LOG .debug ("Start tracking logs for wal group {} for peer {}" , logPrefix , peerId );
787- NavigableSet <String > wals = new TreeSet <>();
788- wals .add (logName );
789- walsByPrefix .put (logPrefix , wals );
790- }
791- }
792- }
793-
794687 // Add to latestPaths
795688 latestPaths .put (logPrefix , newLog );
796689 }
@@ -962,18 +855,6 @@ public void run() {
962855 continue ;
963856 }
964857 }
965- // track sources in walsByIdRecoveredQueues
966- Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
967- walsByIdRecoveredQueues .put (queueId , walsByGroup );
968- for (String wal : walsSet ) {
969- String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
970- NavigableSet <String > wals = walsByGroup .get (walPrefix );
971- if (wals == null ) {
972- wals = new TreeSet <>();
973- walsByGroup .put (walPrefix , wals );
974- }
975- wals .add (wal );
976- }
977858 oldsources .add (src );
978859 LOG .trace ("Added source for recovered queue: " + src .getQueueId ());
979860 for (String wal : walsSet ) {
@@ -1005,7 +886,18 @@ public void join() {
1005886 * @return a sorted set of wal names
1006887 */
1007888 @ VisibleForTesting
1008- public Map <String , Map <String , NavigableSet <String >>> getWALs () {
889+ public Map <String , Map <String , NavigableSet <String >>> getWALs ()
890+ throws ReplicationException {
891+ Map <String , Map <String , NavigableSet <String >>> walsById = new HashMap <>();
892+ for (ReplicationSourceInterface source : sources .values ()) {
893+ String queueId = source .getQueueId ();
894+ Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
895+ walsById .put (queueId , walsByGroup );
896+ for (String wal : this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )) {
897+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
898+ walsByGroup .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
899+ }
900+ }
1009901 return Collections .unmodifiableMap (walsById );
1010902 }
1011903
@@ -1014,7 +906,18 @@ public Map<String, Map<String, NavigableSet<String>>> getWALs() {
1014906 * @return a sorted set of wal names
1015907 */
1016908 @ VisibleForTesting
1017- Map <String , Map <String , NavigableSet <String >>> getWalsByIdRecoveredQueues () {
909+ Map <String , Map <String , NavigableSet <String >>> getWalsByIdRecoveredQueues ()
910+ throws ReplicationException {
911+ Map <String , Map <String , NavigableSet <String >>> walsByIdRecoveredQueues = new HashMap <>();
912+ for (ReplicationSourceInterface source : oldsources ) {
913+ String queueId = source .getQueueId ();
914+ Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
915+ walsByIdRecoveredQueues .put (queueId , walsByGroup );
916+ for (String wal : this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )) {
917+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
918+ walsByGroup .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
919+ }
920+ }
1018921 return Collections .unmodifiableMap (walsByIdRecoveredQueues );
1019922 }
1020923
@@ -1177,4 +1080,21 @@ public void cleanUpHFileRefs(String peerId, List<String> files) {
11771080 int activeFailoverTaskCount () {
11781081 return executor .getActiveCount ();
11791082 }
1083+
1084+ private NavigableSet <String > getWalsToRemove (String queueId , String log , boolean inclusive ) {
1085+ NavigableSet <String > walsToRemove = new TreeSet <>();
1086+ String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
1087+ try {
1088+ this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId ).forEach (wal -> {
1089+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
1090+ if (walPrefix .equals (logPrefix )) {
1091+ walsToRemove .add (wal );
1092+ }
1093+ });
1094+ } catch (ReplicationException e ) {
1095+ // Just log the exception here, as the recovered replication source will try to cleanup again.
1096+ LOG .warn ("Failed to read wals in queue {}" , queueId , e );
1097+ }
1098+ return walsToRemove .headSet (log , inclusive );
1099+ }
11801100}
0 commit comments