Skip to content

Commit 603d2b6

Browse files
authored by [author name removed in page extraction]
HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (apache#2139)
Closes apache#2127 Signed-off-by: stack <[email protected]>
1 parent 8dec49a commit 603d2b6

7 files changed

Lines changed: 91 additions & 49 deletions

File tree

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,16 @@ public enum OperationStatusCode {
14931493
"hbase.master.executor.logreplayops.threads";
14941494
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
14951495

1496+
/**
1497+
* Number of rows in a batch operation above which a warning will be logged.
1498+
*/
1499+
public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
1500+
1501+
/**
1502+
* Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
1503+
*/
1504+
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
1505+
14961506
private HConstants() {
14971507
// Can't be instantiated with this ctor.
14981508
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -276,15 +276,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
276276
*/
277277
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
278278

279-
/**
280-
* Number of rows in a batch operation above which a warning will be logged.
281-
*/
282-
static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
283-
/**
284-
* Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
285-
*/
286-
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
287-
288279
protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
289280

290281
// Request counter. (Includes requests that are not serviced by regions.)
@@ -1229,7 +1220,8 @@ public RSRpcServices(HRegionServer rs) throws IOException {
12291220
RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
12301221
this.ld = ld;
12311222
regionServer = rs;
1232-
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
1223+
rowSizeWarnThreshold = rs.conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
1224+
HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
12331225
RpcSchedulerFactory rpcSchedulerFactory;
12341226
try {
12351227
rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
187187
@Override
188188
public void startReplicationService() throws IOException {
189189
this.replicationManager.init();
190-
this.replicationSink = new ReplicationSink(this.conf, this.server);
190+
this.replicationSink = new ReplicationSink(this.conf);
191191
this.scheduleThreadPool.scheduleAtFixedRate(
192192
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
193193
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.InterruptedIOException;
2323
import java.util.ArrayList;
2424
import java.util.Collection;
25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -38,7 +39,6 @@
3839
import org.apache.hadoop.hbase.CellUtil;
3940
import org.apache.hadoop.hbase.HBaseConfiguration;
4041
import org.apache.hadoop.hbase.HConstants;
41-
import org.apache.hadoop.hbase.Stoppable;
4242
import org.apache.hadoop.hbase.TableName;
4343
import org.apache.hadoop.hbase.TableNotFoundException;
4444
import org.apache.yetus.audience.InterfaceAudience;
@@ -52,13 +52,14 @@
5252
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
5353
import org.apache.hadoop.hbase.client.Row;
5454
import org.apache.hadoop.hbase.client.Table;
55+
import org.apache.hadoop.hbase.util.Bytes;
56+
import org.apache.hadoop.hbase.util.Pair;
57+
import org.apache.hadoop.hbase.wal.WALEdit;
58+
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
5559
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
5660
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
5761
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
5862
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
59-
import org.apache.hadoop.hbase.wal.WALEdit;
60-
import org.apache.hadoop.hbase.util.Bytes;
61-
import org.apache.hadoop.hbase.util.Pair;
6263

6364
/**
6465
* <p>
@@ -91,16 +92,21 @@ public class ReplicationSink {
9192
private SourceFSConfigurationProvider provider;
9293
private WALEntrySinkFilter walEntrySinkFilter;
9394

95+
/**
96+
* Row size threshold for multi requests above which a warning is logged
97+
*/
98+
private final int rowSizeWarnThreshold;
99+
94100
/**
95101
* Create a sink for replication
96-
*
97-
* @param conf conf object
98-
* @param stopper boolean to tell this thread to stop
102+
* @param conf conf object
99103
* @throws IOException thrown when HDFS goes bad or bad file name
100104
*/
101-
public ReplicationSink(Configuration conf, Stoppable stopper)
105+
public ReplicationSink(Configuration conf)
102106
throws IOException {
103107
this.conf = HBaseConfiguration.create(conf);
108+
rowSizeWarnThreshold = conf.getInt(
109+
HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
104110
decorateConf();
105111
this.metrics = new MetricsSink();
106112
this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -210,11 +216,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
210216
// Map of table name Vs list of pair of family and list of
211217
// hfile paths from its namespace
212218
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
213-
bulkLoadsPerClusters.get(bld.getClusterIdsList());
214-
if (bulkLoadHFileMap == null) {
215-
bulkLoadHFileMap = new HashMap<>();
216-
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
217-
}
219+
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
218220
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
219221
}
220222
} else {
@@ -247,7 +249,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
247249
if (!rowMap.isEmpty()) {
248250
LOG.debug("Started replicating mutations.");
249251
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
250-
batch(entry.getKey(), entry.getValue().values());
252+
batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
251253
}
252254
LOG.debug("Finished replicating mutations.");
253255
}
@@ -372,17 +374,10 @@ private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
372374
* @param value
373375
* @return the list of values corresponding to key1 and key2
374376
*/
375-
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
376-
Map<K2,List<V>> innerMap = map.get(key1);
377-
if (innerMap == null) {
378-
innerMap = new HashMap<>();
379-
map.put(key1, innerMap);
380-
}
381-
List<V> values = innerMap.get(key2);
382-
if (values == null) {
383-
values = new ArrayList<>();
384-
innerMap.put(key2, values);
385-
}
377+
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
378+
V value) {
379+
Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
380+
List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
386381
values.add(value);
387382
return values;
388383
}
@@ -410,9 +405,10 @@ public void stopReplicationSinkServices() {
410405
* Do the changes and handle the pool
411406
* @param tableName table to insert into
412407
* @param allRows list of actions
413-
* @throws IOException
408+
* @param batchRowSizeThreshold rowSize threshold for batch mutation
414409
*/
415-
protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
410+
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
411+
throws IOException {
416412
if (allRows.isEmpty()) {
417413
return;
418414
}
@@ -421,7 +417,15 @@ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws
421417
Connection connection = getConnection();
422418
table = connection.getTable(tableName);
423419
for (List<Row> rows : allRows) {
424-
table.batch(rows, null);
420+
List<List<Row>> batchRows;
421+
if (rows.size() > batchRowSizeThreshold) {
422+
batchRows = Lists.partition(rows, batchRowSizeThreshold);
423+
} else {
424+
batchRows = Collections.singletonList(rows);
425+
}
426+
for(List<Row> rowList:batchRows){
427+
table.batch(rowList, null);
428+
}
425429
}
426430
} catch (RetriesExhaustedWithDetailsException rewde) {
427431
for (Throwable ex : rewde.getCauses()) {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.hadoop.hbase.HBaseClassTestRule;
2525
import org.apache.hadoop.hbase.HBaseTestingUtility;
26+
import org.apache.hadoop.hbase.HConstants;
2627
import org.apache.hadoop.hbase.TableName;
2728
import org.apache.hadoop.hbase.testclassification.MediumTests;
2829
import org.apache.hadoop.hbase.util.Bytes;
@@ -67,8 +68,8 @@ public static void setup() throws Exception {
6768
final TableName tableName = TableName.valueOf("tableName");
6869
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
6970
CONF = TEST_UTIL.getConfiguration();
70-
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
71-
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
71+
THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
72+
HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
7273
TEST_UTIL.startMiniCluster();
7374
TEST_UTIL.createTable(tableName, TEST_FAM);
7475
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.hadoop.hbase.replication.regionserver;
1919

2020
import static org.junit.Assert.assertEquals;
21-
import static org.junit.Assert.assertTrue;
2221

2322
import java.security.SecureRandom;
2423
import java.util.ArrayList;
@@ -55,7 +54,7 @@
5554
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
5655
import org.apache.hadoop.hbase.client.Scan;
5756
import org.apache.hadoop.hbase.client.Table;
58-
import org.apache.hadoop.hbase.testclassification.MediumTests;
57+
import org.apache.hadoop.hbase.testclassification.LargeTests;
5958
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
6059
import org.apache.hadoop.hbase.util.Bytes;
6160
import org.apache.hadoop.hbase.util.FSUtils;
@@ -78,7 +77,7 @@
7877
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
7978
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
8079

81-
@Category({ReplicationTests.class, MediumTests.class})
80+
@Category({ReplicationTests.class, LargeTests.class})
8281
public class TestReplicationSink {
8382

8483
@ClassRule
@@ -127,10 +126,8 @@ public void stop(String why) {
127126
public static void setUpBeforeClass() throws Exception {
128127
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
129128
TestSourceFSConfigurationProvider.class.getCanonicalName());
130-
131129
TEST_UTIL.startMiniCluster(3);
132-
SINK =
133-
new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
130+
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
134131
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
135132
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
136133
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -203,6 +200,40 @@ public void testMixedPutDelete() throws Exception {
203200
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
204201
}
205202

203+
@Test
204+
public void testLargeEditsPutDelete() throws Exception {
205+
List<WALEntry> entries = new ArrayList<>();
206+
List<Cell> cells = new ArrayList<>();
207+
for (int i = 0; i < 5510; i++) {
208+
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
209+
}
210+
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
211+
baseNamespaceDir, hfileArchiveDir);
212+
213+
ResultScanner resultScanner = table1.getScanner(new Scan());
214+
int totalRows = 0;
215+
while (resultScanner.next() != null) {
216+
totalRows++;
217+
}
218+
assertEquals(5510, totalRows);
219+
220+
entries = new ArrayList<>();
221+
cells = new ArrayList<>();
222+
for (int i = 0; i < 11000; i++) {
223+
entries.add(
224+
createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
225+
cells));
226+
}
227+
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
228+
baseNamespaceDir, hfileArchiveDir);
229+
resultScanner = table1.getScanner(new Scan());
230+
totalRows = 0;
231+
while (resultScanner.next() != null) {
232+
totalRows++;
233+
}
234+
assertEquals(5500, totalRows);
235+
}
236+
206237
/**
207238
* Insert to 2 different tables
208239
* @throws Exception
@@ -221,7 +252,11 @@ public void testMixedPutTables() throws Exception {
221252
Scan scan = new Scan();
222253
ResultScanner scanRes = table2.getScanner(scan);
223254
for(Result res : scanRes) {
224-
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
255+
assertEquals(0, Bytes.toInt(res.getRow()) % 2);
256+
}
257+
scanRes = table1.getScanner(scan);
258+
for(Result res : scanRes) {
259+
assertEquals(1, Bytes.toInt(res.getRow()) % 2);
225260
}
226261
}
227262

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void testWALEntryFilter() throws IOException {
128128
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
129129
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
130130
Connection.class);
131-
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
131+
ReplicationSink sink = new ReplicationSink(conf);
132132
// Create some dumb walentries.
133133
List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
134134
new ArrayList<>();

0 commit comments

Comments (0)