diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index feaa3a4ec7fb..2b50d76ca5d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2641,20 +2641,16 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN
     ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
     Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles, storeFilesSize,
-      bulkloadSeqId, null, true);
+      bulkloadSeqId, true);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
     ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-    Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds,
-    boolean replicate) {
+    Map<String, Long> storeFilesSize, long bulkloadSeqId, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
       BulkLoadDescriptor.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId)
         .setReplicate(replicate);
-    if (clusterIds != null) {
-      desc.addAllClusterIds(clusterIds);
-    }
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder =
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index ba12dcf3edfc..9b5929ee9ca4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -152,7 +152,6 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
-  repeated string cluster_ids = 5;
   optional bool replicate = 6 [default = true];
 }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0113d439dc02..160b04efbeb6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7077,8 +7077,11 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
         WALProtos.BulkLoadDescriptor loadDescriptor =
           ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
             UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-            storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+            storeFiles, storeFilesSizes, seqId, replicate);
         WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
+          clusterIds == null
+            ? WALKey.EMPTY_UUIDS
+            : clusterIds.stream().map(UUID::fromString).collect(Collectors.toList()),
           loadDescriptor, mvcc);
       } catch (IOException ioe) {
         if (this.rsServices != null) {
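// Illustrative sketch, not part of the patch: with cluster_ids removed from the
// BulkLoadDescriptor proto, HRegion (above) converts the String cluster ids it is
// handed into java.util.UUIDs and hands them to the WAL key instead. The same
// conversion in isolation, assuming the ids are valid UUID strings as emitted by
// the cluster-id machinery; ClusterIdConversionSketch is a hypothetical name:
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

final class ClusterIdConversionSketch {
  static List<UUID> toWalKeyClusterIds(List<String> clusterIds) {
    if (clusterIds == null) {
      return Collections.emptyList(); // stands in for WALKey.EMPTY_UUIDS
    }
    return clusterIds.stream().map(UUID::fromString).collect(Collectors.toList());
  }
}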
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index e246da4bd83d..393699f838d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2418,11 +2418,6 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
     final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
     List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
-    if (clusterIds.contains(this.regionServer.clusterId)) {
-      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
-    } else {
-      clusterIds.add(this.regionServer.clusterId);
-    }
     try {
       checkOpen();
       requestCount.increment();
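// Illustrative sketch, not part of the patch: the block deleted above short-circuited
// bulk load RPCs whose cluster-id list already contained the local cluster. With the
// ids now carried on the WALKey, loop prevention is left to the WAL-entry filtering
// path (ClusterMarkingEntryFilter), which the new test below shows dropping entries
// already tagged with the peer's cluster id. The decision, reduced to a standalone
// predicate; LoopPreventionSketch is a hypothetical name:
import java.util.List;
import java.util.UUID;

final class LoopPreventionSketch {
  /** True if an entry may be shipped to the peer identified by peerClusterId. */
  static boolean shouldReplicate(List<UUID> entryClusterIds, UUID peerClusterId,
    boolean canReplicateToSameCluster) {
    // An entry that has already passed through the peer must not be sent back to it.
    return canReplicateToSameCluster || !entryClusterIds.contains(peerClusterId);
  }
}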
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index dd8d152da8bb..215421a96d0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
+import static org.apache.hadoop.hbase.wal.WALKey.EMPTY_UUIDS;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -75,8 +78,8 @@ private WALUtil() {
   public static WALKeyImpl writeCompactionMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
     MultiVersionConcurrencyControl mvcc) throws IOException {
-    WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
+    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS,
+      WALEdit.createCompaction(hri, c), mvcc, null);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -91,7 +94,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal,
   public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
     RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
     throws IOException {
-    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
+    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, EMPTY_UUIDS,
       WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
@@ -106,8 +109,8 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap
   public static WALKeyImpl writeRegionEventMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
     MultiVersionConcurrencyControl mvcc) throws IOException {
-    WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
+    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS,
+      WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -127,10 +130,10 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal,
    */
   public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
-    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
-    throws IOException {
-    WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
+    List<UUID> clusterIds, final WALProtos.BulkLoadDescriptor desc,
+    final MultiVersionConcurrencyControl mvcc) throws IOException {
+    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, clusterIds,
+      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
     }
@@ -138,12 +141,12 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
   }
 
   private static WALKeyImpl writeMarker(final WAL wal,
-    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit,
-    MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes)
+    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, List<UUID> clusterIds,
+    WALEdit edit, MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes)
     throws IOException {
     // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
-    return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
-      true);
+    return doFullMarkerAppendTransaction(wal, replicationScope, hri, clusterIds, edit, mvcc,
+      extendedAttributes, true);
   }
 
   /**
@@ -155,12 +158,12 @@ private static WALKeyImpl writeMarker(final WAL wal,
    * @return WALKeyImpl that was added to the WAL.
    */
   private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
-    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit,
-    MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync)
-    throws IOException {
+    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final List<UUID> clusterIds,
+    final WALEdit edit, MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes,
+    boolean sync) throws IOException {
     // TODO: Pass in current time to use?
     WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
-      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
+      EnvironmentEdgeManager.currentTime(), clusterIds, mvcc, replicationScope, extendedAttributes);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
       trx = wal.appendMarker(hri, walKey, edit);
@@ -232,7 +235,7 @@ public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrenc
     RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
     NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
-    writeMarker(wal, replicationScope, regionInfo,
+    writeMarker(wal, replicationScope, regionInfo, EMPTY_UUIDS,
       WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null);
   }
 }
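// Illustrative sketch, not part of the patch: of the marker writers above, only the
// bulk load marker carries real provenance; compaction, flush, region-event and
// replication markers all pass EMPTY_UUIDS. Shape of a bulk load call against the
// new signature, with wal, scope, region info, descriptor and mvcc assumed to be
// available to the caller inside hbase-server:
import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.wal.WAL;

final class BulkLoadMarkerSketch {
  static void writeMarker(WAL wal, NavigableMap<byte[], Integer> scope, RegionInfo hri,
    List<UUID> clusterIds, WALProtos.BulkLoadDescriptor desc,
    MultiVersionConcurrencyControl mvcc) throws IOException {
    // Provenance now travels on the WALKey, not inside the descriptor.
    WALUtil.writeBulkLoadMarkerAndSync(wal, scope, hri, clusterIds, desc, mvcc);
  }
}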
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 961e4d859662..a3f55237c8ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -249,8 +249,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
               }
               // Map of table name Vs list of pair of family and list of
               // hfile paths from its namespace
+
+              List<String> clusterIds = entry.getKey().getClusterIdsList().stream()
+                .map(k -> toUUID(k).toString()).collect(Collectors.toList());
               Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
+                bulkLoadsPerClusters.computeIfAbsent(clusterIds, k -> new HashMap<>());
               buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
           } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 746c845908f2..c499f1424882 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -185,7 +185,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
         }
         // Clean up hfile references
         for (Entry entry : entries) {
-          cleanUpHFileRefs(entry.getEdit());
+          cleanUpHFileRefs(source, entry.getEdit());
           LOG.trace("shipped entry {}: ", entry);
         }
         // Log and clean up WAL logs
@@ -219,7 +219,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
-  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+  protected static void cleanUpHFileRefs(ReplicationSource source, WALEdit edit)
+    throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {
       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
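// Illustrative sketch, not part of the patch: ReplicationSink now groups bulk loads
// by the cluster ids found on the WAL entry's key rather than on the removed
// descriptor field. Assuming the HBaseProtos.UUID wire form of most/least
// significant bits, the toUUID(...).toString() normalization used above boils down
// to; ProtoUuidSketch is a hypothetical name:
import java.util.UUID;

final class ProtoUuidSketch {
  static String toUuidString(long mostSigBits, long leastSigBits) {
    return new UUID(mostSigBits, leastSigBits).toString();
  }
}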
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 26360cbe3ea1..f407b6b596f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -191,7 +191,7 @@ public void run() {
             }
           }
         }
-      } catch (WALEntryFilterRetryableException e) {
+      } catch (WALEntryFilterRetryableException | IOException e) {
         // here we have to recreate the WALEntryStream, as when filtering, we have already called
         // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
         // just considers everything is fine,that's why the catch block is not in the inner block
@@ -235,7 +235,7 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) {
   // This is required in case there is any exception in while reading entries
   // we do not want to loss the existing entries in the batch
   protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
-    throws InterruptedException {
+    throws InterruptedException, IOException {
     Path currentPath = entryStream.getCurrentPath();
     for (;;) {
       Entry entry = entryStream.next();
@@ -286,7 +286,7 @@ private WALEntryBatch createBatch(WALEntryStream entryStream) {
     return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
   }
 
-  protected final Entry filterEntry(Entry entry) {
+  protected final Entry filterEntry(Entry entry) throws IOException {
     // Always replicate if this edit is Replication Marker edit.
     if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
       return entry;
@@ -295,6 +295,8 @@ protected final Entry filterEntry(Entry entry) {
     if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
       LOG.trace("Filtered entry for replication: {}", entry);
       source.getSourceMetrics().incrLogEditsFiltered();
+      // Filtered Bulk Load HFiles need to be cleaned up
+      ReplicationSourceShipper.cleanUpHFileRefs(source, entry.getEdit());
    }
     return filtered;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index d1a2e8b57340..5b482d06505b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -55,7 +55,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
 
   @Override
   protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
-    throws InterruptedException {
+    throws InterruptedException, IOException {
     Path currentPath = entryStream.getCurrentPath();
     long positionBefore = entryStream.getPosition();
     for (;;) {
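// Illustrative sketch, not part of the patch: filterEntry() can now throw
// IOException because fully-filtered entries trigger HFile-reference cleanup, which
// touches replication queue storage. The condition guarding that cleanup, as a
// standalone predicate over hypothetical stand-ins for the WAL types:
final class FilteredEntrySketch {
  interface Edit { int size(); }

  /** True when filtering removed the whole edit, so any bulk load refs must be released. */
  static boolean needsHFileRefCleanup(Edit original, Edit filtered) {
    return original != null && (filtered == null || filtered.size() == 0);
  }
}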
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index aff98431e564..6c6a917b55d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -152,9 +152,9 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin
   }
 
   public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
-    MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope,
-    Map<String, byte[]> extendedAttributes) {
-    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+    List<UUID> clusterIds, MultiVersionConcurrencyControl mvcc,
+    final NavigableMap<byte[], Integer> replicationScope, Map<String, byte[]> extendedAttributes) {
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, HConstants.NO_NONCE,
       HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes);
   }
 
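// Illustrative sketch, not part of the patch: a call against the widened
// constructor, mirroring how the new tests below use it (nulls are tolerated there
// for mvcc, replication scope and extended attributes); WalKeyUsageSketch is a
// hypothetical name:
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALKeyImpl;

final class WalKeyUsageSketch {
  static WALKeyImpl keyWithoutProvenance(byte[] encodedRegionName, TableName table) {
    List<UUID> noClusterIds = Collections.emptyList(); // same role as WALKey.EMPTY_UUIDS
    return new WALKeyImpl(encodedRegionName, table, EnvironmentEdgeManager.currentTime(),
      noClusterIds, null, null, null);
  }
}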
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 9e3759311c32..a16a82c35c6a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
@@ -61,6 +62,9 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -113,6 +117,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private static AtomicInteger BULK_LOADS_COUNT;
   private static CountDownLatch BULK_LOAD_LATCH;
 
+  private static AtomicInteger REPLICATION_COUNT;
+
   protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
   protected static final Configuration CONF3 = UTIL3.getConfiguration();
 
@@ -124,7 +130,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @ClassRule
   public static TemporaryFolder testFolder = new TemporaryFolder();
 
-  private static ReplicationQueueStorage queueStorage;
+  private static final List<ReplicationQueueStorage> queueStorages = new ArrayList<>();
 
   private static boolean replicationPeersAdded = false;
 
@@ -136,8 +142,11 @@ public static void setUpBeforeClass() throws Exception {
     setupConfig(UTIL3, "/3");
     TestReplicationBase.setUpBeforeClass();
     startThirdCluster();
-    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
-      UTIL1.getConfiguration());
+    for (HBaseTestingUtility util : new HBaseTestingUtility[] { UTIL1, UTIL2, UTIL3 }) {
+      ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+        .getReplicationQueueStorage(util.getZooKeeperWatcher(), util.getConfiguration());
+      queueStorages.add(queueStorage);
+    }
   }
 
   private static void startThirdCluster() throws Exception {
@@ -183,6 +192,7 @@ public void setUpBase() throws Exception {
     }
 
     BULK_LOADS_COUNT = new AtomicInteger(0);
+    REPLICATION_COUNT = new AtomicInteger(0);
   }
 
   private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -191,21 +201,37 @@ private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util)
   }
 
   private void setupCoprocessor(HBaseTestingUtility cluster) {
+    Configuration conf = cluster.getConfiguration();
+    String clusterKey = cluster.getClusterKey();
+    Class<TestBulkLoadReplication.BulkReplicationTestObserver> cpClass =
+      TestBulkLoadReplication.BulkReplicationTestObserver.class;
+
     cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
       try {
-        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
-          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        RegionCoprocessorHost cpHost = r.getCoprocessorHost();
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = cpHost.findCoprocessor(cpClass);
         if (cp == null) {
-          r.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
-            cluster.getConfiguration());
-          cp = r.getCoprocessorHost()
-            .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
-          cp.clusterName = cluster.getClusterKey();
+          cpHost.load(cpClass, 0, conf);
+          cp = cpHost.findCoprocessor(cpClass);
+          cp.clusterName = clusterKey;
         }
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
       }
     });
+
+    try {
+      RegionServerCoprocessorHost cpHost =
+        cluster.getHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
+      TestBulkLoadReplication.BulkReplicationTestObserver cp = cpHost.findCoprocessor(cpClass);
+      if (cp == null) {
+        cpHost.load(cpClass, 0, conf);
+        cp = cpHost.findCoprocessor(cpClass);
+        cp.clusterName = clusterKey;
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
   }
 
   protected static void setupBulkLoadConfigsForCluster(Configuration config,
@@ -241,6 +267,13 @@ public void testBulkLoadReplicationActiveActive() throws Exception {
     // Each event gets 3 counts (the originator cluster, plus the two peers),
     // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
     assertEquals(9, BULK_LOADS_COUNT.get());
+    // Each bulk load event gets replicated twice (to two peers),
+    // so REPLICATION_COUNT expected value is 3 * 2 = 6.
+    assertEquals(6, REPLICATION_COUNT.get());
+    waitForReplicationQueuesToEmpty();
+    for (ReplicationQueueStorage queueStorage : queueStorages) {
+      assertEquals(0, queueStorage.getAllHFileRefs().size());
+    }
   }
 
   protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
@@ -305,10 +338,12 @@ private String createHFileForFamilies(byte[] row, byte[] value, Configuration cl
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
-  public static class BulkReplicationTestObserver implements RegionCoprocessor {
+  public static class BulkReplicationTestObserver
+    implements RegionCoprocessor, RegionServerCoprocessor {
 
     String clusterName;
     AtomicInteger bulkLoadCounts = new AtomicInteger();
+    AtomicInteger replicationCount = new AtomicInteger();
 
     @Override
     public Optional<RegionObserver> getRegionObserver() {
@@ -325,6 +360,21 @@ public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
         }
       });
     }
+
+    @Override
+    public Optional<RegionServerObserver> getRegionServerObserver() {
+      return Optional.of(new RegionServerObserver() {
+
+        // FIXME: this is deprecated, but we need it for validating bulk load replication
+        @Override
+        public void
+          postReplicateLogEntries(final ObserverContext<RegionServerCoprocessorEnvironment> ctx) {
+          REPLICATION_COUNT.incrementAndGet();
+          LOG.info("Replication succeeded. Total for {}: {}", clusterName,
+            replicationCount.addAndGet(1));
+        }
+      });
+    }
   }
 
   @Test
@@ -339,7 +389,12 @@ public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception
     // additional wait to make sure no extra bulk load happens
     Thread.sleep(400);
     assertEquals(1, BULK_LOADS_COUNT.get());
-    assertEquals(0, queueStorage.getAllHFileRefs().size());
+    // No replication should happen for no-replication family
+    assertEquals(0, REPLICATION_COUNT.get());
+    waitForReplicationQueuesToEmpty();
+    for (ReplicationQueueStorage queueStorage : queueStorages) {
+      assertEquals(0, queueStorage.getAllHFileRefs().size());
+    }
   }
 
   private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
@@ -403,4 +458,15 @@ private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throw
     Result result = table.get(get);
     assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
   }
+
+  private void waitForReplicationQueuesToEmpty() {
+    Waiter.waitFor(CONF1, 1000, () -> {
+      for (ReplicationQueueStorage queueStorage : queueStorages) {
+        if (!queueStorage.getAllHFileRefs().isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 762945d745ba..694310cc8a6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -427,6 +428,58 @@ public void testNamespaceTableCfWALEntryFilter2() {
     assertEquals(null, filter.filter(userEntry));
   }
 
+  @Test
+  public void testClusterMarkingEntryFilter() {
+    // Setup cluster IDs
+    UUID clusterId = UUID.randomUUID();
+    UUID peerClusterId = UUID.randomUUID();
+
+    // Mock ReplicationEndpoint
+    ReplicationEndpoint endpoint = mock(ReplicationEndpoint.class);
+    when(endpoint.canReplicateToSameCluster()).thenReturn(false);
+
+    ClusterMarkingEntryFilter filter =
+      new ClusterMarkingEntryFilter(clusterId, peerClusterId, endpoint);
+
+    // 1. Entry without any cluster IDs - should pass and be marked with clusterId
+    List<UUID> emptyIds = new ArrayList<>();
+    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
+      EnvironmentEdgeManager.currentTime(), emptyIds, null, null, null);
+    WALEdit edit1 = new WALEdit();
+    edit1.add(new KeyValue(a, a, a));
+    Entry entry1 = new Entry(key1, edit1);
+
+    Entry filtered1 = filter.filter(entry1);
+    Assert.assertNotNull(filtered1);
+    Assert.assertTrue(filtered1.getKey().getClusterIds().contains(clusterId));
+
+    // 2. Entry with peerClusterId - should be filtered out (prevent circular replication)
+    List<UUID> peerIds = new ArrayList<>();
+    peerIds.add(peerClusterId);
+    WALKeyImpl key2 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
+      EnvironmentEdgeManager.currentTime(), peerIds, null, null, null);
+    WALEdit edit2 = new WALEdit();
+    edit2.add(new KeyValue(a, a, a));
+    Entry entry2 = new Entry(key2, edit2);
+
+    Assert.assertNull(filter.filter(entry2));
+
+    // 3. Entry with empty WALEdit - should be filtered out
+    WALKeyImpl key3 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
+      EnvironmentEdgeManager.currentTime(), new ArrayList<>(), null, null, null);
+    WALEdit edit3 = new WALEdit();
+    Entry entry3 = new Entry(key3, edit3);
+
+    Assert.assertNull(filter.filter(entry3));
+
+    // 4. Entry with null WALEdit - should be filtered out
+    WALKeyImpl key4 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
+      EnvironmentEdgeManager.currentTime(), new ArrayList<>(), null, null, null);
+    Entry entry4 = new Entry(key4, null);
+
+    Assert.assertNull(filter.filter(entry4));
+  }
+
   private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
     WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
       EnvironmentEdgeManager.currentTime(), scopes);