Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2676,20 +2676,16 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles, storeFilesSize,
bulkloadSeqId, null, true);
bulkloadSeqId, true);
}

public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds,
boolean replicate) {
Map<String, Long> storeFilesSize, long bulkloadSeqId, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId)
.setReplicate(replicate);
if (clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}

for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
WALProtos.StoreDescriptor.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ message BulkLoadDescriptor {
required bytes encoded_region_name = 2;
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
optional bool replicate = 6 [default = true];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.wal.WALKey.EMPTY_UUIDS;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
Expand Down Expand Up @@ -3056,7 +3057,7 @@ private void attachRegionReplicationToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSe
assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
WALKeyImpl walKey =
WALUtil.createWALKey(getRegionInfo(), mvcc, this.getReplicationScope(), null);
WALUtil.createWALKey(getRegionInfo(), EMPTY_UUIDS, mvcc, this.getReplicationScope(), null);
walKey.setWriteEntry(flushOpSeqIdMVCCEntry);
/**
* Here the {@link ServerCall} is null for {@link RegionReplicationSink#add} because the
Expand Down Expand Up @@ -7557,8 +7558,11 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
storeFiles, storeFilesSizes, seqId, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
clusterIds == null
? WALKey.EMPTY_UUIDS
: clusterIds.stream().map(UUID::fromString).collect(Collectors.toList()),
loadDescriptor, mvcc, regionReplicationSink.orElse(null));
} catch (IOException ioe) {
if (this.rsServices != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2338,11 +2338,6 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
final BulkLoadHFileRequest request) throws ServiceException {
long start = EnvironmentEdgeManager.currentTime();
List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
if (clusterIds.contains(this.server.getClusterId())) {
return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
} else {
clusterIds.add(this.server.getClusterId());
}
try {
checkOpen();
requestCount.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import static org.apache.hadoop.hbase.wal.WALKey.EMPTY_UUIDS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -78,8 +81,8 @@ private WALUtil() {
public static WALKeyImpl writeCompactionMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS,
WALEdit.createCompaction(hri, c), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
Expand All @@ -94,7 +97,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal,
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, EMPTY_UUIDS,
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
Expand All @@ -109,7 +112,7 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer>
public static WALKeyImpl writeRegionEventMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS,
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
Expand All @@ -130,9 +133,10 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal,
*/
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
final RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
final List<UUID> clusterIds, final WALProtos.BulkLoadDescriptor desc,
final MultiVersionConcurrencyControl mvcc, final RegionReplicationSink sink)
throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, clusterIds,
WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
Expand All @@ -141,12 +145,13 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
}

private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
final RegionReplicationSink sink) throws IOException {
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final List<UUID> clusterIds, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final RegionReplicationSink sink)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
true, sink);
return doFullMarkerAppendTransaction(wal, replicationScope, hri, clusterIds, edit, mvcc,
extendedAttributes, true, sink);
}

/**
Expand All @@ -158,11 +163,12 @@ private static WALKeyImpl writeMarker(final WAL wal,
* @return WALKeyImpl that was added to the WAL.
*/
private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
final boolean sync, final RegionReplicationSink sink) throws IOException {
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final List<UUID> clusterIds, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync,
final RegionReplicationSink sink) throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes);
WALKeyImpl walKey = createWALKey(hri, clusterIds, mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.appendMarker(hri, walKey, edit);
Expand Down Expand Up @@ -191,11 +197,11 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
return walKey;
}

public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScope,
public static WALKeyImpl createWALKey(final RegionInfo hri, final List<UUID> clusterIds,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope,
final Map<String, byte[]> extendedAttributes) {
return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
EnvironmentEdgeManager.currentTime(), clusterIds, mvcc, replicationScope, extendedAttributes);
}

/**
Expand Down Expand Up @@ -246,7 +252,7 @@ public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrenc
RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
writeMarker(wal, replicationScope, regionInfo,
writeMarker(wal, replicationScope, regionInfo, EMPTY_UUIDS,
WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,11 @@ public void replicateEntries(List<WALEntry> entries, final ExtendedCellScanner c
}
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace

List<String> clusterIds = entry.getKey().getClusterIdsList().stream()
.map(k -> toUUID(k).toString()).collect(Collectors.toList());
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
bulkLoadsPerClusters.computeIfAbsent(clusterIds, k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
cleanUpHFileRefs(source, entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
}
// Log and clean up WAL logs
Expand Down Expand Up @@ -229,7 +229,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}

private void cleanUpHFileRefs(WALEdit edit) throws IOException {
protected static void cleanUpHFileRefs(ReplicationSource source, WALEdit edit)
throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void run() {
}
}
}
} catch (WALEntryFilterRetryableException e) {
} catch (WALEntryFilterRetryableException | IOException e) {
// here we have to recreate the WALEntryStream, as when filtering, we have already called
// next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
// just considers everything is fine,that's why the catch block is not in the inner block
Expand Down Expand Up @@ -231,7 +231,7 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) {
// This is required in case there is any exception in while reading entries
// we do not want to loss the existing entries in the batch
protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws InterruptedException {
throws InterruptedException, IOException {
Path currentPath = entryStream.getCurrentPath();
for (;;) {
Entry entry = entryStream.next();
Expand Down Expand Up @@ -282,7 +282,7 @@ private WALEntryBatch createBatch(WALEntryStream entryStream) {
return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}

protected final Entry filterEntry(Entry entry) {
protected final Entry filterEntry(Entry entry) throws IOException {
// Always replicate if this edit is Replication Marker edit.
if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
return entry;
Expand All @@ -291,6 +291,8 @@ protected final Entry filterEntry(Entry entry) {
if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
LOG.trace("Filtered entry for replication: {}", entry);
source.getSourceMetrics().incrLogEditsFiltered();
// Filtered Bulk Load HFiles need to be cleaned up
ReplicationSourceShipper.cleanUpHFileRefs(source, entry.getEdit());
}
return filtered;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,

@Override
protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws InterruptedException {
throws InterruptedException, IOException {
Path currentPath = entryStream.getCurrentPath();
long positionBefore = entryStream.getPosition();
for (;;) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
List<UUID> clusterIds, MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScope, Map<String, byte[]> extendedAttributes) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes);
}

Expand Down
Loading