From 41ab60d1c66469b2aeb0dc2409f5e276cdc45429 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 28 Jun 2024 16:46:22 -0400 Subject: [PATCH 1/3] Partition BackupSystemTable queries --- .../hbase/backup/impl/BackupSystemTable.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 5a12b45a5861..2976cbe2a41f 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.Table; @@ -76,6 +77,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Splitter; import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -101,6 +103,7 @@ public final class BackupSystemTable implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); + private static final int BATCH_SIZE = 1000; static class WALItem { String backupId; @@ -414,7 +417,7 @@ public void writePathsPostBulkLoad(TableName tabName, byte[] region, } try (Table table = connection.getTable(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); - table.put(puts); + executePartitionedBatches(table, puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } } @@ -453,7 +456,7 @@ public void deleteBulkLoadedRows(List rows) throws IOException { lstDels.add(del); LOG.debug("orig deleting the row: " + Bytes.toString(row)); } - table.delete(lstDels); + executePartitionedBatches(table, lstDels); LOG.debug("deleted " + rows.size() + " original bulkload rows"); } } @@ -558,7 +561,7 @@ public void writeBulkLoadedFiles(List sTableList, Map tables, Map operations) + throws IOException { + List> operationBatches = Lists.partition(operations, BATCH_SIZE); + for (List batch : operationBatches) { + try { + table.batch(batch, new Object[batch.size()]); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } } From 162a10d6c7d19107c0828a7433020f984ef0b50b Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 14 Aug 2024 15:00:53 -0400 Subject: [PATCH 2/3] use buffered mutator --- .../hbase/backup/impl/BackupSystemTable.java | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 2976cbe2a41f..1db2d50c7420 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -54,15 +54,16 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.Table; @@ -77,7 +78,6 @@ import org.apache.hbase.thirdparty.com.google.common.base.Splitter; import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -103,7 +103,6 @@ public final class BackupSystemTable implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); - private static final int BATCH_SIZE = 1000; static class WALItem { String backupId; @@ -417,7 +416,7 @@ public void writePathsPostBulkLoad(TableName tabName, byte[] region, } try (Table table = connection.getTable(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); - executePartitionedBatches(table, puts); + executeBufferedMutations(table, puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } } @@ -456,7 +455,7 @@ public void deleteBulkLoadedRows(List rows) throws IOException { lstDels.add(del); LOG.debug("orig deleting the row: " + Bytes.toString(row)); } - executePartitionedBatches(table, lstDels); + executeBufferedMutations(table, lstDels); LOG.debug("deleted " + rows.size() + " original bulkload rows"); } } @@ -561,7 +560,7 @@ public void writeBulkLoadedFiles(List sTableList, Map tables, Map mutations) + throws IOException { + try (BufferedMutator bufferedMutator = connection.getBufferedMutator(table.getName())) { + bufferedMutator.mutate(mutations); + } + } + private static byte[] rowkey(String s, String... other) { StringBuilder sb = new StringBuilder(s); for (String ss : other) { @@ -1905,19 +1911,4 @@ private static void ensureTableEnabled(Admin admin, TableName tableName) throws } } } - - /** - * Executes the given operations in partitioned batches of size {@link #BATCH_SIZE} - */ - private static void executePartitionedBatches(Table table, List operations) - throws IOException { - List> operationBatches = Lists.partition(operations, BATCH_SIZE); - for (List batch : operationBatches) { - try { - table.batch(batch, new Object[batch.size()]); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } } From 6c5020ac84d2bad817f74fb53c88b15a15272b44 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 6 Sep 2024 11:18:12 -0400 Subject: [PATCH 3/3] cleanup --- .../hbase/backup/impl/BackupSystemTable.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 1db2d50c7420..2ec6c6adbd4f 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -414,9 +413,9 @@ public void writePathsPostBulkLoad(TableName tabName, byte[] region, LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + " entries"); } - try (Table table = connection.getTable(bulkLoadTableName)) { + try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); - executeBufferedMutations(table, puts); + bufferedMutator.mutate(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } } @@ -448,14 +447,14 @@ public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, fin * @param rows the rows to be deleted */ public void deleteBulkLoadedRows(List rows) throws IOException { - try (Table table = connection.getTable(bulkLoadTableName)) { + try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { List lstDels = new ArrayList<>(); for (byte[] row : rows) { Delete del = new Delete(row); lstDels.add(del); LOG.debug("orig deleting the row: " + Bytes.toString(row)); } - executeBufferedMutations(table, lstDels); + bufferedMutator.mutate(lstDels); LOG.debug("deleted " + rows.size() + " original bulkload rows"); } } @@ -537,7 +536,7 @@ public void deleteBulkLoadedRows(List rows) throws IOException { */ public void writeBulkLoadedFiles(List sTableList, Map>[] maps, String backupId) throws IOException { - try (Table table = connection.getTable(bulkLoadTableName)) { + try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { long ts = EnvironmentEdgeManager.currentTime(); int cnt = 0; List puts = new ArrayList<>(); @@ -560,7 +559,7 @@ public void writeBulkLoadedFiles(List sTableList, Map tables, Map mutations) - throws IOException { - try (BufferedMutator bufferedMutator = connection.getBufferedMutator(table.getName())) { - bufferedMutator.mutate(mutations); - } - } - private static byte[] rowkey(String s, String... other) { StringBuilder sb = new StringBuilder(s); for (String ss : other) {