 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -166,54 +171,60 @@ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws I
         LOG.debug("copying archive {} to {}", archive, tgt);
         archiveFiles.add(archive.toString());
       }
+      mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
+      incrementalCopyBulkloadHFiles(tgtFs, srcTable);
     }
-
-    copyBulkLoadedFiles(activeFiles, archiveFiles);
     return bulkLoads;
   }
 
-  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
-    throws IOException {
-    try {
-      // Enable special mode of BackupDistCp
-      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
-      // Copy active files
-      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
-      int attempt = 1;
-      while (activeFiles.size() > 0) {
-        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
-        String[] toCopy = new String[activeFiles.size()];
-        activeFiles.toArray(toCopy);
-        // Active file can be archived during copy operation,
-        // we need to handle this properly
-        try {
-          incrementalCopyHFiles(toCopy, tgtDest);
-          break;
-        } catch (IOException e) {
-          // Check if some files got archived
-          // Update active and archived lists
-          // When file is being moved from active to archive
-          // directory, the number of active files decreases
-          int numOfActive = activeFiles.size();
-          updateFileLists(activeFiles, archiveFiles);
-          if (activeFiles.size() < numOfActive) {
-            continue;
-          }
-          // if not - throw exception
-          throw e;
+  private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
+    TableName tn) throws IOException {
+    int attempt = 1;
+
+    while (!activeFiles.isEmpty()) {
+      LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
+      // Active file can be archived during copy operation,
+      // we need to handle this properly
+      try {
+        mergeSplitBulkloads(activeFiles, tn);
+        break;
+      } catch (IOException e) {
+        int numActiveFiles = activeFiles.size();
+        updateFileLists(activeFiles, archiveFiles);
+        if (activeFiles.size() < numActiveFiles) {
+          continue;
         }
+
+        throw e;
       }
-      // If incremental copy will fail for archived files
-      // we will have partially loaded files in backup destination (only files from active data
-      // directory). It is OK, because the backup will marked as FAILED and data will be cleaned up
-      if (archiveFiles.size() > 0) {
-        String[] toCopy = new String[archiveFiles.size()];
-        archiveFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
-      }
-    } finally {
-      // Disable special mode of BackupDistCp
-      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    }
+
+    if (!archiveFiles.isEmpty()) {
+      mergeSplitBulkloads(archiveFiles, tn);
+    }
+  }
+
+  private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
+    MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
+      getBulkOutputDirForTable(tn).toString());
+    player.setConf(conf);
+
+    String inputDirs = StringUtils.join(files, ",");
+    String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };
+
+    int result;
+
+    try {
+      result = player.run(args);
+    } catch (Exception e) {
+      LOG.error("Failed to run MapReduceHFileSplitterJob", e);
+      throw new IOException(e);
+    }
+
+    if (result != 0) {
+      throw new IOException(
+        "Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
     }
   }
 
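The retry loop above handles a race: a bulk-loaded HFile can be moved from the active data directory to the archive while the splitter job runs, so a failed attempt is retried only if updateFileLists() observed the active list shrinking. The single-list overload then drives MapReduceHFileSplitterJob, which re-splits the HFiles along the table's current region boundaries into the bulk-output directory. As a rough standalone illustration (not this class's code path; the paths and table name are hypothetical placeholders), the same job can be driven through ToolRunner:

// Minimal sketch of running MapReduceHFileSplitterJob by hand, assuming a
// comma-separated list of input HFile directories; all paths are made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.util.ToolRunner;

public class HFileSplitterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Emit the re-split HFiles under a bulk-output directory instead of
    // bulk-loading them into the live table.
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "/tmp/bulk-out");
    int exitCode = ToolRunner.run(conf, new MapReduceHFileSplitterJob(),
      new String[] { "/tmp/input-hfiles", "default:demo_table" });
    if (exitCode != 0) {
      throw new RuntimeException("MapReduceHFileSplitterJob failed: " + exitCode);
    }
  }
}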
@@ -264,6 +275,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+      setupRegionLocator();
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
       convertWALsToHFiles();
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
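For context, convertWALsToHFiles() in this class funnels the accumulated WAL directories through WALPlayer in bulk-output mode. A hedged sketch of that mode in isolation, assuming hypothetical input/output paths and table name:

// Sketch: WALPlayer writing HFiles instead of replaying edits. With a
// bulk-output directory set, WALPlayer emits HFiles via HFileOutputFormat2
// rather than applying the edits to the live table.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WalPlayerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/wal-hfiles");
    int exitCode = ToolRunner.run(conf, new WALPlayer(),
      new String[] { "/tmp/wal-dir", "default:demo_table" });
    if (exitCode != 0) {
      throw new RuntimeException("WALPlayer failed: " + exitCode);
    }
  }
}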
@@ -407,6 +419,29 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     }
   }
 
+  private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
+    Path bulkOutDir = getBulkOutputDirForTable(tn);
+    FileSystem fs = FileSystem.get(conf);
+
+    if (fs.exists(bulkOutDir)) {
+      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+      Path tgtPath = getTargetDirForTable(tn);
+      try {
+        RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
+        List<String> files = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+          LocatedFileStatus file = locatedFiles.next();
+          if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
+            files.add(file.getPath().toString());
+          }
+        }
+        incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString());
+      } finally {
+        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+      }
+    }
+  }
+
   protected Path getBulkOutputDirForTable(TableName table) {
     Path tablePath = getBulkOutputDir();
     tablePath = new Path(tablePath, table.getNamespaceAsString());
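incrementalCopyBulkloadHFiles() walks the per-table bulk-output directory recursively and keeps only genuine HFiles, filtering out MapReduce artifacts such as _SUCCESS markers before the copy. A self-contained sketch of that listing pattern (the directory path is hypothetical):

// Sketch: recursively list a directory and keep only files in HFile format,
// mirroring the filter used in incrementalCopyBulkloadHFiles().
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class HFileListingSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    List<String> files = new ArrayList<>();
    // listFiles(..., true) walks the whole tree; directories and non-HFile
    // files are skipped.
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/tmp/bulk-out"), true);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      if (status.isFile() && HFile.isHFileFormat(fs, status.getPath())) {
        files.add(status.getPath().toString());
      }
    }
    System.out.println(files);
  }
}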
@@ -422,6 +457,30 @@ protected Path getBulkOutputDir() {
     return path;
   }
 
+  private Path getTargetDirForTable(TableName table) {
+    Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
+    path = new Path(path, table.getNamespaceAsString());
+    path = new Path(path, table.getNameAsString());
+    return path;
+  }
+
+  private void setupRegionLocator() throws IOException {
+    Map<TableName, String> fullBackupIds = getFullBackupIds();
+    try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
+
+      for (TableName tableName : backupInfo.getTables()) {
+        String fullBackupId = fullBackupIds.get(tableName);
+        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
+        String snapshotName = fullBackupInfo.getSnapshotName(tableName);
+        Path root = HBackupFileSystem.getTableBackupPath(tableName,
+          new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
+        String manifestDir =
+          SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
+        SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
+      }
+    }
+  }
+
   private Map<TableName, String> getFullBackupIds() throws IOException {
     // Ancestors are stored from newest to oldest, so we can iterate backwards
     // in order to populate our backupId map with the most recent full backup
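setupRegionLocator() records, for every table in the incremental backup, the snapshot manifest directory of that table's most recent full backup, so later jobs can derive region boundaries from the snapshot rather than the live table. A rough per-table sketch of the same wiring, with an assumed backup root and snapshot name (both hypothetical):

// Sketch: point SnapshotRegionLocator at a completed snapshot's manifest,
// as setupRegionLocator() does for each table. Paths and names are made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;

public class RegionLocatorSetupSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("default:demo_table");
    // Resolve the completed-snapshot directory under the full backup's
    // table backup path.
    Path root = new Path("/backups/backup_1700000000000/default/demo_table");
    String manifestDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir("snapshot_demo_table", root).toString();
    SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, table);
  }
}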