Skip to content

Commit 38c8bd3

Browse files
authored
HBASE-22380 break circle replication when doing bulkload (#494)
Signed-off-by: stack <[email protected]> Signed-off-by: Andrew Purtell <[email protected]> Signed-off-by: Norbert Kalmar <[email protected]>
1 parent adc0197 commit 38c8bd3

13 files changed

Lines changed: 402 additions & 33 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
115115
final List<Pair<byte[], String>> familyPaths,
116116
final byte[] regionName, boolean assignSeqNum,
117117
final Token<?> userToken, final String bulkToken) throws IOException {
118-
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
119-
false);
118+
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
119+
bulkToken, false, null);
120120
}
121121

122122
/**
@@ -132,13 +132,23 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
132132
* @return true if all are loaded
133133
* @throws IOException
134134
*/
135+
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
136+
final List<Pair<byte[], String>> familyPaths,
137+
final byte[] regionName, boolean assignSeqNum,
138+
final Token<?> userToken, final String bulkToken,
139+
boolean copyFiles) throws IOException {
140+
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
141+
bulkToken, false, null);
142+
}
143+
135144
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
136145
final List<Pair<byte[], String>> familyPaths,
137146
final byte[] regionName, boolean assignSeqNum,
138-
final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
147+
final Token<?> userToken, final String bulkToken,
148+
boolean copyFiles, List<String> clusterIds) throws IOException {
139149
BulkLoadHFileRequest request =
140150
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
141-
userToken, bulkToken, copyFiles);
151+
userToken, bulkToken, copyFiles, clusterIds);
142152

143153
try {
144154
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2577,13 +2577,23 @@ public static QuotaProtos.SpaceQuota toProtoSpaceQuota(
25772577
* name
25782578
* @return The WAL log marker for bulk loads.
25792579
*/
2580+
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
2581+
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
2582+
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
2583+
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
2584+
storeFilesSize, bulkloadSeqId, null);
2585+
}
2586+
25802587
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
25812588
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
2582-
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
2589+
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
25832590
BulkLoadDescriptor.Builder desc =
25842591
BulkLoadDescriptor.newBuilder()
25852592
.setTableName(ProtobufUtil.toProtoTableName(tableName))
25862593
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
2594+
if(clusterIds != null) {
2595+
desc.addAllClusterIds(clusterIds);
2596+
}
25872597

25882598
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
25892599
WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
566566
final byte[] regionName, boolean assignSeqNum,
567567
final Token<?> userToken, final String bulkToken) {
568568
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
569-
false);
569+
false, null);
570570
}
571571

572572
/**
@@ -581,9 +581,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
581581
* @return a bulk load request
582582
*/
583583
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
584-
final List<Pair<byte[], String>> familyPaths,
585-
final byte[] regionName, boolean assignSeqNum,
586-
final Token<?> userToken, final String bulkToken, boolean copyFiles) {
584+
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
585+
final Token<?> userToken, final String bulkToken, boolean copyFiles,
586+
List<String> clusterIds) {
587587
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
588588
RegionSpecifierType.REGION_NAME, regionName);
589589

@@ -621,6 +621,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
621621
request.setBulkToken(bulkToken);
622622
}
623623
request.setCopyFile(copyFiles);
624+
if (clusterIds != null) {
625+
request.addAllClusterIds(clusterIds);
626+
}
624627
return request.build();
625628
}
626629

hbase-protocol-shaded/src/main/protobuf/Client.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
378378
optional DelegationToken fs_token = 4;
379379
optional string bulk_token = 5;
380380
optional bool copy_file = 6 [default = false];
381+
repeated string cluster_ids = 7;
381382

382383
message FamilyPath {
383384
required bytes family = 1;

hbase-protocol-shaded/src/main/protobuf/WAL.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ message BulkLoadDescriptor {
150150
required bytes encoded_region_name = 2;
151151
repeated StoreDescriptor stores = 3;
152152
required int64 bulkload_seq_num = 4;
153+
repeated string cluster_ids = 5;
153154
}
154155

155156
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6090,7 +6090,7 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
60906090
*/
60916091
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
60926092
BulkLoadListener bulkLoadListener) throws IOException {
6093-
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
6093+
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
60946094
}
60956095

60966096
/**
@@ -6135,11 +6135,13 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
61356135
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
61366136
* file about to be bulk loaded
61376137
* @param copyFile always copy hfiles if true
6138+
* @param clusterIds ids from clusters that had already handled the given bulkload event.
61386139
* @return Map from family to List of store file paths if successful, null if failed recoverably
61396140
* @throws IOException if failed unrecoverably.
61406141
*/
61416142
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
6142-
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
6143+
boolean assignSeqId, BulkLoadListener bulkLoadListener,
6144+
boolean copyFile, List<String> clusterIds) throws IOException {
61436145
long seqId = -1;
61446146
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
61456147
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6314,8 +6316,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
63146316
WALProtos.BulkLoadDescriptor loadDescriptor =
63156317
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
63166318
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
6317-
storeFiles,
6318-
storeFilesSizes, seqId);
6319+
storeFiles, storeFilesSizes, seqId, clusterIds);
63196320
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
63206321
loadDescriptor, mvcc);
63216322
} catch (IOException ioe) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2364,6 +2364,12 @@ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
23642364
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
23652365
final BulkLoadHFileRequest request) throws ServiceException {
23662366
long start = EnvironmentEdgeManager.currentTime();
2367+
List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2368+
if(clusterIds.contains(this.regionServer.clusterId)){
2369+
return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2370+
} else {
2371+
clusterIds.add(this.regionServer.clusterId);
2372+
}
23672373
try {
23682374
checkOpen();
23692375
requestCount.increment();
@@ -2396,15 +2402,15 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
23962402
}
23972403
try {
23982404
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2399-
request.getCopyFile());
2405+
request.getCopyFile(), clusterIds);
24002406
} finally {
24012407
if (region.getCoprocessorHost() != null) {
24022408
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
24032409
}
24042410
}
24052411
} else {
24062412
// secure bulk load
2407-
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
2413+
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
24082414
}
24092415
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
24102416
builder.setLoaded(map != null);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,12 @@ private boolean isUserReferenced(UserGroupInformation ugi) {
213213
}
214214

215215
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
216-
final BulkLoadHFileRequest request) throws IOException {
216+
final BulkLoadHFileRequest request) throws IOException {
217+
return secureBulkLoadHFiles(region, request, null);
218+
}
219+
220+
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
221+
final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
217222
final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
218223
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
219224
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -289,7 +294,8 @@ public Map<byte[], List<Path>> run() {
289294
//We call bulkLoadHFiles as requesting user
290295
//To enable access prior to staging
291296
return region.bulkLoadHFiles(familyPaths, true,
292-
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
297+
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
298+
clusterIds);
293299
} catch (Exception e) {
294300
LOG.error("Failed to complete bulk load", e);
295301
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,19 @@ public class HFileReplicator {
8787
private ThreadPoolExecutor exec;
8888
private int maxCopyThreads;
8989
private int copiesPerThread;
90+
private List<String> sourceClusterIds;
9091

9192
public HFileReplicator(Configuration sourceClusterConf,
9293
String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
9394
Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
94-
Connection connection) throws IOException {
95+
Connection connection, List<String> sourceClusterIds) throws IOException {
9596
this.sourceClusterConf = sourceClusterConf;
9697
this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
9798
this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
9899
this.bulkLoadHFileMap = tableQueueMap;
99100
this.conf = conf;
100101
this.connection = connection;
102+
this.sourceClusterIds = sourceClusterIds;
101103

102104
userProvider = UserProvider.instantiate(conf);
103105
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public Void replicate() throws IOException {
128130
LoadIncrementalHFiles loadHFiles = null;
129131
try {
130132
loadHFiles = new LoadIncrementalHFiles(conf);
133+
loadHFiles.setClusterIds(sourceClusterIds);
131134
} catch (Exception e) {
132135
LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
133136
+ " data.", e);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
174174
// invocation of this method per table and cluster id.
175175
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
176176

177-
// Map of table name Vs list of pair of family and list of hfile paths from its namespace
178-
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
179-
177+
Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
180178
for (WALEntry entry : entries) {
181179
TableName table =
182180
TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -204,10 +202,19 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
204202
Cell cell = cells.current();
205203
// Handle bulk load hfiles replication
206204
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
205+
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
206+
if(bulkLoadsPerClusters == null) {
207+
bulkLoadsPerClusters = new HashMap<>();
208+
}
209+
// Map of table name Vs list of pair of family and list of
210+
// hfile paths from its namespace
211+
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
212+
bulkLoadsPerClusters.get(bld.getClusterIdsList());
207213
if (bulkLoadHFileMap == null) {
208214
bulkLoadHFileMap = new HashMap<>();
215+
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
209216
}
210-
buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
217+
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
211218
} else {
212219
// Handle wal replication
213220
if (isNewRowOrType(previousCell, cell)) {
@@ -243,14 +250,26 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
243250
LOG.debug("Finished replicating mutations.");
244251
}
245252

246-
if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
247-
LOG.debug("Started replicating bulk loaded data.");
248-
HFileReplicator hFileReplicator =
249-
new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
253+
if(bulkLoadsPerClusters != null) {
254+
for (Entry<List<String>, Map<String, List<Pair<byte[],
255+
List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
256+
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
257+
if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
258+
if(LOG.isDebugEnabled()) {
259+
LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
260+
entry.getKey().toString());
261+
}
262+
HFileReplicator hFileReplicator =
263+
new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
250264
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
251-
getConnection());
252-
hFileReplicator.replicate();
253-
LOG.debug("Finished replicating bulk loaded data.");
265+
getConnection(), entry.getKey());
266+
hFileReplicator.replicate();
267+
if(LOG.isDebugEnabled()) {
268+
LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
269+
entry.getKey().toString());
270+
}
271+
}
272+
}
254273
}
255274

256275
int size = entries.size();
@@ -265,8 +284,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
265284

266285
private void buildBulkLoadHFileMap(
267286
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
268-
Cell cell) throws IOException {
269-
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
287+
BulkLoadDescriptor bld) throws IOException {
270288
List<StoreDescriptor> storesList = bld.getStoresList();
271289
int storesSize = storesList.size();
272290
for (int j = 0; j < storesSize; j++) {

0 commit comments

Comments
 (0)