BackupRestoreFactory.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreToOriginalSplitsJob;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;

@@ -43,8 +44,13 @@ private BackupRestoreFactory() {
* @return backup restore job instance
*/
public static RestoreJob getRestoreJob(Configuration conf) {
Class<? extends RestoreJob> defaultCls =
conf.getBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, RestoreJob.KEEP_ORIGINAL_SPLITS_DEFAULT)
? MapReduceRestoreToOriginalSplitsJob.class
: MapReduceRestoreJob.class;

Class<? extends RestoreJob> cls =
conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreJob.class, RestoreJob.class);
conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, defaultCls, RestoreJob.class);
RestoreJob service = ReflectionUtils.newInstance(cls, conf);
service.setConf(conf);
return service;
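With the change above, a caller that opts in via the new flag gets the original-splits restore job by default. A minimal usage sketch (illustrative only, not part of this PR; it assumes no explicit HBASE_INCR_RESTORE_IMPL_CLASS override is configured):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.RestoreJob;

public class RestoreJobSelectionSketch {
  public static RestoreJob selectRestoreJob() {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to restoring bulk-loaded HFiles on the original region boundaries.
    conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, true);
    // Without an explicit HBASE_INCR_RESTORE_IMPL_CLASS override, the factory
    // now resolves to MapReduceRestoreToOriginalSplitsJob instead of MapReduceRestoreJob.
    return BackupRestoreFactory.getRestoreJob(conf);
  }
}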
RestoreJob.java
@@ -30,6 +30,10 @@

@InterfaceAudience.Private
public interface RestoreJob extends Configurable {

String KEEP_ORIGINAL_SPLITS_KEY = "hbase.backup.restorejob.keep.original.splits";
boolean KEEP_ORIGINAL_SPLITS_DEFAULT = false;

/**
* Run restore operation
* @param dirPaths path array of WAL log directories
RestoreRequest.java
@@ -67,6 +67,11 @@ public Builder withOvewrite(boolean overwrite) {
return this;
}

public Builder withKeepOriginalSplits(boolean keepOriginalSplits) {
request.setKeepOriginalSplits(keepOriginalSplits);
return this;
}

public RestoreRequest build() {
return request;
}
@@ -80,6 +85,8 @@ public RestoreRequest build() {
private TableName[] toTables;
private boolean overwrite = false;

private boolean keepOriginalSplits = false;

private RestoreRequest() {
}

@@ -145,4 +152,13 @@ private RestoreRequest setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
return this;
}

public boolean isKeepOriginalSplits() {
return keepOriginalSplits;
}

private RestoreRequest setKeepOriginalSplits(boolean keepOriginalSplits) {
this.keepOriginalSplits = keepOriginalSplits;
return this;
}
}
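A hedged sketch of how a client might build a request with the new option; the other builder methods (withBackupRootDir, withBackupId, withCheck, withFromTables, withToTables, withOvewrite) are the ones already present on RestoreRequest.Builder and are assumed here:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.RestoreRequest;

public class RestoreRequestSketch {
  public static RestoreRequest keepSplitsRequest(String backupRootDir, String backupId,
      TableName source, TableName target) {
    return new RestoreRequest.Builder()
      .withBackupRootDir(backupRootDir)
      .withBackupId(backupId)
      .withCheck(false)
      .withFromTables(new TableName[] { source })
      .withToTables(new TableName[] { target })
      .withOvewrite(false)
      // New in this PR: carry region boundaries from the backup images into the restored table.
      .withKeepOriginalSplits(true)
      .build();
  }
}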
IncrementalTableBackupClient.java
@@ -30,7 +30,9 @@
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -40,13 +42,16 @@
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -165,54 +170,60 @@ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws I
LOG.debug("copying archive {} to {}", archive, tgt);
archiveFiles.add(archive.toString());
}
mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
incrementalCopyBulkloadHFiles(tgtFs, srcTable);
}

copyBulkLoadedFiles(activeFiles, archiveFiles);
return bulkLoads;
}

private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
throws IOException {
try {
// Enable special mode of BackupDistCp
conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
// Copy active files
String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
int attempt = 1;
while (activeFiles.size() > 0) {
LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
String[] toCopy = new String[activeFiles.size()];
activeFiles.toArray(toCopy);
// Active files can be archived during the copy operation,
// so we need to handle this properly
try {
incrementalCopyHFiles(toCopy, tgtDest);
break;
} catch (IOException e) {
// Check if some files got archived
// Update active and archived lists
// When file is being moved from active to archive
// directory, the number of active files decreases
int numOfActive = activeFiles.size();
updateFileLists(activeFiles, archiveFiles);
if (activeFiles.size() < numOfActive) {
continue;
}
// if not - throw exception
throw e;
private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
TableName tn) throws IOException {
int attempt = 1;

while (!activeFiles.isEmpty()) {
LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
// Active files can be archived during the copy operation,
// so we need to handle this properly
try {
mergeSplitBulkloads(activeFiles, tn);
break;
} catch (IOException e) {
int numActiveFiles = activeFiles.size();
updateFileLists(activeFiles, archiveFiles);
if (activeFiles.size() < numActiveFiles) {
continue;
}

throw e;
}
// If the incremental copy fails for archived files,
// we will have partially loaded files in the backup destination (only files from the active data
// directory). That is OK, because the backup will be marked as FAILED and the data will be cleaned up
if (archiveFiles.size() > 0) {
String[] toCopy = new String[archiveFiles.size()];
archiveFiles.toArray(toCopy);
incrementalCopyHFiles(toCopy, tgtDest);
}
} finally {
// Disable special mode of BackupDistCp
conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
}

if (!archiveFiles.isEmpty()) {
mergeSplitBulkloads(archiveFiles, tn);
}
}

private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
getBulkOutputDirForTable(tn).toString());
player.setConf(conf);

String inputDirs = StringUtils.join(files, ",");
String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };

int result;

try {
result = player.run(args);
} catch (Exception e) {
LOG.error("Failed to run MapReduceHFileSplitterJob", e);
throw new IOException(e);
}

if (result != 0) {
throw new IOException(
"Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
}
}

@@ -263,6 +274,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
try {
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
setupRegionLocator();
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles();
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
@@ -405,6 +417,29 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
}
}

private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
Path bulkOutDir = getBulkOutputDirForTable(tn);
FileSystem fs = FileSystem.get(conf);

if (fs.exists(bulkOutDir)) {
conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
Path tgtPath = getTargetDirForTable(tn);
try {
RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
List<String> files = new ArrayList<>();
while (locatedFiles.hasNext()) {
LocatedFileStatus file = locatedFiles.next();
if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
files.add(file.getPath().toString());
}
}
incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString());
} finally {
conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
}
}
}

protected Path getBulkOutputDirForTable(TableName table) {
Path tablePath = getBulkOutputDir();
tablePath = new Path(tablePath, table.getNamespaceAsString());
@@ -420,6 +455,30 @@ protected Path getBulkOutputDir() {
return path;
}

private Path getTargetDirForTable(TableName table) {
Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
path = new Path(path, table.getNamespaceAsString());
path = new Path(path, table.getNameAsString());
return path;
}

private void setupRegionLocator() throws IOException {
Map<TableName, String> fullBackupIds = getFullBackupIds();
try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {

for (TableName tableName : backupInfo.getTables()) {
String fullBackupId = fullBackupIds.get(tableName);
BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
String snapshotName = fullBackupInfo.getSnapshotName(tableName);
Path root = HBackupFileSystem.getTableBackupPath(tableName,
new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
String manifestDir =
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
}
}
}

private Map<TableName, String> getFullBackupIds() throws IOException {
// Ancestors are stored from newest to oldest, so we can iterate backwards
// in order to populate our backupId map with the most recent full backup
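The comment in getFullBackupIds describes iterating the ancestor list backwards so that the newest full backup per table wins. A minimal sketch of that idea, assuming the BackupImage accessors (getType, getTableNames, getBackupId) used elsewhere in the backup code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;

public class NewestFullBackupSketch {
  // Ancestors are assumed to be ordered newest-to-oldest, so walking the list
  // from the end forwards lets newer full backups overwrite older entries.
  public static Map<TableName, String> newestFullBackupIds(
      List<BackupManifest.BackupImage> ancestors) {
    Map<TableName, String> ids = new HashMap<>();
    for (int i = ancestors.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = ancestors.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }
      for (TableName tn : image.getTableNames()) {
        ids.put(tn, image.getBackupId());
      }
    }
    return ids;
  }
}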
RestoreTablesClient.java
@@ -59,6 +59,8 @@ public class RestoreTablesClient {
private Path restoreRootDir;
private boolean isOverwrite;

private boolean isKeepOriginalSplits;

public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
this.backupRootDir = request.getBackupRootDir();
this.backupId = request.getBackupId();
@@ -68,6 +70,7 @@ public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOExc
this.tTableArray = sTableArray;
}
this.isOverwrite = request.isOverwrite();
this.isKeepOriginalSplits = request.isKeepOriginalSplits();
this.conn = conn;
this.conf = conn.getConfiguration();
if (request.getRestoreRootDir() != null) {
@@ -132,7 +135,7 @@ private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) thr
*/

private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
boolean truncateIfExists) throws IOException {
boolean truncateIfExists, boolean isKeepOriginalSplits) throws IOException {
// First image MUST be image of a FULL backup
BackupImage image = images[0];
String rootDir = image.getRootDir();
@@ -148,7 +151,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTa
+ tableBackupPath.toString());
conf.set(JOB_NAME_CONF_KEY, "Full_Restore-" + backupId + "-" + tTable);
restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
lastIncrBackupId);
isKeepOriginalSplits, lastIncrBackupId);
conf.unset(JOB_NAME_CONF_KEY);
} else { // incremental Backup
throw new IOException("Unexpected backup type " + image.getType());
@@ -183,7 +186,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTa
dirList.toArray(paths);
conf.set(JOB_NAME_CONF_KEY, "Incremental_Restore-" + backupId + "-" + tTable);
restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
new TableName[] { tTable }, lastIncrBackupId);
new TableName[] { tTable }, lastIncrBackupId, isKeepOriginalSplits);
LOG.info(sTable + " has been successfully restored to " + tTable);
}

@@ -208,7 +211,7 @@ private List<Path> getFilesRecursively(String fileBackupDir)
* @throws IOException exception
*/
private void restore(BackupManifest manifest, TableName[] sTableArray, TableName[] tTableArray,
boolean isOverwrite) throws IOException {
boolean isOverwrite, boolean isKeepOriginalSplits) throws IOException {
TreeSet<BackupImage> restoreImageSet = new TreeSet<>();

for (int i = 0; i < sTableArray.length; i++) {
@@ -223,7 +226,7 @@ private void restore(BackupManifest manifest, TableName[] sTableArray, TableName
set.addAll(depList);
BackupImage[] arr = new BackupImage[set.size()];
set.toArray(arr);
restoreImages(arr, table, tTableArray[i], isOverwrite);
restoreImages(arr, table, tTableArray[i], isOverwrite, isKeepOriginalSplits);
restoreImageSet.addAll(list);
if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
LOG.info("Restore includes the following image(s):");
@@ -257,6 +260,6 @@ public void execute() throws IOException {
Path rootPath = new Path(backupRootDir);
BackupManifest manifest = HBackupFileSystem.getManifest(conf, rootPath, backupId);

restore(manifest, sTableArray, tTableArray, isOverwrite);
restore(manifest, sTableArray, tTableArray, isOverwrite, isKeepOriginalSplits);
}
}
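End to end, the new flag travels from the request built by the client into RestoreTablesClient and down to the restore tool. A hedged sketch of kicking off such a restore, assuming the existing BackupAdmin.restore(RestoreRequest) entry point:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RestoreDriverSketch {
  public static void restore(Configuration conf, RestoreRequest request) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdminImpl admin = new BackupAdminImpl(conn)) {
      // RestoreTablesClient reads isKeepOriginalSplits() from the request and
      // threads it through restoreImages() into the restore tool, as shown above.
      admin.restore(request);
    }
  }
}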
MapReduceHFileSplitterJob.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
@@ -118,7 +119,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
job.setMapOutputValueClass(MapReduceExtendedCell.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
RegionLocator regionLocator = getRegionLocator(conf, conn, tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
LOG.debug("success configuring load incremental job");
@@ -171,4 +172,13 @@ public int run(String[] args) throws Exception {
int result = job.waitForCompletion(true) ? 0 : 1;
return result;
}

private static RegionLocator getRegionLocator(Configuration conf, Connection conn,
TableName table) throws IOException {
if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) {
return SnapshotRegionLocator.create(conf, table);
}

return conn.getRegionLocator(table);
}
}
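The two sides of this change meet in the Configuration: IncrementalTableBackupClient.setupRegionLocator() publishes each table's completed-snapshot manifest directory, and getRegionLocator() above consults the same configuration to decide between a snapshot-backed locator and the live cluster. A minimal sketch of that handshake (the manifest path and table name below are placeholders):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;

public class SnapshotLocatorHandshakeSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("default:t1");

    // Producer side (setupRegionLocator): record where the snapshot manifest lives.
    SnapshotRegionLocator.setSnapshotManifestDir(conf,
      "/backup/backup_123/default/t1/.hbase-snapshot/snapshot_t1", table);

    // Consumer side (getRegionLocator): prefer the snapshot's region boundaries.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator locator = SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)
          ? SnapshotRegionLocator.create(conf, table)
          : conn.getRegionLocator(table)) {
      // This locator is what HFileOutputFormat2.configureIncrementalLoad() receives,
      // so output HFiles are split on the original (snapshot-time) region boundaries.
    }
  }
}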