2323import java .util .ArrayList ;
2424import java .util .Collection ;
2525import java .util .Collections ;
26- import java .util .HashMap ;
2726import java .util .List ;
28- import java .util .Map ;
2927import java .util .concurrent .ExecutionException ;
3028import java .util .concurrent .Future ;
3129import java .util .concurrent .ThreadFactory ;
@@ -99,7 +97,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
9997 public static void archiveRegion (Configuration conf , FileSystem fs , RegionInfo info )
10098 throws IOException {
10199 Path rootDir = CommonFSUtils .getRootDir (conf );
102- archiveRegion (conf , fs , rootDir , CommonFSUtils .getTableDir (rootDir , info .getTable ()),
100+ archiveRegion (fs , rootDir , CommonFSUtils .getTableDir (rootDir , info .getTable ()),
103101 FSUtils .getRegionDirFromRootDir (rootDir , info ));
104102 }
105103
@@ -115,8 +113,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
115113 * operations could not complete.
116114 * @throws IOException if the request cannot be completed
117115 */
118- public static boolean archiveRegion (final Configuration conf , FileSystem fs , Path rootdir ,
119- Path tableDir , Path regionDir ) throws IOException {
116+ public static boolean archiveRegion (FileSystem fs , Path rootdir , Path tableDir , Path regionDir )
117+ throws IOException {
120118 // otherwise, we archive the files
121119 // make sure we can archive
122120 if (tableDir == null || regionDir == null ) {
@@ -159,8 +157,8 @@ public boolean accept(Path file) {
159157 // convert the files in the region to a File
160158 Stream .of (storeDirs ).map (getAsFile ).forEachOrdered (toArchive ::add );
161159 LOG .debug ("Archiving " + toArchive );
162- List <File > failedArchive = resolveAndArchive ( conf , fs , regionArchiveDir , toArchive ,
163- EnvironmentEdgeManager .currentTime ());
160+ List <File > failedArchive =
161+ resolveAndArchive ( fs , regionArchiveDir , toArchive , EnvironmentEdgeManager .currentTime ());
164162 if (!failedArchive .isEmpty ()) {
165163 throw new FailedArchiveException (
166164 "Failed to archive/delete all the files for region:" + regionDir .getName () + " into "
@@ -188,7 +186,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
188186 List <Future <Void >> futures = new ArrayList <>(regionDirList .size ());
189187 for (Path regionDir : regionDirList ) {
190188 Future <Void > future = getArchiveExecutor (conf ).submit (() -> {
191- archiveRegion (conf , fs , rootDir , tableDir , regionDir );
189+ archiveRegion (fs , rootDir , tableDir , regionDir );
192190 return null ;
193191 });
194192 futures .add (future );
@@ -260,8 +258,8 @@ public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo p
260258 * @param family the family hosting the store files
261259 * @throws IOException if the files could not be correctly disposed.
262260 */
263- public static void archiveFamilyByFamilyDir (FileSystem fs , final Configuration conf ,
264- RegionInfo parent , Path familyDir , byte [] family ) throws IOException {
261+ public static void archiveFamilyByFamilyDir (FileSystem fs , Configuration conf , RegionInfo parent ,
262+ Path familyDir , byte [] family ) throws IOException {
265263 FileStatus [] storeFiles = CommonFSUtils .listStatus (fs , familyDir );
266264 if (storeFiles == null ) {
267265 LOG .debug ("No files to dispose of in {}, family={}" , parent .getRegionNameAsString (),
@@ -275,7 +273,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration c
275273
276274 // do the actual archive
277275 List <File > failedArchive =
278- resolveAndArchive (conf , fs , storeArchiveDir , toArchive , EnvironmentEdgeManager .currentTime ());
276+ resolveAndArchive (fs , storeArchiveDir , toArchive , EnvironmentEdgeManager .currentTime ());
279277 if (!failedArchive .isEmpty ()) {
280278 throw new FailedArchiveException (
281279 "Failed to archive/delete all the files for region:"
@@ -295,11 +293,10 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration c
295293 * attempted; otherwise likely to cause an {@link IOException}
296294 * @throws IOException if the files could not be correctly disposed.
297295 */
298- public static void archiveStoreFiles (final Configuration conf , FileSystem fs ,
299- RegionInfo regionInfo , Path tableDir , byte [] family , Collection <HStoreFile > compactedFiles )
300- throws IOException {
296+ public static void archiveStoreFiles (Configuration conf , FileSystem fs , RegionInfo regionInfo ,
297+ Path tableDir , byte [] family , Collection <HStoreFile > compactedFiles ) throws IOException {
301298 Path storeArchiveDir = HFileArchiveUtil .getStoreArchivePath (conf , regionInfo , tableDir , family );
302- archive (conf , fs , regionInfo , family , compactedFiles , storeArchiveDir );
299+ archive (fs , regionInfo , family , compactedFiles , storeArchiveDir );
303300 }
304301
305302 /**
@@ -330,11 +327,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
330327 "Wrong file system! Should be " + path .toUri ().getScheme () + ", but got " + fs .getScheme ());
331328 }
332329 path = HFileArchiveUtil .getStoreArchivePathForRootDir (path , regionInfo , family );
333- archive (conf , fs , regionInfo , family , replayedEdits , path );
330+ archive (fs , regionInfo , family , replayedEdits , path );
334331 }
335332
336- private static void archive (final Configuration conf , FileSystem fs , RegionInfo regionInfo ,
337- byte [] family , Collection <HStoreFile > compactedFiles , Path storeArchiveDir ) throws IOException {
333+ private static void archive (FileSystem fs , RegionInfo regionInfo , byte [] family ,
334+ Collection <HStoreFile > compactedFiles , Path storeArchiveDir ) throws IOException {
338335 // sometimes in testing, we don't have rss, so we need to check for that
339336 if (fs == null ) {
340337 LOG .warn (
@@ -368,8 +365,8 @@ private static void archive(final Configuration conf, FileSystem fs, RegionInfo
368365 compactedFiles .stream ().map (getStorePath ).collect (Collectors .toList ());
369366
370367 // do the actual archive
371- List <File > failedArchive = resolveAndArchive ( conf , fs , storeArchiveDir , storeFiles ,
372- EnvironmentEdgeManager .currentTime ());
368+ List <File > failedArchive =
369+ resolveAndArchive ( fs , storeArchiveDir , storeFiles , EnvironmentEdgeManager .currentTime ());
373370
374371 if (!failedArchive .isEmpty ()) {
375372 throw new FailedArchiveException (
@@ -422,8 +419,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
422419 * @return the list of failed to archive files.
423420 * @throws IOException if an unexpected file operation exception occurred
424421 */
425- private static List <File > resolveAndArchive (final Configuration conf , FileSystem fs ,
426- Path baseArchiveDir , Collection <File > toArchive , long start ) throws IOException {
422+ private static List <File > resolveAndArchive (FileSystem fs , Path baseArchiveDir ,
423+ Collection <File > toArchive , long start ) throws IOException {
427424 // short circuit if no files to move
428425 if (toArchive .isEmpty ()) {
429426 return Collections .emptyList ();
@@ -440,54 +437,33 @@ private static List<File> resolveAndArchive(final Configuration conf, FileSystem
440437 LOG .trace ("Created archive directory {}" , baseArchiveDir );
441438 }
442439
443- List <File > failures = Collections . synchronizedList ( new ArrayList <>() );
440+ List <File > failures = new ArrayList <>();
444441 String startTime = Long .toString (start );
445- List <File > filesOnly = new ArrayList <>();
446442 for (File file : toArchive ) {
443+ // if its a file archive it
447444 try {
448- if (!file .isFile ()) {
449- // if its a directory and we need to archive all files
445+ LOG .trace ("Archiving {}" , file );
446+ if (file .isFile ()) {
447+ // attempt to archive the file
448+ if (!resolveAndArchiveFile (baseArchiveDir , file , startTime )) {
449+ LOG .warn ("Couldn't archive " + file + " into backup directory: " + baseArchiveDir );
450+ failures .add (file );
451+ }
452+ } else {
453+ // otherwise its a directory and we need to archive all files
450454 LOG .trace ("{} is a directory, archiving children files" , file );
451455 // so we add the directory name to the one base archive
452456 Path parentArchiveDir = new Path (baseArchiveDir , file .getName ());
453457 // and then get all the files from that directory and attempt to
454458 // archive those too
455459 Collection <File > children = file .getChildren ();
456- failures .addAll (resolveAndArchive (conf , fs , parentArchiveDir , children , start ));
457- } else {
458- filesOnly .add (file );
460+ failures .addAll (resolveAndArchive (fs , parentArchiveDir , children , start ));
459461 }
460462 } catch (IOException e ) {
461463 LOG .warn ("Failed to archive {}" , file , e );
462464 failures .add (file );
463465 }
464466 }
465- Map <File , Future <Boolean >> futures = new HashMap <>();
466- // In current baseDir all files will be processed concurrently
467- for (File file : filesOnly ) {
468- LOG .trace ("Archiving {}" , file );
469- Future <Boolean > archiveTask = getArchiveExecutor (conf )
470- .submit (() -> resolveAndArchiveFile (baseArchiveDir , file , startTime ));
471- futures .put (file , archiveTask );
472- }
473-
474- for (Map .Entry <File , Future <Boolean >> fileFutureEntry : futures .entrySet ()) {
475- try {
476- boolean fileCleaned = fileFutureEntry .getValue ().get ();
477- if (!fileCleaned ) {
478- LOG .warn ("Couldn't archive {} into backup directory: {}" , fileFutureEntry .getKey (),
479- baseArchiveDir );
480- failures .add (fileFutureEntry .getKey ());
481- }
482- } catch (InterruptedException e ) {
483- LOG .warn ("HFileArchive Cleanup thread was interrupted" );
484- failures .add (fileFutureEntry .getKey ());
485- } catch (ExecutionException e ) {
486- // this is IOException
487- LOG .warn ("Failed to archive {}" , fileFutureEntry .getKey (), e );
488- failures .add (fileFutureEntry .getKey ());
489- }
490- }
491467 return failures ;
492468 }
493469
0 commit comments