From 6791363d2a8f4fa320afd0bfe81be5b04f75daaa Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 14 Aug 2025 09:59:17 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-29463=20Bidirectional=20serial=20replicat?= =?UTF-8?q?ion=20will=20block=20if=20a=20region=E2=80=99s=20last=20edit=20?= =?UTF-8?q?before=20rs=20crashed=20was=20from=20the=20peer=20cluster=20(#7?= =?UTF-8?q?172)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Dimiduk (cherry picked from commit bea4272960e0c3a92eaf436ec6cc1c5a2527ffc3) --- .../replication/ChainWALEntryFilter.java | 7 ++ .../ClusterMarkingEntryFilter.java | 4 +- .../replication/ScopeWALEntryFilter.java | 16 ++-- .../hbase/replication/WALEntryFilter.java | 14 ++++ .../hbase/replication/WALEntryFilterBase.java | 66 ++++++++++++++++ .../regionserver/ReplicationSource.java | 1 + .../SerialReplicationSourceWALReader.java | 16 ++-- ...TestBidirectionSerialReplicationStuck.java | 79 +++++++++++++++++++ .../replication/TestReplicationBase.java | 31 ++++++-- .../TestReplicationWALEntryFilters.java | 11 ++- 10 files changed, 218 insertions(+), 27 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index aa84f4705b0f..9683472f3bab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -53,6 +53,13 @@ public ChainWALEntryFilter(List filters) { initCellFilters(); } + @Override + public void setSerial(boolean serial) { + for (WALEntryFilter filter : filters) { + filter.setSerial(serial); + } + } + public void initCellFilters() { ArrayList cellFilters = new ArrayList<>(filters.length); for (WALEntryFilter filter : filters) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java index e05e79eab5a3..041b9798857b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java @@ -31,7 +31,7 @@ */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) @InterfaceStability.Evolving -public class ClusterMarkingEntryFilter implements WALEntryFilter { +public class ClusterMarkingEntryFilter extends WALEntryFilterBase { private UUID clusterId; private UUID peerClusterId; private ReplicationEndpoint replicationEndpoint; @@ -64,6 +64,6 @@ public Entry filter(Entry entry) { return entry; } } - return null; + return clearOrNull(entry); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 6dc41bcc014a..1429379deb3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -26,24 +26,26 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Predicate; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; /** * Keeps KVs that are scoped other than local */ @InterfaceAudience.Private -public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { +public class ScopeWALEntryFilter extends WALEntryFilterBase implements WALCellFilter { private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); @Override public Entry filter(Entry entry) { - // Do not filter out an entire entry by replication scopes. As now we support serial - // replication, the sequence id of a marker is also needed by upper layer. We will filter out - // all the cells in the filterCell method below if the replication scopes is null or empty. - return entry; + NavigableMap scopes = entry.getKey().getReplicationScopes(); + if (MapUtils.isNotEmpty(scopes)) { + return entry; + } + return clearOrNull(entry); } - private boolean hasGlobalScope(NavigableMap scopes, byte[] family) { + private static boolean hasGlobalScope(NavigableMap scopes, byte[] family) { Integer scope = scopes.get(family); return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL; } @@ -51,7 +53,7 @@ private boolean hasGlobalScope(NavigableMap scopes, byte[] fami @Override public Cell filterCell(Entry entry, Cell cell) { NavigableMap scopes = entry.getKey().getReplicationScopes(); - if (scopes == null || scopes.isEmpty()) { + if (MapUtils.isEmpty(scopes)) { return null; } byte[] family = CellUtil.cloneFamily(cell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java index 8aa60f74ebba..d77fedd94bca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java @@ -50,4 +50,18 @@ public interface WALEntryFilter { * the entry to be skipped for replication. */ Entry filter(Entry entry); + + /** + * Tell the filter whether the peer is a serial replication peer. + *

+ * For serial replication, usually you should not filter out an entire entry, unless the peer + * config does not contain the table, because we need the region name and sequence id of the entry + * to advance the pushed sequence id, otherwise the replication may be blocked. You can just + * filter out all the cells of the entry to stop it being replicated to peer cluster,or just rely + * on the {@link WALCellFilter#filterCell(Entry, org.apache.hadoop.hbase.Cell)} method to filter + * all the cells out. + * @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false} + */ + default void setSerial(boolean serial) { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java new file mode 100644 index 000000000000..81efae0e7a82 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Base class for {@link WALEntryFilter}, store the necessary common properties like + * {@link #serial}. + *

+ * Why need to treat serial replication specially: + *

+ * Under some special cases, we may filter out some entries but we still need to record the last + * pushed sequence id for these entries. For example, when we setup a bidirection replication A + * <-> B, if we write to both cluster A and cluster B, cluster A will not replicate the + * entries which are replicated from cluster B, which means we may have holes in the replication + * sequence ids. So if the region is closed abnormally, i.e, we do not have a close event for the + * region, and before the closing, we have some entries from cluster B, then the replication from + * cluster A to cluster B will be stuck if we do not record the last pushed sequence id of these + * entries because we will find out that the previous sequence id range will never finish. So we + * need to record the sequence id for these entries so the last pushed sequence id can reach the + * region barrier. + * @see HBASE-29463 + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public abstract class WALEntryFilterBase implements WALEntryFilter { + + protected boolean serial; + + @Override + public void setSerial(boolean serial) { + this.serial = serial; + } + + /** + * Call this method when you do not need to replicate the entry. + *

+ * For serial replication, since still need to WALKey for recording progress, we clear all the + * cells of the WALEdit. For normal replication, we just return null. + */ + protected final Entry clearOrNull(Entry entry) { + if (serial) { + entry.getEdit().getCells().clear(); + return entry; + } else { + return null; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index aebdccdc92dc..2ce155125540 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -332,6 +332,7 @@ private void initializeWALEntryFilter(UUID peerClusterId) { } filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); this.walEntryFilter = new ChainWALEntryFilter(filters); + this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial()); } private void tryStartNewShipper(String walGroupId) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 41d95df28219..d1a2e8b57340 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -97,17 +97,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) } sleepMultiplier = sleep(sleepMultiplier); } - // arrive here means we can push the entry, record the last sequence id - batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()), - entry.getKey().getSequenceId()); // actually remove the entry. - removeEntryFromStream(entryStream, batch); + removeEntryFromStream(entry, entryStream, batch); if (addEntryToBatch(batch, entry)) { break; } } else { // actually remove the entry. - removeEntryFromStream(entryStream, batch); + removeEntryFromStream(null, entryStream, batch); } WALEntryStream.HasNext hasNext = entryStream.hasNext(); // always return if we have switched to a new file. @@ -125,9 +122,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) } } - private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) { + private void removeEntryFromStream(Entry entry, WALEntryStream entryStream, WALEntryBatch batch) { entryStream.next(); - firstCellInEntryBeforeFiltering = null; batch.setLastWalPosition(entryStream.getPosition()); + // record last pushed sequence id if needed + if (entry != null) { + batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()), + entry.getKey().getSequenceId()); + } + firstCellInEntryBeforeFiltering = null; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java new file mode 100644 index 000000000000..f069d6b1095b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestBidirectionSerialReplicationStuck extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBidirectionSerialReplicationStuck.class); + + @Override + protected boolean isSerialPeer() { + return true; + } + + @Override + public void setUpBase() throws Exception { + UTIL1.ensureSomeRegionServersAvailable(2); + hbaseAdmin.balancerSwitch(false, true); + addPeer(PEER_ID2, tableName, UTIL1, UTIL2); + addPeer(PEER_ID2, tableName, UTIL2, UTIL1); + } + + @Override + public void tearDownBase() throws Exception { + removePeer(PEER_ID2, UTIL1); + removePeer(PEER_ID2, UTIL2); + } + + @Test + public void testStuck() throws Exception { + // disable the peer cluster1 -> cluster2 + hbaseAdmin.disableReplicationPeer(PEER_ID2); + byte[] qualifier = Bytes.toBytes("q"); + htable1.put(new Put(Bytes.toBytes("aaa-1")).addColumn(famName, qualifier, Bytes.toBytes(1))); + + // add a row to cluster2 and wait it replicate back to cluster1 + htable2.put(new Put(Bytes.toBytes("aaa-2")).addColumn(famName, qualifier, Bytes.toBytes(2))); + UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2")))); + + // kill the region server which holds the region which contains our rows + UTIL1.getRSForFirstRegionInTable(tableName).abort("for testing"); + // wait until the region is online + UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2")))); + + // put a new row in cluster1 + htable1.put(new Put(Bytes.toBytes("aaa-3")).addColumn(famName, qualifier, Bytes.toBytes(3))); + + // enable peer cluster1 -> cluster2, the new row should be replicated to cluster2 + hbaseAdmin.enableReplicationPeer(PEER_ID2); + UTIL1.waitFor(30000, () -> htable2.exists(new Get(Bytes.toBytes("aaa-3")))); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index fba25aee4a39..b021dcbc0d40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -272,16 +272,27 @@ public static void setUpBeforeClass() throws Exception { } private boolean peerExist(String peerId) throws IOException { - return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); + return peerExist(peerId, UTIL1); + } + + private boolean peerExist(String peerId, HBaseTestingUtility util) throws IOException { + return util.getAdmin().listReplicationPeers().stream() + .anyMatch(p -> peerId.equals(p.getPeerId())); } protected final void addPeer(String peerId, TableName tableName) throws Exception { - if (!peerExist(peerId)) { - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()) - .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); - hbaseAdmin.addReplicationPeer(peerId, builder.build()); + addPeer(peerId, tableName, UTIL1, UTIL2); + } + + protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtility source, + HBaseTestingUtility target) throws Exception { + if (peerExist(peerId, source)) { + return; } + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() + .setClusterKey(target.getClusterKey()).setSerial(isSerialPeer()) + .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); + source.getAdmin().addReplicationPeer(peerId, builder.build()); } @Before @@ -290,8 +301,12 @@ public void setUpBase() throws Exception { } protected final void removePeer(String peerId) throws Exception { - if (peerExist(peerId)) { - hbaseAdmin.removeReplicationPeer(peerId); + removePeer(peerId, UTIL1); + } + + protected final void removePeer(String peerId, HBaseTestingUtility util) throws Exception { + if (peerExist(peerId, util)) { + util.getAdmin().removeReplicationPeer(peerId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 1e26a940b2f8..762945d745ba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -113,15 +113,20 @@ public void testScopeWALEntryFilter() { Entry userEntryEmpty = createEntry(null); // no scopes - // now we will not filter out entries without a replication scope since serial replication still - // need the sequence id, but the cells will all be filtered out. + assertNull(filter.filter(userEntry)); + // now for serial replication, we will not filter out entries without a replication scope since + // serial replication still need the sequence id, but the cells will all be filtered out. + filter.setSerial(true); assertTrue(filter.filter(userEntry).getEdit().isEmpty()); + filter.setSerial(false); // empty scopes - // ditto TreeMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); userEntry = createEntry(scopes, a, b); + assertNull(filter.filter(userEntry)); + filter.setSerial(true); assertTrue(filter.filter(userEntry).getEdit().isEmpty()); + filter.setSerial(false); // different scope scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);