@@ -81,39 +81,64 @@ public void init(Map<String, Object> params) {
8181 }
8282 }
8383
84- private Map <Address , Long > getServerToNewestBackupTs (List <BackupInfo > backups )
84+ /**
85+ * Calculates, per server address, the timestamp boundary up to which all backup roots have already
86+ * included the WAL, based on the newest backup of each backup root. I.e. WALs with a lower (= older)
87+ * or equal timestamp are no longer needed for future incremental backups.
88+ */
89+ private Map <Address , Long > serverToPreservationBoundaryTs (List <BackupInfo > backups )
8590 throws IOException {
8691 if (LOG .isDebugEnabled ()) {
8792 LOG .debug (
88- "Cleaning WALs if they are older than the newest backups . "
93+ "Cleaning WALs if they are older than the WAL cleanup time-boundary . "
8994 + "Checking WALs against {} backups: {}" ,
9095 backups .size (),
9196 backups .stream ().map (BackupInfo ::getBackupId ).sorted ().collect (Collectors .joining (", " )));
9297 }
93- Map <Address , Long > serverAddressToNewestBackupMap = new HashMap <>();
94-
95- Map <TableName , Long > tableNameBackupInfoMap = new HashMap <>();
96- for (BackupInfo backupInfo : backups ) {
97- for (TableName table : backupInfo .getTables ()) {
98- tableNameBackupInfoMap .putIfAbsent (table , backupInfo .getStartTs ());
99- if (tableNameBackupInfoMap .get (table ) <= backupInfo .getStartTs ()) {
100- tableNameBackupInfoMap .put (table , backupInfo .getStartTs ());
101- for (Map .Entry <String , Long > entry : backupInfo .getTableSetTimestampMap ().get (table )
102- .entrySet ()) {
103- serverAddressToNewestBackupMap .put (Address .fromString (entry .getKey ()),
104- entry .getValue ());
98+
99+ // This map tracks, for every backup root, the most recently created backup (= highest start timestamp)
100+ Map <String , BackupInfo > newestBackupPerRootDir = new HashMap <>();
101+ for (BackupInfo backup : backups ) {
102+ BackupInfo existingEntry = newestBackupPerRootDir .get (backup .getBackupRootDir ());
103+ if (existingEntry == null || existingEntry .getStartTs () < backup .getStartTs ()) {
104+ newestBackupPerRootDir .put (backup .getBackupRootDir (), backup );
105+ }
106+ }
107+
108+ if (LOG .isDebugEnabled ()) {
109+ LOG .debug ("WAL cleanup time-boundary using info from: {}. " ,
110+ newestBackupPerRootDir .entrySet ().stream ()
111+ .map (e -> "Backup root " + e .getKey () + ": " + e .getValue ().getBackupId ()).sorted ()
112+ .collect (Collectors .joining (", " )));
113+ }
114+
115+ // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
116+ // inclusion in the newest backup of any backup root. In other words, it is the timestamp boundary
117+ // up to which all backup roots have included the WAL in their backup.
118+ Map <Address , Long > boundaries = new HashMap <>();
119+ for (BackupInfo backupInfo : newestBackupPerRootDir .values ()) {
120+ // Iterate over all tables in the timestamp map, which contains all tables covered in the
121+ // backup root, not just the tables included in that specific backup (which could be a subset)
122+ for (TableName table : backupInfo .getTableSetTimestampMap ().keySet ()) {
123+ for (Map .Entry <String , Long > entry : backupInfo .getTableSetTimestampMap ().get (table )
124+ .entrySet ()) {
125+ Address address = Address .fromString (entry .getKey ());
126+ Long storedTs = boundaries .get (address );
127+ if (storedTs == null || entry .getValue () < storedTs ) {
128+ boundaries .put (address , entry .getValue ());
105129 }
106130 }
107131 }
108132 }
109133
110134 if (LOG .isDebugEnabled ()) {
111- for (Map .Entry <Address , Long > entry : serverAddressToNewestBackupMap .entrySet ()) {
112- LOG .debug ("Server: {}, Newest Backup: {}" , entry .getKey ().getHostName (), entry .getValue ());
135+ for (Map .Entry <Address , Long > entry : boundaries .entrySet ()) {
136+ LOG .debug ("Server: {}, WAL cleanup boundary: {}" , entry .getKey ().getHostName (),
137+ entry .getValue ());
113138 }
114139 }
115140
116- return serverAddressToNewestBackupMap ;
141+ return boundaries ;
117142 }
118143
119144 @ Override
@@ -128,18 +153,19 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
128153 return files ;
129154 }
130155
131- Map <Address , Long > addressToNewestBackupMap ;
156+ Map <Address , Long > serverToPreservationBoundaryTs ;
132157 try {
133158 try (BackupManager backupManager = new BackupManager (conn , getConf ())) {
134- addressToNewestBackupMap = getServerToNewestBackupTs (backupManager .getBackupHistory (true ));
159+ serverToPreservationBoundaryTs =
160+ serverToPreservationBoundaryTs (backupManager .getBackupHistory (true ));
135161 }
136162 } catch (IOException ex ) {
137163 LOG .error ("Failed to analyse backup history with exception: {}. Retaining all logs" ,
138164 ex .getMessage (), ex );
139165 return Collections .emptyList ();
140166 }
141167 for (FileStatus file : files ) {
142- if (canDeleteFile (addressToNewestBackupMap , file .getPath ())) {
168+ if (canDeleteFile (serverToPreservationBoundaryTs , file .getPath ())) {
143169 filteredFiles .add (file );
144170 }
145171 }
@@ -174,7 +200,7 @@ public boolean isStopped() {
174200 return this .stopped ;
175201 }
176202
177- protected static boolean canDeleteFile (Map <Address , Long > addressToNewestBackupMap , Path path ) {
203+ protected static boolean canDeleteFile (Map <Address , Long > addressToBoundaryTs , Path path ) {
178204 if (isHMasterWAL (path )) {
179205 return true ;
180206 }
@@ -190,28 +216,27 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToNewestBackupM
190216 Address walServerAddress = Address .fromString (hostname );
191217 long walTimestamp = WAL .getTimestamp (path .getName ());
192218
193- if (!addressToNewestBackupMap .containsKey (walServerAddress )) {
219+ if (!addressToBoundaryTs .containsKey (walServerAddress )) {
194220 if (LOG .isDebugEnabled ()) {
195- LOG .debug ("No backup found for server: {}. Deleting file: {}" ,
221+ LOG .debug ("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}" ,
196222 walServerAddress .getHostName (), path );
197223 }
198224 return true ;
199225 }
200226
201- Long lastBackupTs = addressToNewestBackupMap .get (walServerAddress );
202- if (lastBackupTs >= walTimestamp ) {
227+ Long backupBoundary = addressToBoundaryTs .get (walServerAddress );
228+ if (backupBoundary >= walTimestamp ) {
203229 if (LOG .isDebugEnabled ()) {
204230 LOG .debug (
205- "Backup found for server: {}. Backup from {} is newer than file, so deleting : {}" ,
206- walServerAddress .getHostName (), lastBackupTs , path );
231+ "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file : {}" ,
232+ walServerAddress .getHostName (), backupBoundary , path );
207233 }
208234 return true ;
209235 }
210236
211237 if (LOG .isDebugEnabled ()) {
212- LOG .debug (
213- "Backup found for server: {}. Backup from {} is older than the file, so keeping: {}" ,
214- walServerAddress .getHostName (), lastBackupTs , path );
238+ LOG .debug ("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}" ,
239+ walServerAddress .getHostName (), backupBoundary , path );
215240 }
216241 } catch (Exception ex ) {
217242 LOG .warn ("Error occurred while filtering file: {}. Ignoring cleanup of this log" , path , ex );
0 commit comments