diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 05366a33ef50..d1537fa20361 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1493,6 +1493,16 @@ public enum OperationStatusCode {
     "hbase.master.executor.logreplayops.threads";
   public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
 
+  /**
+   * Number of rows in a batch operation above which a warning will be logged.
+   */
+  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+  /**
+   * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+   */
+  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9a0283104c3b..081942397b37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -276,15 +276,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
 
-  /**
-   * Number of rows in a batch operation above which a warning will be logged.
-   */
-  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
-  /**
-   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
-   */
-  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
   protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
 
   // Request counter. (Includes requests that are not serviced by regions.)
@@ -1229,7 +1220,8 @@ public RSRpcServices(HRegionServer rs) throws IOException {
   RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
     this.ld = ld;
     regionServer = rs;
-    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+    rowSizeWarnThreshold = rs.conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     RpcSchedulerFactory rpcSchedulerFactory;
     try {
       rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
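Review note: moving the two constants into HConstants lets RSRpcServices (above) and ReplicationSink (below) share one definition of the same key. A minimal sketch of that lookup, assuming only an HBase client classpath; the class name is illustrative and not part of the patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class BatchThresholdLookup {
  public static void main(String[] args) {
    // Loads hbase-default.xml / hbase-site.xml from the classpath.
    Configuration conf = HBaseConfiguration.create();
    // The same lookup both RSRpcServices and ReplicationSink perform after
    // this patch; falls back to 5000 when hbase.rpc.rows.warning.threshold is unset.
    int rowSizeWarnThreshold = conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    System.out.println("hbase.rpc.rows.warning.threshold = " + rowSizeWarnThreshold);
  }
}
```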
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 6c46a853dd32..752cfb8a1427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -187,7 +187,7 @@ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
   @Override
   public void startReplicationService() throws IOException {
     this.replicationManager.init();
-    this.replicationSink = new ReplicationSink(this.conf, this.server);
+    this.replicationSink = new ReplicationSink(this.conf);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
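The ReplicationSink changes below do two things: the constructor drops its unused Stoppable parameter and reads the row-size threshold, and two hand-rolled get/null-check/put sequences become Map.computeIfAbsent. A self-contained sketch of the computeIfAbsent idiom on the same two-level multimap shape; the wrapper class is hypothetical, while the helper body mirrors the new addToHashMultiMap:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashMultiMapSketch {
  // One computeIfAbsent per level replaces the get / null-check / put dance.
  static <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  public static void main(String[] args) {
    Map<String, Map<String, List<Integer>>> map = new HashMap<>();
    addToHashMultiMap(map, "table1", "cf1", 42);
    addToHashMultiMap(map, "table1", "cf1", 43);
    System.out.println(map); // prints {table1={cf1=[42, 43]}}
  }
}
```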
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index ae0a732499e7..76e22f8b36dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -22,6 +22,7 @@
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,6 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -52,13 +52,14 @@
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>
@@ -91,16 +92,21 @@ public class ReplicationSink {
   private SourceFSConfigurationProvider provider;
   private WALEntrySinkFilter walEntrySinkFilter;
 
+  /**
+   * Row size threshold for multi requests above which a warning is logged
+   */
+  private final int rowSizeWarnThreshold;
+
   /**
    * Create a sink for replication
-   *
-   * @param conf conf object
-   * @param stopper boolean to tell this thread to stop
+   * @param conf conf object
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, Stoppable stopper)
+  public ReplicationSink(Configuration conf)
     throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     decorateConf();
     this.metrics = new MetricsSink();
     this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -210,11 +216,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
               // Map of table name Vs list of pair of family and list of
               // hfile paths from its namespace
               Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-                bulkLoadsPerClusters.get(bld.getClusterIdsList());
-              if (bulkLoadHFileMap == null) {
-                bulkLoadHFileMap = new HashMap<>();
-                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-              }
+                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
               buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
           } else {
@@ -247,7 +249,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
       if (!rowMap.isEmpty()) {
         LOG.debug("Started replicating mutations.");
         for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
-          batch(entry.getKey(), entry.getValue().values());
+          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
         }
         LOG.debug("Finished replicating mutations.");
       }
@@ -372,17 +374,10 @@ private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    * @param value
    * @return the list of values corresponding to key1 and key2
    */
-  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
-    Map<K2, List<V>> innerMap = map.get(key1);
-    if (innerMap == null) {
-      innerMap = new HashMap<>();
-      map.put(key1, innerMap);
-    }
-    List<V> values = innerMap.get(key2);
-    if (values == null) {
-      values = new ArrayList<>();
-      innerMap.put(key2, values);
-    }
+  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
+      V value) {
+    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
     values.add(value);
     return values;
   }
@@ -410,9 +405,10 @@ public void stopReplicationSinkServices() {
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param allRows list of actions
-   * @throws IOException
+   * @param batchRowSizeThreshold rowSize threshold for batch mutation
    */
-  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+    throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
@@ -421,7 +417,15 @@ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws
       Connection connection = getConnection();
       table = connection.getTable(tableName);
       for (List<Row> rows : allRows) {
-        table.batch(rows, null);
+        List<List<Row>> batchRows;
+        if (rows.size() > batchRowSizeThreshold) {
+          batchRows = Lists.partition(rows, batchRowSizeThreshold);
+        } else {
+          batchRows = Collections.singletonList(rows);
+        }
+        for (List<Row> rowList : batchRows) {
+          table.batch(rowList, null);
+        }
       }
     } catch (RetriesExhaustedWithDetailsException rewde) {
       for (Throwable ex : rewde.getCauses()) {
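The behavioral core of the patch is the new partitioning in batch(): Guava's Lists.partition returns consecutive sublist views of at most the given size, so an oversized row list is submitted as several bounded Table.batch calls instead of one huge multi request. A standalone sketch of that logic under the default threshold; plain Guava behaves the same as the shaded org.apache.hbase.thirdparty copy the patch imports, and the class name is illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.google.common.collect.Lists;

public class PartitionSketch {
  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 5510; i++) {
      rows.add(i);
    }
    int batchRowSizeThreshold = 5000;
    // Mirrors the new batch() logic: split only when over the threshold,
    // otherwise pass the original list through as a single batch.
    List<List<Integer>> batchRows = rows.size() > batchRowSizeThreshold
      ? Lists.partition(rows, batchRowSizeThreshold)
      : Collections.singletonList(rows);
    for (List<Integer> rowList : batchRows) {
      System.out.println("batch size: " + rowList.size()); // prints 5000, then 510
    }
  }
}
```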
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
index 8e11ed58d202..f41335fd29c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -67,8 +68,8 @@ public static void setup() throws Exception {
     final TableName tableName = TableName.valueOf("tableName");
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
-    THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
-      RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+    THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     TEST_UTIL.startMiniCluster();
     TEST_UTIL.createTable(tableName, TEST_FAM);
     RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index aa6c39cd53ad..adb077dbf706 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -55,7 +54,7 @@
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -78,7 +77,7 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSink {
 
   @ClassRule
@@ -127,10 +126,8 @@ public void stop(String why) {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
-    TEST_UTIL.startMiniCluster(3);
-    SINK =
-      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
     Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -203,6 +200,40 @@ public void testMixedPutDelete() throws Exception {
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
   }
 
+  @Test
+  public void testLargeEditsPutDelete() throws Exception {
+    List<WALEntry> entries = new ArrayList<>();
+    List<Cell> cells = new ArrayList<>();
+    for (int i = 0; i < 5510; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+
+    ResultScanner resultScanner = table1.getScanner(new Scan());
+    int totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5510, totalRows);
+
+    entries = new ArrayList<>();
+    cells = new ArrayList<>();
+    for (int i = 0; i < 11000; i++) {
+      entries.add(
+        createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+          cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+    resultScanner = table1.getScanner(new Scan());
+    totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5500, totalRows);
+  }
+
   /**
    * Insert to 2 different tables
    * @throws Exception
@@ -221,7 +252,11 @@ public void testMixedPutTables() throws Exception {
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
-      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+    }
+    scanRes = table1.getScanner(scan);
+    for(Result res : scanRes) {
+      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 31e94d6bec52..15ff54c0c903 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -128,7 +128,7 @@ public void testWALEntryFilter() throws IOException {
       IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
     conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
       Connection.class);
-    ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+    ReplicationSink sink = new ReplicationSink(conf);
     // Create some dumb walentries.
     List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries =
       new ArrayList<>();
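For reviewers checking the new test's expected counts: the first pass of testLargeEditsPutDelete writes 5510 puts, which (with the default threshold of 5000) forces one split into batches of 5000 and 510; the second pass writes rows 0..10999 with puts on odd rows and DeleteColumn on even rows, leaving 5500 visible rows, since deletes of never-written even rows are no-ops. A throwaway sketch of that arithmetic (illustrative, not part of the patch):

```java
public class ExpectedRowCounts {
  public static void main(String[] args) {
    // Phase 1: rows 0..5509 are all Put -> 5510 rows visible.
    int afterPuts = 5510;

    // Phase 2: rows 0..10999; odd rows Put, even rows DeleteColumn.
    // Even rows 0..5508 are deleted; even rows 5510..10998 never existed.
    int survivors = 0;
    for (int i = 0; i < 11000; i++) {
      if (i % 2 != 0) {
        survivors++;
      }
    }
    System.out.println(afterPuts + " rows after phase 1, " + survivors + " after phase 2");
    // Matches assertEquals(5510, totalRows) and assertEquals(5500, totalRows).
  }
}
```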