diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupMasterObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupMasterObserver.java new file mode 100644 index 000000000000..5310270c24f0 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupMasterObserver.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.impl.BulkLoad; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; + +/** + * An Observer to facilitate backup operations + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupMasterObserver implements MasterCoprocessor, MasterObserver { + private static final Logger LOG = LoggerFactory.getLogger(BackupMasterObserver.class); + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + + @Override + public void postDeleteTable(ObserverContext ctx, + TableName tableName) throws IOException { + Configuration cfg = ctx.getEnvironment().getConfiguration(); + if (!BackupManager.isBackupEnabled(cfg)) { + LOG.debug("Skipping postDeleteTable hook since backup is disabled"); + return; + } + deleteBulkLoads(cfg, tableName, (ignored) -> true); + } + + @Override + public void postTruncateTable(ObserverContext ctx, + TableName tableName) throws IOException { + Configuration cfg = ctx.getEnvironment().getConfiguration(); + if (!BackupManager.isBackupEnabled(cfg)) { + LOG.debug("Skipping postTruncateTable hook since backup is disabled"); + return; + } + deleteBulkLoads(cfg, tableName, (ignored) -> true); + } + + @Override + public void postModifyTable(final ObserverContext ctx, + final TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor) + throws IOException { + Configuration cfg = ctx.getEnvironment().getConfiguration(); + if (!BackupManager.isBackupEnabled(cfg)) { + LOG.debug("Skipping postModifyTable hook since backup is disabled"); + return; + } + + Set oldFamilies = Arrays.stream(oldDescriptor.getColumnFamilies()) + .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet()); + Set newFamilies = Arrays.stream(currentDescriptor.getColumnFamilies()) + .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet()); + + Set removedFamilies = Sets.difference(oldFamilies, newFamilies); + if (!removedFamilies.isEmpty()) { + Predicate filter = bulkload -> removedFamilies.contains(bulkload.getColumnFamily()); + deleteBulkLoads(cfg, tableName, filter); + } + } + + /** + * Deletes all bulk load entries for the given table, matching the provided predicate. + */ + private void deleteBulkLoads(Configuration config, TableName tableName, + Predicate filter) throws IOException { + try (Connection connection = ConnectionFactory.createConnection(config); + BackupSystemTable tbl = new BackupSystemTable(connection)) { + List bulkLoads = tbl.readBulkloadRows(List.of(tableName)); + List rowsToDelete = + bulkLoads.stream().filter(filter).map(BulkLoad::getRowKey).toList(); + tbl.deleteBulkLoadedRows(rowsToDelete); + } + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java index 30a5674eb021..d05a421a3956 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.yetus.audience.InterfaceAudience; /** @@ -98,16 +99,17 @@ public interface BackupRestoreConstants { String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - String BACKUP_CONFIG_STRING = - BackupRestoreConstants.BACKUP_ENABLE_KEY + "=true\n" + "hbase.master.logcleaner.plugins=" - + "YOUR_PLUGINS,org.apache.hadoop.hbase.backup.master.BackupLogCleaner\n" - + "hbase.procedure.master.classes=YOUR_CLASSES," - + "org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager\n" - + "hbase.procedure.regionserver.classes=YOUR_CLASSES," - + "org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager\n" - + "hbase.coprocessor.region.classes=YOUR_CLASSES," - + "org.apache.hadoop.hbase.backup.BackupObserver\n" + "and restart the cluster\n" - + "For more information please see http://hbase.apache.org/book.html#backuprestore\n"; + String BACKUP_CONFIG_STRING = BackupRestoreConstants.BACKUP_ENABLE_KEY + "=true\n" + + "hbase.master.logcleaner.plugins=" + + "YOUR_PLUGINS,org.apache.hadoop.hbase.backup.master.BackupLogCleaner\n" + + "hbase.procedure.master.classes=YOUR_CLASSES," + + "org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager\n" + + "hbase.procedure.regionserver.classes=YOUR_CLASSES," + + "org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager\n" + + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY + "=YOUR_CLASSES," + + BackupObserver.class.getSimpleName() + "\n" + CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY + + "=YOUR_CLASSES," + BackupMasterObserver.class.getSimpleName() + "\nand restart the cluster\n" + + "For more information please see http://hbase.apache.org/book.html#backuprestore\n"; String ENABLE_BACKUP = "Backup is not enabled. To enable backup, " + "in hbase-site.xml, set:\n " + BACKUP_CONFIG_STRING; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index fa24e9028e94..d1b31800183b 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -119,11 +119,17 @@ public static void decorateMasterConfiguration(Configuration conf) { plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, (plugins == null ? "" : plugins + ",") + BackupHFileCleaner.class.getName()); + + String observerClass = BackupObserver.class.getName(); + String masterCoProc = conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY); + conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + (masterCoProc == null ? "" : masterCoProc + ",") + observerClass); + if (LOG.isDebugEnabled()) { LOG.debug( "Added log cleaner: {}. Added master procedure manager: {}." - + "Added master procedure manager: {}", - cleanerClass, masterProcedureClass, BackupHFileCleaner.class.getName()); + + " Added master procedure manager: {}. Added master observer: {}", + cleanerClass, masterProcedureClass, BackupHFileCleaner.class.getName(), observerClass); } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index bb88666c7bf8..c0296d64d9e1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -411,25 +411,24 @@ public void registerBulkLoad(TableName tableName, byte[] region, try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); bufferedMutator.mutate(puts); - LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName); + LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName); } } - /* - * Removes rows recording bulk loaded hfiles from backup table - * @param lst list of table names - * @param rows the rows to be deleted + /** + * Removes entries from the table that tracks all bulk loaded hfiles. + * @param rows the row keys of the entries to be deleted */ public void deleteBulkLoadedRows(List rows) throws IOException { try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { - List lstDels = new ArrayList<>(); + List deletes = new ArrayList<>(); for (byte[] row : rows) { Delete del = new Delete(row); - lstDels.add(del); - LOG.debug("orig deleting the row: " + Bytes.toString(row)); + deletes.add(del); + LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row)); } - bufferedMutator.mutate(lstDels); - LOG.debug("deleted " + rows.size() + " original bulkload rows"); + bufferedMutator.mutate(deletes); + LOG.debug("Deleted {} bulk load entries.", rows.size()); } } @@ -1522,16 +1521,6 @@ public static void deleteSnapshot(Connection conn) throws IOException { } } - public static List createDeleteForOrigBulkLoad(List lst) { - List lstDels = new ArrayList<>(lst.size()); - for (TableName table : lst) { - Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); - del.addFamily(BackupSystemTable.META_FAMILY); - lstDels.add(del); - } - return lstDels; - } - private Put createPutForDeleteOperation(String[] backupIdList) { byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); Put put = new Put(DELETE_OP_ROW); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index c4017e8c1a1a..f21ced9bf2ff 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupCopyJob; @@ -152,6 +153,11 @@ public void execute() throws IOException { // the snapshot. LOG.info("Execute roll log procedure for full backup ..."); + // Gather the bulk loads being tracked by the system, which can be deleted (since their data + // will be part of the snapshot being taken). We gather this list before taking the actual + // snapshots for the same reason as the log rolls. + List bulkLoadsToDelete = backupManager.readBulkloadRows(tableList); + Map props = new HashMap<>(); props.put("backupRoot", backupInfo.getBackupRootDir()); admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, @@ -192,6 +198,9 @@ public void execute() throws IOException { BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); + backupManager + .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList()); + // backup complete completeBackup(conn, backupInfo, BackupType.FULL, conf); } catch (Exception e) { diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java index d24ec160d0cb..412fd5e32f7e 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java @@ -17,35 +17,34 @@ */ package org.apache.hadoop.hbase.backup; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.BulkLoad; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Assert; +import org.apache.hadoop.hbase.util.HFileTestUtil; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** - * 1. Create table t1 2. Load data to t1 3 Full backup t1 4 Load data to t1 5 bulk load into t1 6 - * Incremental backup t1 + * This test checks whether backups properly track & manage bulk files loads. */ @Category(LargeTests.class) public class TestIncrementalBackupWithBulkLoad extends TestBackupBase { @@ -54,85 +53,111 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestIncrementalBackupWithBulkLoad.class); - private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalBackupDeleteTable.class); + private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName(); + private static final int ROWS_IN_BULK_LOAD = 100; // implement all test cases in 1 test since incremental backup/restore has dependencies @Test public void TestIncBackupDeleteTable() throws Exception { - String testName = "TestIncBackupDeleteTable"; - // #1 - create full backup for all tables - LOG.info("create full backup image for all tables"); - - List tables = Lists.newArrayList(table1); - Connection conn = ConnectionFactory.createConnection(conf1); - Admin admin = conn.getAdmin(); - BackupAdminImpl client = new BackupAdminImpl(conn); - - BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); - String backupIdFull = client.backupTables(request); - - assertTrue(checkSucceeded(backupIdFull)); - - // #2 - insert some data to table table1 - Table t1 = conn.getTable(table1); - Put p1; - for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - p1 = new Put(Bytes.toBytes("row-t1" + i)); - p1.addColumn(famName, qualName, Bytes.toBytes("val" + i)); - t1.put(p1); + try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + // The test starts with some data, and no bulk loaded rows. + int expectedRowCount = NB_ROWS_IN_BATCH; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty()); + + // Bulk loads aren't tracked if the table isn't backed up yet + performBulkLoad("bulk1"); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); + + // Create a backup, bulk loads are now being tracked + String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR); + assertTrue(checkSucceeded(backup1)); + performBulkLoad("bulk2"); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size()); + + // Truncating or deleting a table clears the tracked bulk loads (and all rows) + TEST_UTIL.truncateTable(table1).close(); + expectedRowCount = 0; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); + + // Creating a full backup clears the bulk loads (since they are captured in the snapshot) + performBulkLoad("bulk3"); + expectedRowCount = ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size()); + String backup2 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR); + assertTrue(checkSucceeded(backup2)); + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); + + // Creating an incremental backup clears the bulk loads + performBulkLoad("bulk4"); + performBulkLoad("bulk5"); + performBulkLoad("bulk6"); + expectedRowCount += 3 * ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(3, systemTable.readBulkloadRows(List.of(table1)).size()); + String backup3 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR); + assertTrue(checkSucceeded(backup3)); + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); + int rowCountAfterBackup3 = expectedRowCount; + + // Doing another bulk load, to check that this data will disappear after a restore operation + performBulkLoad("bulk7"); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + List bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1)); + assertEquals(1, bulkloadsTemp.size()); + BulkLoad bulk7 = bulkloadsTemp.get(0); + + // Doing a restore. Overwriting the table implies clearing the bulk loads, + // but the loading of restored data involves loading bulk data, we expect 2 bulk loads + // associated with backup 3 (loading of full backup, loading of incremental backup). + BackupAdmin client = getBackupAdmin(); + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false, + new TableName[] { table1 }, new TableName[] { table1 }, true)); + assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1)); + List bulkLoads = systemTable.readBulkloadRows(List.of(table1)); + assertEquals(2, bulkLoads.size()); + assertFalse(bulkLoads.contains(bulk7)); + + // Check that we have data of all expected bulk loads + try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) { + assertFalse(containsRowWithKey(restoredTable, "bulk1")); + assertFalse(containsRowWithKey(restoredTable, "bulk2")); + assertTrue(containsRowWithKey(restoredTable, "bulk3")); + assertTrue(containsRowWithKey(restoredTable, "bulk4")); + assertTrue(containsRowWithKey(restoredTable, "bulk5")); + assertTrue(containsRowWithKey(restoredTable, "bulk6")); + assertFalse(containsRowWithKey(restoredTable, "bulk7")); + } } + } - Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH * 2); - t1.close(); - - int NB_ROWS2 = 20; - LOG.debug("bulk loading into " + testName); - int actual = - TestBulkLoadHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, qualName, false, null, - new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, - new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, - true, false, true, NB_ROWS_IN_BATCH * 2, NB_ROWS2); - - // #3 - incremental backup for table1 - tables = Lists.newArrayList(table1); - request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); - String backupIdIncMultiple = client.backupTables(request); - assertTrue(checkSucceeded(backupIdIncMultiple)); - // #4 bulk load again - LOG.debug("bulk loading into " + testName); - int actual1 = - TestBulkLoadHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, qualName, false, null, - new byte[][][] { new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("qqq") }, - new byte[][] { Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, }, - true, false, true, NB_ROWS_IN_BATCH * 2 + actual, NB_ROWS2); - - // #5 - incremental backup for table1 - tables = Lists.newArrayList(table1); - request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); - String backupIdIncMultiple1 = client.backupTables(request); - assertTrue(checkSucceeded(backupIdIncMultiple1)); - // Delete all data in table1 - TEST_UTIL.deleteTableData(table1); - - // #6 - restore incremental backup for table1 - TableName[] tablesRestoreIncMultiple = new TableName[] { table1 }; - // TableName[] tablesMapIncMultiple = new TableName[] { table1_restore }; - client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1, false, - tablesRestoreIncMultiple, tablesRestoreIncMultiple, true)); - - Table hTable = conn.getTable(table1); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1); - request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); - - backupIdFull = client.backupTables(request); - try (final BackupSystemTable table = new BackupSystemTable(conn)) { - List bulkLoads = table.readBulkloadRows(tables); - assertTrue("bulkloads still has " + bulkLoads.size() + " entries", bulkLoads.isEmpty()); - } - assertTrue(checkSucceeded(backupIdFull)); + private boolean containsRowWithKey(Table table, String rowKey) throws IOException { + byte[] data = Bytes.toBytes(rowKey); + Get get = new Get(data); + Result result = table.get(get); + return result.containsColumn(famName, qualName); + } + + private void performBulkLoad(String keyPrefix) throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME); + Path hfilePath = + new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix); + + HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName, + Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD); - hTable.close(); - admin.close(); - conn.close(); + Map result = + BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory); + assertFalse(result.isEmpty()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index 7561645f70b4..40b5ef440b2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -304,14 +304,6 @@ private void runTest(String testName, TableName tableName, BloomType bloomType, runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth); } - public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util, - byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, - byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, - int factor) throws Exception { - return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges, - useMap, deleteFile, copyFiles, initRowCount, factor, 2); - } - public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util, byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,