Skip to content

Commit f8c359c

Browse files
committed
Bidirectional bulkload replication causes excessive network traffic
1 parent e7be4f3 commit f8c359c

7 files changed

Lines changed: 34 additions & 35 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2641,20 +2641,16 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN
26412641
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
26422642
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
26432643
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles, storeFilesSize,
2644-
bulkloadSeqId, null, true);
2644+
bulkloadSeqId, true);
26452645
}
26462646

26472647
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
26482648
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
2649-
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds,
2650-
boolean replicate) {
2649+
Map<String, Long> storeFilesSize, long bulkloadSeqId, boolean replicate) {
26512650
BulkLoadDescriptor.Builder desc =
26522651
BulkLoadDescriptor.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tableName))
26532652
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId)
26542653
.setReplicate(replicate);
2655-
if (clusterIds != null) {
2656-
desc.addAllClusterIds(clusterIds);
2657-
}
26582654

26592655
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
26602656
WALProtos.StoreDescriptor.Builder builder =

hbase-protocol-shaded/src/main/protobuf/WAL.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,6 @@ message BulkLoadDescriptor {
152152
required bytes encoded_region_name = 2;
153153
repeated StoreDescriptor stores = 3;
154154
required int64 bulkload_seq_num = 4;
155-
repeated string cluster_ids = 5;
156155
optional bool replicate = 6 [default = true];
157156
}
158157

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7077,8 +7077,11 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
70777077
WALProtos.BulkLoadDescriptor loadDescriptor =
70787078
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
70797079
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
7080-
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
7080+
storeFiles, storeFilesSizes, seqId, replicate);
70817081
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
7082+
clusterIds == null
7083+
? WALKey.EMPTY_UUIDS
7084+
: clusterIds.stream().map(UUID::fromString).collect(Collectors.toList()),
70827085
loadDescriptor, mvcc);
70837086
} catch (IOException ioe) {
70847087
if (this.rsServices != null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2418,11 +2418,6 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
24182418
final BulkLoadHFileRequest request) throws ServiceException {
24192419
long start = EnvironmentEdgeManager.currentTime();
24202420
List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2421-
if (clusterIds.contains(this.regionServer.clusterId)) {
2422-
return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2423-
} else {
2424-
clusterIds.add(this.regionServer.clusterId);
2425-
}
24262421
try {
24272422
checkOpen();
24282423
requestCount.increment();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
package org.apache.hadoop.hbase.regionserver.wal;
1919

2020
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
21+
import static org.apache.hadoop.hbase.wal.WALKey.EMPTY_UUIDS;
2122

2223
import java.io.IOException;
2324
import java.util.ArrayList;
25+
import java.util.List;
2426
import java.util.Map;
2527
import java.util.NavigableMap;
2628
import java.util.TreeMap;
29+
import java.util.UUID;
2730
import java.util.function.Function;
2831
import org.apache.hadoop.conf.Configuration;
2932
import org.apache.hadoop.fs.FileSystem;
@@ -75,8 +78,8 @@ private WALUtil() {
7578
public static WALKeyImpl writeCompactionMarker(WAL wal,
7679
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
7780
MultiVersionConcurrencyControl mvcc) throws IOException {
78-
WALKeyImpl walKey =
79-
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
81+
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS,
82+
WALEdit.createCompaction(hri, c), mvcc, null);
8083
if (LOG.isTraceEnabled()) {
8184
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
8285
}
@@ -91,7 +94,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal,
9194
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
9295
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
9396
throws IOException {
94-
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
97+
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, EMPTY_UUIDS,
9598
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
9699
if (LOG.isTraceEnabled()) {
97100
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
@@ -106,8 +109,8 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer>
106109
public static WALKeyImpl writeRegionEventMarker(WAL wal,
107110
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
108111
MultiVersionConcurrencyControl mvcc) throws IOException {
109-
WALKeyImpl walKey =
110-
writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
112+
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS,
113+
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
111114
if (LOG.isTraceEnabled()) {
112115
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
113116
}
@@ -127,23 +130,23 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal,
127130
*/
128131
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
129132
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
130-
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
131-
throws IOException {
132-
WALKeyImpl walKey =
133-
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
133+
List<UUID> clusterIds, final WALProtos.BulkLoadDescriptor desc,
134+
final MultiVersionConcurrencyControl mvcc) throws IOException {
135+
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, clusterIds,
136+
WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
134137
if (LOG.isTraceEnabled()) {
135138
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
136139
}
137140
return walKey;
138141
}
139142

140143
private static WALKeyImpl writeMarker(final WAL wal,
141-
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit,
142-
MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes)
144+
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, List<UUID> clusterIds,
145+
WALEdit edit, MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes)
143146
throws IOException {
144147
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
145-
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
146-
true);
148+
return doFullMarkerAppendTransaction(wal, replicationScope, hri, clusterIds, edit, mvcc,
149+
extendedAttributes, true);
147150
}
148151

149152
/**
@@ -155,12 +158,12 @@ private static WALKeyImpl writeMarker(final WAL wal,
155158
* @return WALKeyImpl that was added to the WAL.
156159
*/
157160
private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
158-
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit,
159-
MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync)
160-
throws IOException {
161+
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final List<UUID> clusterIds,
162+
final WALEdit edit, MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes,
163+
boolean sync) throws IOException {
161164
// TODO: Pass in current time to use?
162165
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
163-
EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
166+
EnvironmentEdgeManager.currentTime(), clusterIds, mvcc, replicationScope, extendedAttributes);
164167
long trx = MultiVersionConcurrencyControl.NONE;
165168
try {
166169
trx = wal.appendMarker(hri, walKey, edit);
@@ -232,7 +235,7 @@ public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrenc
232235
RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
233236
NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
234237
replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
235-
writeMarker(wal, replicationScope, regionInfo,
238+
writeMarker(wal, replicationScope, regionInfo, EMPTY_UUIDS,
236239
WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null);
237240
}
238241
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
249249
}
250250
// Map of table name Vs list of pair of family and list of
251251
// hfile paths from its namespace
252+
253+
List<String> clusterIds = entry.getKey().getClusterIdsList().stream()
254+
.map(k -> toUUID(k).toString()).collect(Collectors.toList());
252255
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
253-
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
256+
bulkLoadsPerClusters.computeIfAbsent(clusterIds, k -> new HashMap<>());
254257
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
255258
}
256259
} else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin
152152
}
153153

154154
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
155-
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope,
156-
Map<String, byte[]> extendedAttributes) {
157-
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
155+
List<UUID> clusterIds, MultiVersionConcurrencyControl mvcc,
156+
final NavigableMap<byte[], Integer> replicationScope, Map<String, byte[]> extendedAttributes) {
157+
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, HConstants.NO_NONCE,
158158
HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes);
159159
}
160160

0 commit comments

Comments
 (0)