diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java index 40bbb4bc7fea..d5126a187eb8 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreToOriginalSplitsJob; import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -43,8 +44,13 @@ private BackupRestoreFactory() { * @return backup restore job instance */ public static RestoreJob getRestoreJob(Configuration conf) { + Class defaultCls = + conf.getBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, RestoreJob.KEEP_ORIGINAL_SPLITS_DEFAULT) + ? MapReduceRestoreToOriginalSplitsJob.class + : MapReduceRestoreJob.class; + Class cls = - conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreJob.class, RestoreJob.class); + conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, defaultCls, RestoreJob.class); RestoreJob service = ReflectionUtils.newInstance(cls, conf); service.setConf(conf); return service; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java index 831e097cb923..207684e7588d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java @@ -30,6 +30,10 @@ @InterfaceAudience.Private public interface RestoreJob extends Configurable { + + String KEEP_ORIGINAL_SPLITS_KEY = "hbase.backup.restorejob.keep.original.splits"; + boolean KEEP_ORIGINAL_SPLITS_DEFAULT = false; + /** * Run restore operation * @param dirPaths path array of WAL log directories diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java index f7f1d848d958..6e52d312ab7b 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java @@ -67,6 +67,11 @@ public Builder withOvewrite(boolean overwrite) { return this; } + public Builder withKeepOriginalSplits(boolean keepOriginalSplits) { + request.setKeepOriginalSplits(keepOriginalSplits); + return this; + } + public RestoreRequest build() { return request; } @@ -80,6 +85,8 @@ public RestoreRequest build() { private TableName[] toTables; private boolean overwrite = false; + private boolean keepOriginalSplits = false; + private RestoreRequest() { } @@ -145,4 +152,13 @@ private RestoreRequest setOverwrite(boolean overwrite) { this.overwrite = overwrite; return this; } + + public boolean isKeepOriginalSplits() { + return keepOriginalSplits; + } + + private RestoreRequest setKeepOriginalSplits(boolean keepOriginalSplits) { + this.keepOriginalSplits = keepOriginalSplits; + return this; + } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 
8376a5f29279..43db0491484b 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -30,7 +30,9 @@ import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupCopyJob; import org.apache.hadoop.hbase.backup.BackupInfo; @@ -40,13 +42,16 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -165,54 +170,60 @@ protected List handleBulkLoad(List tablesToBackup) throws I LOG.debug("copying archive {} to {}", archive, tgt); archiveFiles.add(archive.toString()); } + mergeSplitBulkloads(activeFiles, archiveFiles, srcTable); + incrementalCopyBulkloadHFiles(tgtFs, srcTable); } - - copyBulkLoadedFiles(activeFiles, archiveFiles); return bulkLoads; } - private void copyBulkLoadedFiles(List activeFiles, List archiveFiles) - throws IOException { - try { - // Enable special mode of BackupDistCp - conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5); - // Copy active files - String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId(); - int attempt = 1; - while (activeFiles.size() > 0) { - LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++); - String[] toCopy = new String[activeFiles.size()]; - activeFiles.toArray(toCopy); - // Active file can be archived during copy operation, - // we need to handle this properly - try { - incrementalCopyHFiles(toCopy, tgtDest); - break; - } catch (IOException e) { - // Check if some files got archived - // Update active and archived lists - // When file is being moved from active to archive - // directory, the number of active files decreases - int numOfActive = activeFiles.size(); - updateFileLists(activeFiles, archiveFiles); - if (activeFiles.size() < numOfActive) { - continue; - } - // if not - throw exception - throw e; + private void mergeSplitBulkloads(List activeFiles, List archiveFiles, + TableName tn) throws IOException { + int attempt = 1; + + while (!activeFiles.isEmpty()) { + LOG.info("MergeSplit {} active bulk loaded files. 
Attempt={}", activeFiles.size(), attempt++); + // Active file can be archived during copy operation, + // we need to handle this properly + try { + mergeSplitBulkloads(activeFiles, tn); + break; + } catch (IOException e) { + int numActiveFiles = activeFiles.size(); + updateFileLists(activeFiles, archiveFiles); + if (activeFiles.size() < numActiveFiles) { + continue; } + + throw e; } - // If incremental copy will fail for archived files - // we will have partially loaded files in backup destination (only files from active data - // directory). It is OK, because the backup will marked as FAILED and data will be cleaned up - if (archiveFiles.size() > 0) { - String[] toCopy = new String[archiveFiles.size()]; - archiveFiles.toArray(toCopy); - incrementalCopyHFiles(toCopy, tgtDest); - } - } finally { - // Disable special mode of BackupDistCp - conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY); + } + + if (!archiveFiles.isEmpty()) { + mergeSplitBulkloads(archiveFiles, tn); + } + } + + private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException { + MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob(); + conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, + getBulkOutputDirForTable(tn).toString()); + player.setConf(conf); + + String inputDirs = StringUtils.join(files, ","); + String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() }; + + int result; + + try { + result = player.run(args); + } catch (Exception e) { + LOG.error("Failed to run MapReduceHFileSplitterJob", e); + throw new IOException(e); + } + + if (result != 0) { + throw new IOException( + "Failed to run MapReduceHFileSplitterJob with invalid result: " + result); } } @@ -263,6 +274,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException { try { // copy out the table and region info files for each table BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); + setupRegionLocator(); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT convertWALsToHFiles(); incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, @@ -405,6 +417,29 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws } } + private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException { + Path bulkOutDir = getBulkOutputDirForTable(tn); + FileSystem fs = FileSystem.get(conf); + + if (fs.exists(bulkOutDir)) { + conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2); + Path tgtPath = getTargetDirForTable(tn); + try { + RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true); + List<String> files = new ArrayList<>(); + while (locatedFiles.hasNext()) { + LocatedFileStatus file = locatedFiles.next(); + if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) { + files.add(file.getPath().toString()); + } + } + incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString()); + } finally { + conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY); + } + } + } + protected Path getBulkOutputDirForTable(TableName table) { Path tablePath = getBulkOutputDir(); tablePath = new Path(tablePath, table.getNamespaceAsString()); @@ -420,6 +455,30 @@ protected Path getBulkOutputDir() { return path; } + private Path getTargetDirForTable(TableName table) { + Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId()); + path = new Path(path, table.getNamespaceAsString()); + path = new Path(path, table.getNameAsString()); + return path; + } + + 
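+ /** + * Points {@link SnapshotRegionLocator} at the snapshot manifest directory of each table's most recent full backup, so that the WAL replay and bulk-load splitter jobs run by this client can reuse the region boundaries of that full backup. + */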
private void setupRegionLocator() throws IOException { + Map fullBackupIds = getFullBackupIds(); + try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) { + + for (TableName tableName : backupInfo.getTables()) { + String fullBackupId = fullBackupIds.get(tableName); + BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId); + String snapshotName = fullBackupInfo.getSnapshotName(tableName); + Path root = HBackupFileSystem.getTableBackupPath(tableName, + new Path(fullBackupInfo.getBackupRootDir()), fullBackupId); + String manifestDir = + SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString(); + SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName); + } + } + } + private Map getFullBackupIds() throws IOException { // Ancestors are stored from newest to oldest, so we can iterate backwards // in order to populate our backupId map with the most recent full backup diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 0c3c5b40ffb5..4ba56f951256 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -59,6 +59,8 @@ public class RestoreTablesClient { private Path restoreRootDir; private boolean isOverwrite; + private boolean isKeepOriginalSplits; + public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException { this.backupRootDir = request.getBackupRootDir(); this.backupId = request.getBackupId(); @@ -68,6 +70,7 @@ public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOExc this.tTableArray = sTableArray; } this.isOverwrite = request.isOverwrite(); + this.isKeepOriginalSplits = request.isKeepOriginalSplits(); this.conn = conn; this.conf = conn.getConfiguration(); if (request.getRestoreRootDir() != null) { @@ -132,7 +135,7 @@ private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) thr */ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable, - boolean truncateIfExists) throws IOException { + boolean truncateIfExists, boolean isKeepOriginalSplits) throws IOException { // First image MUST be image of a FULL backup BackupImage image = images[0]; String rootDir = image.getRootDir(); @@ -148,7 +151,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTa + tableBackupPath.toString()); conf.set(JOB_NAME_CONF_KEY, "Full_Restore-" + backupId + "-" + tTable); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, - lastIncrBackupId); + isKeepOriginalSplits, lastIncrBackupId); conf.unset(JOB_NAME_CONF_KEY); } else { // incremental Backup throw new IOException("Unexpected backup type " + image.getType()); @@ -183,7 +186,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTa dirList.toArray(paths); conf.set(JOB_NAME_CONF_KEY, "Incremental_Restore-" + backupId + "-" + tTable); restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, - new TableName[] { tTable }, lastIncrBackupId); + new TableName[] { tTable }, lastIncrBackupId, isKeepOriginalSplits); LOG.info(sTable + " has been successfully restored to " + tTable); } @@ -208,7 +211,7 @@ private List getFilesRecursively(String fileBackupDir) * @throws IOException exception */ private void 
restore(BackupManifest manifest, TableName[] sTableArray, TableName[] tTableArray, - boolean isOverwrite) throws IOException { + boolean isOverwrite, boolean isKeepOriginalSplits) throws IOException { TreeSet restoreImageSet = new TreeSet<>(); for (int i = 0; i < sTableArray.length; i++) { @@ -223,7 +226,7 @@ private void restore(BackupManifest manifest, TableName[] sTableArray, TableName set.addAll(depList); BackupImage[] arr = new BackupImage[set.size()]; set.toArray(arr); - restoreImages(arr, table, tTableArray[i], isOverwrite); + restoreImages(arr, table, tTableArray[i], isOverwrite, isKeepOriginalSplits); restoreImageSet.addAll(list); if (restoreImageSet != null && !restoreImageSet.isEmpty()) { LOG.info("Restore includes the following image(s):"); @@ -257,6 +260,6 @@ public void execute() throws IOException { Path rootPath = new Path(backupRootDir); BackupManifest manifest = HBackupFileSystem.getManifest(conf, rootPath, backupId); - restore(manifest, sTableArray, tTableArray, isOverwrite); + restore(manifest, sTableArray, tTableArray, isOverwrite, isKeepOriginalSplits); } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 2c073c56f7eb..7d9430914cb3 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MapReduceExtendedCell; import org.apache.hadoop.io.NullWritable; @@ -118,7 +119,7 @@ public Job createSubmittableJob(String[] args) throws IOException { job.setMapOutputValueClass(MapReduceExtendedCell.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + RegionLocator regionLocator = getRegionLocator(conf, conn, tableName)) { HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); } LOG.debug("success configuring load incremental job"); @@ -171,4 +172,13 @@ public int run(String[] args) throws Exception { int result = job.waitForCompletion(true) ? 0 : 1; return result; } + + private static RegionLocator getRegionLocator(Configuration conf, Connection conn, + TableName table) throws IOException { + if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) { + return SnapshotRegionLocator.create(conf, table); + } + + return conn.getRegionLocator(table); + } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java new file mode 100644 index 000000000000..942f69a2fb8c --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSVisitor; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@InterfaceAudience.Private +public class MapReduceRestoreToOriginalSplitsJob implements RestoreJob { + private Configuration conf; + + @Override + public void run(Path[] dirPaths, TableName[] fromTables, Path restoreRootDir, + TableName[] toTables, boolean fullBackupRestore) throws IOException { + Configuration conf = getConf(); + + // We are using the files from the snapshot. We should copy them rather than move them over + conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true); + + FileSystem fs = FileSystem.get(conf); + Map> family2Files = buildFamily2Files(fs, dirPaths, fullBackupRestore); + + BulkLoadHFiles bulkLoad = BulkLoadHFiles.create(conf); + for (int i = 0; i < fromTables.length; i++) { + bulkLoad.bulkLoad(toTables[i], family2Files); + } + } + + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + } + + @Override + public Configuration getConf() { + return conf; + } + + private static Map> buildFamily2Files(FileSystem fs, Path[] dirs, + boolean isFullBackup) throws IOException { + if (isFullBackup) { + return buildFullBackupFamily2Files(fs, dirs); + } + + Map> family2Files = new HashMap<>(); + + for (Path dir : dirs) { + byte[] familyName = Bytes.toBytes(dir.getParent().getName()); + if (family2Files.containsKey(familyName)) { + family2Files.get(familyName).add(dir); + } else { + family2Files.put(familyName, Lists.newArrayList(dir)); + } + } + + return family2Files; + } + + private static Map> buildFullBackupFamily2Files(FileSystem fs, Path[] dirs) + throws IOException { + Map> family2Files = new HashMap<>(); + for (Path regionPath : dirs) { + FSVisitor.visitRegionStoreFiles(fs, regionPath, (region, family, name) -> { + Path path = new Path(regionPath, new Path(family, name)); + byte[] familyName = Bytes.toBytes(family); + if (family2Files.containsKey(familyName)) { + family2Files.get(familyName).add(path); + } else { + family2Files.put(familyName, Lists.newArrayList(path)); + } + }); + } + return family2Files; + } + +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index 
aa87a2f3d401..15159ed73e46 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -657,10 +657,17 @@ public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, Fi */ public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { + return createRestoreRequest(backupRootDir, backupId, check, fromTables, toTables, isOverwrite, + false); + } + + public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, + boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite, + boolean isKeepOriginalSplits) { RestoreRequest.Builder builder = new RestoreRequest.Builder(); - RestoreRequest request = - builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) - .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); + RestoreRequest request = builder.withBackupRootDir(backupRootDir).withBackupId(backupId) + .withCheck(check).withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite) + .withKeepOriginalSplits(isKeepOriginalSplits).build(); return request; } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java index ff4e2672f7a2..6248d7932dde 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java @@ -143,16 +143,19 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException { * During incremental backup operation. 
Call WalPlayer to replay WAL in backup image Currently * tableNames and newTablesNames only contain single table, will be expanded to multiple tables in * the future - * @param conn HBase connection - * @param tableBackupPath backup path - * @param logDirs : incremental backup folders, which contains WAL - * @param tableNames : source tableNames(table names were backuped) - * @param newTableNames : target tableNames(table names to be restored to) - * @param incrBackupId incremental backup Id + * @param conn HBase connection + * @param tableBackupPath backup path + * @param logDirs : incremental backup folders, which contains WAL + * @param tableNames : source tableNames(table names were backuped) + * @param newTableNames : target tableNames(table names to be restored to) + * @param incrBackupId incremental backup Id + * @param keepOriginalSplits whether the original region splits from the full backup should be + * kept * @throws IOException exception */ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs, - TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException { + TableName[] tableNames, TableName[] newTableNames, String incrBackupId, + boolean keepOriginalSplits) throws IOException { try (Admin admin = conn.getAdmin()) { if (tableNames.length != newTableNames.length) { throw new IOException("Number of source tables and target tables does not match!"); @@ -200,6 +203,7 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[ LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor); } } + conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, keepOriginalSplits); RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf); restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false); @@ -207,9 +211,10 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[ } public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName, - TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) throws IOException { + TableName newTableName, boolean truncateIfExists, boolean isKeepOriginalSplits, + String lastIncrBackupId) throws IOException { createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, truncateIfExists, - lastIncrBackupId); + isKeepOriginalSplits, lastIncrBackupId); } /** @@ -283,7 +288,8 @@ private TableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableNa } private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName, - Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException { + Path tableBackupPath, boolean truncateIfExists, boolean isKeepOriginalSplits, + String lastIncrBackupId) throws IOException { if (newTableName == null) { newTableName = tableName; } @@ -349,6 +355,7 @@ private void createAndRestoreTable(Connection conn, TableName tableName, TableNa // should only try to create the table with all region informations, so we could pre-split // the regions in fine grain checkAndCreateTable(conn, newTableName, regionPathList, tableDescriptor, truncateIfExists); + conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, isKeepOriginalSplits); RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf); Path[] paths = new Path[regionPathList.size()]; regionPathList.toArray(paths); diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java index aed4f8ae9520..d000dba9a649 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java @@ -18,15 +18,23 @@ package org.apache.hadoop.hbase.backup; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.SingleProcessHBaseCluster; @@ -44,9 +52,14 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.LogRoller; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.junit.After; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -69,6 +82,8 @@ public class TestIncrementalBackup extends TestBackupBase { HBaseClassTestRule.forClass(TestIncrementalBackup.class); private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalBackup.class); + private static final byte[] BULKLOAD_START_KEY = new byte[] { 0x00 }; + private static final byte[] BULKLOAD_END_KEY = new byte[] { Byte.MAX_VALUE }; @Parameterized.Parameters public static Collection data() { @@ -81,6 +96,34 @@ public static Collection data() { public TestIncrementalBackup(Boolean b) { } + @After + public void ensurePreviousBackupTestsAreCleanedUp() throws Exception { + TEST_UTIL.flush(table1); + TEST_UTIL.flush(table2); + TEST_UTIL.flush(table1_restore); + + TEST_UTIL.truncateTable(table1).close(); + TEST_UTIL.truncateTable(table2).close(); + TEST_UTIL.truncateTable(table1_restore).close(); + + TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> { + try { + LogRoller walRoller = rst.getRegionServer().getWalRoller(); + walRoller.requestRollAll(); + walRoller.waitUntilWalRollFinished(); + } catch (Exception ignored) { + } + }); + + try (Table table = TEST_UTIL.getConnection().getTable(table1)) { + loadTable(table); + } + + try (Table table = TEST_UTIL.getConnection().getTable(table2)) { + loadTable(table); + } + } + // implement all test cases in 1 test since incremental // backup/restore has dependencies @Test @@ -240,6 +283,101 @@ public void TestIncBackupRestore() throws Exception { } } + @Test + public void TestIncBackupRestoreWithOriginalSplits() throws Exception { + byte[] mobFam = Bytes.toBytes("mob"); + + List tables = Lists.newArrayList(table1); + TableDescriptor 
newTable1Desc = + TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build(); + TEST_UTIL.getAdmin().modifyTable(newTable1Desc); + + Connection conn = TEST_UTIL.getConnection(); + BackupAdminImpl backupAdmin = new BackupAdminImpl(conn); + BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); + String fullBackupId = backupAdmin.backupTables(request); + assertTrue(checkSucceeded(fullBackupId)); + + TableName[] fromTables = new TableName[] { table1 }; + TableName[] toTables = new TableName[] { table1_restore }; + + List preRestoreBackupFiles = getBackupFiles(); + backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, fullBackupId, false, + fromTables, toTables, true, true)); + List postRestoreBackupFiles = getBackupFiles(); + + // Check that the backup files are the same before and after the restore process + Assert.assertEquals(postRestoreBackupFiles, preRestoreBackupFiles); + Assert.assertEquals(TEST_UTIL.countRows(table1_restore), NB_ROWS_IN_BATCH); + + int ROWS_TO_ADD = 1_000; + // different IDs so that rows don't overlap + insertIntoTable(conn, table1, famName, 3, ROWS_TO_ADD); + insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD); + + try (Admin admin = conn.getAdmin()) { + List currentRegions = TEST_UTIL.getHBaseCluster().getRegions(table1); + for (HRegion region : currentRegions) { + byte[] name = region.getRegionInfo().getEncodedNameAsBytes(); + admin.splitRegionAsync(name).get(); + } + + TEST_UTIL.waitTableAvailable(table1); + + // Make sure we've split regions + assertNotEquals(currentRegions, TEST_UTIL.getHBaseCluster().getRegions(table1)); + + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String incrementalBackupId = backupAdmin.backupTables(request); + assertTrue(checkSucceeded(incrementalBackupId)); + preRestoreBackupFiles = getBackupFiles(); + backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId, + false, fromTables, toTables, true, true)); + postRestoreBackupFiles = getBackupFiles(); + Assert.assertEquals(postRestoreBackupFiles, preRestoreBackupFiles); + Assert.assertEquals(NB_ROWS_IN_BATCH + ROWS_TO_ADD + ROWS_TO_ADD, + TEST_UTIL.countRows(table1_restore)); + + // test bulkloads + HRegion regionToBulkload = TEST_UTIL.getHBaseCluster().getRegions(table1).get(0); + String regionName = regionToBulkload.getRegionInfo().getEncodedName(); + + insertIntoTable(conn, table1, famName, 5, ROWS_TO_ADD); + insertIntoTable(conn, table1, mobFam, 6, ROWS_TO_ADD); + + doBulkload(table1, regionName, famName, mobFam); + + // we need to major compact the regions to make sure there are no references + // and the regions are once again splittable + TEST_UTIL.compact(true); + TEST_UTIL.flush(); + TEST_UTIL.waitTableAvailable(table1); + + for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) { + if (region.isSplittable()) { + admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get(); + } + } + + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + incrementalBackupId = backupAdmin.backupTables(request); + assertTrue(checkSucceeded(incrementalBackupId)); + + preRestoreBackupFiles = getBackupFiles(); + backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId, + false, fromTables, toTables, true, true)); + postRestoreBackupFiles = getBackupFiles(); + + 
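+ // Restoring must not modify anything under the backup root, and the restored table must end up with the same row count as the source table.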
Assert.assertEquals(postRestoreBackupFiles, preRestoreBackupFiles); + + int rowsExpected = TEST_UTIL.countRows(table1); + int rowsActual = TEST_UTIL.countRows(table1_restore); + + Assert.assertEquals(rowsExpected, rowsActual); + } + } + private void checkThrowsCFMismatch(IOException ex, List tables) { Throwable cause = Throwables.getRootCause(ex); assertEquals(cause.getClass(), ColumnFamilyMismatchException.class); @@ -255,6 +393,32 @@ private String takeFullBackup(List tables, BackupAdminImpl backupAdmi return backupId; } + private static void doBulkload(TableName tn, String regionName, byte[]... fams) + throws IOException { + Path regionDir = createHFiles(tn, regionName, fams); + Map results = + BulkLoadHFiles.create(conf1).bulkLoad(tn, regionDir); + assertFalse(results.isEmpty()); + } + + private static Path createHFiles(TableName tn, String regionName, byte[]... fams) + throws IOException { + Path rootdir = CommonFSUtils.getRootDir(conf1); + Path regionDir = CommonFSUtils.getRegionDir(rootdir, tn, regionName); + + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + fs.mkdirs(rootdir); + + for (byte[] fam : fams) { + Path famDir = new Path(regionDir, Bytes.toString(fam)); + Path hFileDir = new Path(famDir, UUID.randomUUID().toString()); + HFileTestUtil.createHFile(conf1, fs, hFileDir, fam, qualName, BULKLOAD_START_KEY, + BULKLOAD_END_KEY, 1000); + } + + return regionDir; + } + /** * Check that backup manifest can be produced for a different root. Users may want to move * existing backups to a different location. @@ -271,4 +435,16 @@ private void validateRootPathCanBeOverridden(String originalPath, String backupI assertEquals(anotherRootDir, ancestor.getRootDir()); } } + + private List getBackupFiles() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + RemoteIterator iter = fs.listFiles(new Path(BACKUP_ROOT_DIR), true); + List files = new ArrayList<>(); + + while (iter.hasNext()) { + files.add(iter.next()); + } + + return files; + } } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 99b1dd112b98..5e2dc0902e0d 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MapReduceExtendedCell; @@ -335,7 +336,7 @@ public Job createSubmittableJob(String[] args) throws IOException { List tableInfoList = new ArrayList(); for (TableName tableName : tableNames) { Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName); + RegionLocator regionLocator = getRegionLocator(tableName, conf, conn); tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator)); } MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList); @@ -425,4 +426,13 @@ public int run(String[] args) throws Exception { Job job = createSubmittableJob(args); return job.waitForCompletion(true) ? 
0 : 1; } + + private static RegionLocator getRegionLocator(TableName tableName, Configuration conf, + Connection conn) throws IOException { + if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, tableName)) { + return SnapshotRegionLocator.create(conf, tableName); + } + + return conn.getRegionLocator(tableName); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java new file mode 100644 index 000000000000..24e8caba0d58 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.snapshot; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + +/** + * {@link RegionLocator} built using the most recent full backup's snapshot manifest for a given + * table. 
Useful for aligning any subsequent incremental backups along the same splits as the full + * backup + */ +@InterfaceAudience.Private +public final class SnapshotRegionLocator implements RegionLocator { + + private static final String SNAPSHOT_MANIFEST_DIR_PREFIX = + "region.locator.snapshot.manifest.dir."; + + private static final ServerName FAKE_SERVER_NAME = + ServerName.parseServerName("www.example.net,1234,1212121212"); + + private final TableName tableName; + private final TreeMap regions; + + private final List rawLocations; + + public static SnapshotRegionLocator create(Configuration conf, TableName table) + throws IOException { + Path workingDir = new Path(conf.get(getSnapshotManifestDirKey(table))); + FileSystem fs = workingDir.getFileSystem(conf); + SnapshotProtos.SnapshotDescription desc = + SnapshotDescriptionUtils.readSnapshotInfo(fs, workingDir); + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, workingDir, desc); + + TableName tableName = manifest.getTableDescriptor().getTableName(); + TreeMap replicas = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List rawLocations = new ArrayList<>(); + + for (SnapshotProtos.SnapshotRegionManifest region : manifest.getRegionManifests()) { + HBaseProtos.RegionInfo ri = region.getRegionInfo(); + byte[] key = ri.getStartKey().toByteArray(); + + SnapshotHRegionLocation location = toLocation(ri, tableName); + rawLocations.add(location); + HRegionReplicas hrr = replicas.get(key); + + if (hrr == null) { + hrr = new HRegionReplicas(location); + } else { + hrr.addReplica(location); + } + + replicas.put(key, hrr); + } + + return new SnapshotRegionLocator(tableName, replicas, rawLocations); + } + + private SnapshotRegionLocator(TableName tableName, TreeMap regions, + List rawLocations) { + this.tableName = tableName; + this.regions = regions; + this.rawLocations = rawLocations; + } + + @Override + public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload) + throws IOException { + return regions.floorEntry(row).getValue().getReplica(replicaId); + } + + @Override + public List getRegionLocations(byte[] row, boolean reload) throws IOException { + return List.of(getRegionLocation(row, reload)); + } + + @Override + public void clearRegionLocationCache() { + + } + + @Override + public List getAllRegionLocations() throws IOException { + return rawLocations; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public void close() throws IOException { + + } + + public static boolean shouldUseSnapshotRegionLocator(Configuration conf, TableName table) { + return conf.get(getSnapshotManifestDirKey(table)) != null; + } + + public static void setSnapshotManifestDir(Configuration conf, String dir, TableName table) { + conf.set(getSnapshotManifestDirKey(table), dir); + } + + private static String getSnapshotManifestDirKey(TableName table) { + return SNAPSHOT_MANIFEST_DIR_PREFIX + table.getNameWithNamespaceInclAsString(); + } + + private static SnapshotHRegionLocation toLocation(HBaseProtos.RegionInfo ri, + TableName tableName) { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName) + .setStartKey(ri.getStartKey().toByteArray()).setEndKey(ri.getEndKey().toByteArray()) + .setRegionId(ri.getRegionId()).setReplicaId(ri.getReplicaId()).build(); + + return new SnapshotHRegionLocation(region); + } + + private static final class HRegionReplicas { + private final Map replicas = new HashMap<>(); + + private HRegionReplicas(SnapshotHRegionLocation region) { + addReplica(region); + } + + private void 
addReplica(SnapshotHRegionLocation replica) { + this.replicas.put(replica.getRegion().getReplicaId(), replica); + } + + private HRegionLocation getReplica(int replicaId) { + return replicas.get(replicaId); + } + } + + public static final class SnapshotHRegionLocation extends HRegionLocation { + + public SnapshotHRegionLocation(RegionInfo regionInfo) { + super(regionInfo, FAKE_SERVER_NAME); + } + + @Override + public ServerName getServerName() { + throw new NotImplementedException("SnapshotHRegionLocation doesn't have a server name"); + } + + @Override + public String getHostname() { + throw new NotImplementedException("SnapshotHRegionLocation doesn't have a host name"); + } + + @Override + public int getPort() { + throw new NotImplementedException("SnapshotHRegionLocation doesn't have a port"); + } + + @Override + public int hashCode() { + return this.getRegion().hashCode(); + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int compareTo(HRegionLocation o) { + return this.getRegion().compareTo(o.getRegion()); + } + } +}
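For reference, a minimal usage sketch of the keep-original-splits option this patch adds, assuming an existing Connection and a completed backup; the class and method names below, the backup id, and the table names are illustrative placeholders, not part of the patch:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;

public final class RestoreToOriginalSplitsExample {
  /** Restores sourceTable into targetTable while keeping the full backup's region splits. */
  public static void restoreKeepingSplits(Connection conn, String backupRootDir, String backupId,
    TableName sourceTable, TableName targetTable) throws IOException {
    // The final "true" is the new isKeepOriginalSplits flag. RestoreTablesClient/RestoreTool
    // propagate it as RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, which makes BackupRestoreFactory
    // return MapReduceRestoreToOriginalSplitsJob instead of MapReduceRestoreJob.
    RestoreRequest request = BackupUtils.createRestoreRequest(backupRootDir, backupId,
      false /* check */, new TableName[] { sourceTable }, new TableName[] { targetTable },
      true /* overwrite */, true /* keepOriginalSplits */);

    try (BackupAdminImpl admin = new BackupAdminImpl(conn)) {
      admin.restore(request);
    }
  }
}

The same flag can also be set through RestoreRequest.Builder#withKeepOriginalSplits(boolean) when building the request directly.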