2323import java .util .ArrayList ;
2424import java .util .Collection ;
2525import java .util .Collections ;
26+ import java .util .HashMap ;
2627import java .util .List ;
28+ import java .util .Map ;
2729import java .util .concurrent .ExecutionException ;
2830import java .util .concurrent .Future ;
2931import java .util .concurrent .ThreadFactory ;
@@ -97,7 +99,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
9799 public static void archiveRegion (Configuration conf , FileSystem fs , RegionInfo info )
98100 throws IOException {
99101 Path rootDir = CommonFSUtils .getRootDir (conf );
100- archiveRegion (fs , rootDir , CommonFSUtils .getTableDir (rootDir , info .getTable ()),
102+ archiveRegion (conf , fs , rootDir , CommonFSUtils .getTableDir (rootDir , info .getTable ()),
101103 FSUtils .getRegionDirFromRootDir (rootDir , info ));
102104 }
103105
@@ -113,8 +115,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
113115 * operations could not complete.
114116 * @throws IOException if the request cannot be completed
115117 */
116- public static boolean archiveRegion (FileSystem fs , Path rootdir , Path tableDir , Path regionDir )
117- throws IOException {
118+ public static boolean archiveRegion (final Configuration conf , FileSystem fs , Path rootdir ,
119+ Path tableDir , Path regionDir ) throws IOException {
118120 // otherwise, we archive the files
119121 // make sure we can archive
120122 if (tableDir == null || regionDir == null ) {
@@ -157,8 +159,8 @@ public boolean accept(Path file) {
157159 // convert the files in the region to a File
158160 Stream .of (storeDirs ).map (getAsFile ).forEachOrdered (toArchive ::add );
159161 LOG .debug ("Archiving " + toArchive );
160- List <File > failedArchive =
161- resolveAndArchive ( fs , regionArchiveDir , toArchive , EnvironmentEdgeManager .currentTime ());
162+ List <File > failedArchive = resolveAndArchive ( conf , fs , regionArchiveDir , toArchive ,
163+ EnvironmentEdgeManager .currentTime ());
162164 if (!failedArchive .isEmpty ()) {
163165 throw new FailedArchiveException (
164166 "Failed to archive/delete all the files for region:" + regionDir .getName () + " into "
@@ -186,7 +188,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
186188 List <Future <Void >> futures = new ArrayList <>(regionDirList .size ());
187189 for (Path regionDir : regionDirList ) {
188190 Future <Void > future = getArchiveExecutor (conf ).submit (() -> {
189- archiveRegion (fs , rootDir , tableDir , regionDir );
191+ archiveRegion (conf , fs , rootDir , tableDir , regionDir );
190192 return null ;
191193 });
192194 futures .add (future );
@@ -258,8 +260,8 @@ public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo p
258260 * @param family the family hosting the store files
259261 * @throws IOException if the files could not be correctly disposed.
260262 */
261- public static void archiveFamilyByFamilyDir (FileSystem fs , Configuration conf , RegionInfo parent ,
262- Path familyDir , byte [] family ) throws IOException {
263+ public static void archiveFamilyByFamilyDir (FileSystem fs , final Configuration conf ,
264+ RegionInfo parent , Path familyDir , byte [] family ) throws IOException {
263265 FileStatus [] storeFiles = CommonFSUtils .listStatus (fs , familyDir );
264266 if (storeFiles == null ) {
265267 LOG .debug ("No files to dispose of in {}, family={}" , parent .getRegionNameAsString (),
@@ -273,7 +275,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
273275
274276 // do the actual archive
275277 List <File > failedArchive =
276- resolveAndArchive (fs , storeArchiveDir , toArchive , EnvironmentEdgeManager .currentTime ());
278+ resolveAndArchive (conf , fs , storeArchiveDir , toArchive , EnvironmentEdgeManager .currentTime ());
277279 if (!failedArchive .isEmpty ()) {
278280 throw new FailedArchiveException (
279281 "Failed to archive/delete all the files for region:"
@@ -293,10 +295,11 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
293295 * attempted; otherwise likely to cause an {@link IOException}
294296 * @throws IOException if the files could not be correctly disposed.
295297 */
296- public static void archiveStoreFiles (Configuration conf , FileSystem fs , RegionInfo regionInfo ,
297- Path tableDir , byte [] family , Collection <HStoreFile > compactedFiles ) throws IOException {
298+ public static void archiveStoreFiles (final Configuration conf , FileSystem fs ,
299+ RegionInfo regionInfo , Path tableDir , byte [] family , Collection <HStoreFile > compactedFiles )
300+ throws IOException {
298301 Path storeArchiveDir = HFileArchiveUtil .getStoreArchivePath (conf , regionInfo , tableDir , family );
299- archive (fs , regionInfo , family , compactedFiles , storeArchiveDir );
302+ archive (conf , fs , regionInfo , family , compactedFiles , storeArchiveDir );
300303 }
301304
302305 /**
@@ -327,11 +330,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
327330 "Wrong file system! Should be " + path .toUri ().getScheme () + ", but got " + fs .getScheme ());
328331 }
329332 path = HFileArchiveUtil .getStoreArchivePathForRootDir (path , regionInfo , family );
330- archive (fs , regionInfo , family , replayedEdits , path );
333+ archive (conf , fs , regionInfo , family , replayedEdits , path );
331334 }
332335
333- private static void archive (FileSystem fs , RegionInfo regionInfo , byte [] family ,
334- Collection <HStoreFile > compactedFiles , Path storeArchiveDir ) throws IOException {
336+ private static void archive (final Configuration conf , FileSystem fs , RegionInfo regionInfo ,
337+ byte [] family , Collection <HStoreFile > compactedFiles , Path storeArchiveDir ) throws IOException {
335338 // sometimes in testing, we don't have rss, so we need to check for that
336339 if (fs == null ) {
337340 LOG .warn (
@@ -365,8 +368,8 @@ private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
365368 compactedFiles .stream ().map (getStorePath ).collect (Collectors .toList ());
366369
367370 // do the actual archive
368- List <File > failedArchive =
369- resolveAndArchive ( fs , storeArchiveDir , storeFiles , EnvironmentEdgeManager .currentTime ());
371+ List <File > failedArchive = resolveAndArchive ( conf , fs , storeArchiveDir , storeFiles ,
372+ EnvironmentEdgeManager .currentTime ());
370373
371374 if (!failedArchive .isEmpty ()) {
372375 throw new FailedArchiveException (
@@ -419,8 +422,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
419422 * @return the list of failed to archive files.
420423 * @throws IOException if an unexpected file operation exception occurred
421424 */
422- private static List <File > resolveAndArchive (FileSystem fs , Path baseArchiveDir ,
423- Collection <File > toArchive , long start ) throws IOException {
425+ private static List <File > resolveAndArchive (final Configuration conf , FileSystem fs ,
426+ Path baseArchiveDir , Collection <File > toArchive , long start ) throws IOException {
424427 // short circuit if no files to move
425428 if (toArchive .isEmpty ()) {
426429 return Collections .emptyList ();
@@ -437,33 +440,54 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
437440 LOG .trace ("Created archive directory {}" , baseArchiveDir );
438441 }
439442
440- List <File > failures = new ArrayList <>();
443+ List <File > failures = Collections . synchronizedList ( new ArrayList <>() );
441444 String startTime = Long .toString (start );
445+ List <File > filesOnly = new ArrayList <>();
442446 for (File file : toArchive ) {
443- // if its a file archive it
444447 try {
445- LOG .trace ("Archiving {}" , file );
446- if (file .isFile ()) {
447- // attempt to archive the file
448- if (!resolveAndArchiveFile (baseArchiveDir , file , startTime )) {
449- LOG .warn ("Couldn't archive " + file + " into backup directory: " + baseArchiveDir );
450- failures .add (file );
451- }
452- } else {
453- // otherwise its a directory and we need to archive all files
448+ if (!file .isFile ()) {
449+ // if its a directory and we need to archive all files
454450 LOG .trace ("{} is a directory, archiving children files" , file );
455451 // so we add the directory name to the one base archive
456452 Path parentArchiveDir = new Path (baseArchiveDir , file .getName ());
457453 // and then get all the files from that directory and attempt to
458454 // archive those too
459455 Collection <File > children = file .getChildren ();
460- failures .addAll (resolveAndArchive (fs , parentArchiveDir , children , start ));
456+ failures .addAll (resolveAndArchive (conf , fs , parentArchiveDir , children , start ));
457+ } else {
458+ filesOnly .add (file );
461459 }
462460 } catch (IOException e ) {
463461 LOG .warn ("Failed to archive {}" , file , e );
464462 failures .add (file );
465463 }
466464 }
465+ Map <File , Future <Boolean >> futures = new HashMap <>();
466+ // In current baseDir all files will be processed concurrently
467+ for (File file : filesOnly ) {
468+ LOG .trace ("Archiving {}" , file );
469+ Future <Boolean > archiveTask = getArchiveExecutor (conf )
470+ .submit (() -> resolveAndArchiveFile (baseArchiveDir , file , startTime ));
471+ futures .put (file , archiveTask );
472+ }
473+
474+ for (Map .Entry <File , Future <Boolean >> fileFutureEntry : futures .entrySet ()) {
475+ try {
476+ boolean fileCleaned = fileFutureEntry .getValue ().get ();
477+ if (!fileCleaned ) {
478+ LOG .warn ("Couldn't archive {} into backup directory: {}" , fileFutureEntry .getKey (),
479+ baseArchiveDir );
480+ failures .add (fileFutureEntry .getKey ());
481+ }
482+ } catch (InterruptedException e ) {
483+ LOG .warn ("HFileArchive Cleanup thread was interrupted" );
484+ failures .add (fileFutureEntry .getKey ());
485+ } catch (ExecutionException e ) {
486+ // this is IOException
487+ LOG .warn ("Failed to archive {}" , fileFutureEntry .getKey (), e );
488+ failures .add (fileFutureEntry .getKey ());
489+ }
490+ }
467491 return failures ;
468492 }
469493
0 commit comments