diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 09f5b9083644..bb6ffe5ef85c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -54,6 +54,13 @@ public ChainWALEntryFilter(List<WALEntryFilter> filters) {
     initCellFilters();
   }
 
+  @Override
+  public void setSerial(boolean serial) {
+    for (WALEntryFilter filter : filters) {
+      filter.setSerial(serial);
+    }
+  }
+
   public void initCellFilters() {
     ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
     for (WALEntryFilter filter : filters) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
index e05e79eab5a3..041b9798857b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
@@ -31,7 +31,7 @@
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 @InterfaceStability.Evolving
-public class ClusterMarkingEntryFilter implements WALEntryFilter {
+public class ClusterMarkingEntryFilter extends WALEntryFilterBase {
   private UUID clusterId;
   private UUID peerClusterId;
   private ReplicationEndpoint replicationEndpoint;
@@ -64,6 +64,6 @@ public Entry filter(Entry entry) {
         return entry;
       }
     }
-    return null;
+    return clearOrNull(entry);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 897e06f4a9fd..10e1764fae91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -28,24 +28,26 @@
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 
 /**
  * Keeps KVs that are scoped other than local
  */
 @InterfaceAudience.Private
-public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
+public class ScopeWALEntryFilter extends WALEntryFilterBase implements WALCellFilter {
 
   private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
 
   @Override
   public Entry filter(Entry entry) {
-    // Do not filter out an entire entry by replication scopes. As now we support serial
-    // replication, the sequence id of a marker is also needed by upper layer. We will filter out
-    // all the cells in the filterCell method below if the replication scopes is null or empty.
-    return entry;
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+    if (MapUtils.isNotEmpty(scopes)) {
+      return entry;
+    }
+    return clearOrNull(entry);
   }
 
-  private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
+  private static boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
     Integer scope = scopes.get(family);
     return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
   }
@@ -54,7 +56,7 @@ private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] fami
   public Cell filterCell(Entry entry, Cell cell) {
     ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(cell);
     NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
-    if (scopes == null || scopes.isEmpty()) {
+    if (MapUtils.isEmpty(scopes)) {
       return null;
     }
     byte[] family = CellUtil.cloneFamily(cell);
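To make the fan-out concrete: a minimal sketch (not part of the patch) of how a single setSerial call on the chain reaches every wrapped filter. The classes are the real ones changed above; only the wiring is illustrative.

```java
// Illustration only: one setSerial call on the chain propagates to every
// wrapped filter, so each WALEntryFilterBase subclass switches from
// "drop the whole entry" to "clear the cells but keep the WALKey".
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;

public final class SerialFlagFanOutExample {

  private SerialFlagFanOutExample() {
  }

  public static void main(String[] args) {
    List<WALEntryFilter> filters = new ArrayList<>();
    filters.add(new ScopeWALEntryFilter());
    ChainWALEntryFilter chain = new ChainWALEntryFilter(filters);
    // mirrors the new call added to ReplicationSource below
    chain.setSerial(true);
  }
}
```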
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 8aa60f74ebba..d77fedd94bca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -50,4 +50,18 @@ public interface WALEntryFilter {
    * the entry to be skipped for replication.
    */
   Entry filter(Entry entry);
+
+  /**
+   * Tell the filter whether the peer is a serial replication peer.
+   * <p>
+   * For serial replication, usually you should not filter out an entire entry, unless the peer
+   * config does not contain the table, because we need the region name and sequence id of the entry
+   * to advance the pushed sequence id, otherwise the replication may be blocked. You can just
+   * filter out all the cells of the entry to stop it being replicated to the peer cluster, or rely
+   * on the {@link WALCellFilter#filterCell(Entry, org.apache.hadoop.hbase.Cell)} method to filter
+   * all the cells out.
+   * @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
+   */
+  default void setSerial(boolean serial) {
+  }
 }
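A hedged sketch of a third-party filter honoring this contract directly, for implementations that do not extend the WALEntryFilterBase class introduced below. The class name and the skipped table are hypothetical; only the setSerial/filter shapes come from the interface.

```java
// Hypothetical filter: drops edits for one table, but for serial peers it
// keeps the entry (with an emptied edit) as the contract above prescribes.
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class SkipTableWALEntryFilter implements WALEntryFilter {

  private final TableName skipTable = TableName.valueOf("skipped"); // hypothetical table
  private boolean serial;

  @Override
  public void setSerial(boolean serial) {
    this.serial = serial;
  }

  @Override
  public Entry filter(Entry entry) {
    if (!entry.getKey().getTableName().equals(skipTable)) {
      return entry;
    }
    if (serial) {
      // keep the WALKey (region name + sequence id) so the pushed sequence id
      // can still advance, but replicate no cells
      entry.getEdit().getCells().clear();
      return entry;
    }
    // non-serial peer: dropping the whole entry is safe
    return null;
  }
}
```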

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java
new file mode 100644
index 000000000000..81efae0e7a82
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for {@link WALEntryFilter}, which stores the necessary common properties like
+ * {@link #serial}.
+ * <p>
+ * Why we need to treat serial replication specially:
+ * <p>
+ * In some special cases, we may filter out some entries but we still need to record the last
+ * pushed sequence id for these entries. For example, when we set up a bidirectional replication A
+ * <-> B, if we write to both cluster A and cluster B, cluster A will not replicate the
+ * entries which are replicated from cluster B, which means we may have holes in the replication
+ * sequence ids. So if the region is closed abnormally, i.e., we do not have a close event for the
+ * region, and before the closing, we have some entries from cluster B, then the replication from
+ * cluster A to cluster B will be stuck if we do not record the last pushed sequence id of these
+ * entries, because we will find out that the previous sequence id range will never finish. So we
+ * need to record the sequence id for these entries so the last pushed sequence id can reach the
+ * region barrier.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-29463">HBASE-29463</a>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class WALEntryFilterBase implements WALEntryFilter {
+
+  protected boolean serial;
+
+  @Override
+  public void setSerial(boolean serial) {
+    this.serial = serial;
+  }
+
+  /**
+   * Call this method when you do not need to replicate the entry.
+   * <p>
+   * For serial replication, since we still need the WALKey for recording progress, we clear all
+   * the cells of the WALEdit. For normal replication, we just return null.
+   */
+  protected final Entry clearOrNull(Entry entry) {
+    if (serial) {
+      entry.getEdit().getCells().clear();
+      return entry;
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 094fa4aaa786..dc17ed12ff0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -334,6 +334,7 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
     }
     filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
     this.walEntryFilter = new ChainWALEntryFilter(filters);
+    this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial());
   }
 
   private long getStartOffset(String walGroupId) {
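For reference, the serial flag consumed by the new setSerial call above originates from the peer config. A minimal sketch using only public Admin APIs of creating a peer for which ReplicationPeerConfig.isSerial() returns true; the peer id and cluster key are placeholders.

```java
// Sketch: registering a serial replication peer, which is what feeds
// replicationPeer.getPeerConfig().isSerial() in ReplicationSource above.
package org.apache.hadoop.hbase.replication;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Admin;

public final class AddSerialPeerExample {

  private AddSerialPeerExample() {
  }

  public static void addSerialPeer(Admin admin) throws IOException {
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey("peer-zk1,peer-zk2,peer-zk3:2181:/hbase") // placeholder cluster key
      .setSerial(true) // the switch that makes isSerial() return true
      .build();
    admin.addReplicationPeer("1", peerConfig); // placeholder peer id
  }
}
```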
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 41d95df28219..d1a2e8b57340 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -97,17 +97,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
         }
         sleepMultiplier = sleep(sleepMultiplier);
       }
-      // arrive here means we can push the entry, record the last sequence id
-      batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
-        entry.getKey().getSequenceId());
       // actually remove the entry.
-      removeEntryFromStream(entryStream, batch);
+      removeEntryFromStream(entry, entryStream, batch);
       if (addEntryToBatch(batch, entry)) {
         break;
       }
     } else {
       // actually remove the entry.
-      removeEntryFromStream(entryStream, batch);
+      removeEntryFromStream(null, entryStream, batch);
     }
     WALEntryStream.HasNext hasNext = entryStream.hasNext();
     // always return if we have switched to a new file.
@@ -125,9 +122,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
     }
   }
 
-  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
+  private void removeEntryFromStream(Entry entry, WALEntryStream entryStream, WALEntryBatch batch) {
     entryStream.next();
-    firstCellInEntryBeforeFiltering = null;
     batch.setLastWalPosition(entryStream.getPosition());
+    // record last pushed sequence id if needed
+    if (entry != null) {
+      batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+        entry.getKey().getSequenceId());
+    }
+    firstCellInEntryBeforeFiltering = null;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java
new file mode 100644
index 000000000000..f069d6b1095b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestBidirectionSerialReplicationStuck extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBidirectionSerialReplicationStuck.class);
+
+  @Override
+  protected boolean isSerialPeer() {
+    return true;
+  }
+
+  @Override
+  public void setUpBase() throws Exception {
+    UTIL1.ensureSomeRegionServersAvailable(2);
+    hbaseAdmin.balancerSwitch(false, true);
+    addPeer(PEER_ID2, tableName, UTIL1, UTIL2);
+    addPeer(PEER_ID2, tableName, UTIL2, UTIL1);
+  }
+
+  @Override
+  public void tearDownBase() throws Exception {
+    removePeer(PEER_ID2, UTIL1);
+    removePeer(PEER_ID2, UTIL2);
+  }
+
+  @Test
+  public void testStuck() throws Exception {
+    // disable the peer cluster1 -> cluster2
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    byte[] qualifier = Bytes.toBytes("q");
+    htable1.put(new Put(Bytes.toBytes("aaa-1")).addColumn(famName, qualifier, Bytes.toBytes(1)));
+
+    // add a row to cluster2 and wait for it to replicate back to cluster1
+    htable2.put(new Put(Bytes.toBytes("aaa-2")).addColumn(famName, qualifier, Bytes.toBytes(2)));
+    UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
+
+    // kill the region server which holds the region that contains our rows
+    UTIL1.getRSForFirstRegionInTable(tableName).abort("for testing");
+    // wait until the region is online again
+    UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
+
+    // put a new row in cluster1
+    htable1.put(new Put(Bytes.toBytes("aaa-3")).addColumn(famName, qualifier, Bytes.toBytes(3)));
+
+    // enable the peer cluster1 -> cluster2, the new row should be replicated to cluster2
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    UTIL1.waitFor(30000, () -> htable2.exists(new Get(Bytes.toBytes("aaa-3"))));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 70a6d73c6202..b03b89d6d69a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -279,7 +279,12 @@ public static void setUpBeforeClass() throws Exception {
   }
 
   private boolean peerExist(String peerId) throws IOException {
-    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
+    return peerExist(peerId, UTIL1);
+  }
+
+  private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
+    return util.getAdmin().listReplicationPeers().stream()
+      .anyMatch(p -> peerId.equals(p.getPeerId()));
   }
 
   // can be override in tests, in case you need to use zk based uri, or the old style uri
@@ -288,22 +293,28 @@ protected String getClusterKey(HBaseTestingUtil util) throws Exception {
   }
 
   protected final void addPeer(String peerId, TableName tableName) throws Exception {
-    if (!peerExist(peerId)) {
-      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
-        .setClusterKey(getClusterKey(UTIL2)).setSerial(isSerialPeer())
-        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
-      if (isSyncPeer()) {
-        FileSystem fs2 = UTIL2.getTestFileSystem();
-        // The remote wal dir is not important as we do not use it in DA state, here we only need to
-        // confirm that a sync peer in DA state can still replicate data to remote cluster
-        // asynchronously.
-        builder.setReplicateAllUserTables(false)
-          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
-          .setRemoteWALDir(new Path("/RemoteWAL")
-            .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
-      }
-      hbaseAdmin.addReplicationPeer(peerId, builder.build());
+    addPeer(peerId, tableName, UTIL1, UTIL2);
+  }
+
+  protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source,
+    HBaseTestingUtil target) throws Exception {
+    if (peerExist(peerId, source)) {
+      return;
+    }
+    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer())
+      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
+    if (isSyncPeer()) {
+      FileSystem fs2 = target.getTestFileSystem();
+      // The remote wal dir is not important as we do not use it in DA state, here we only need to
+      // confirm that a sync peer in DA state can still replicate data to remote cluster
+      // asynchronously.
+      builder.setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
+        .setRemoteWALDir(new Path("/RemoteWAL")
+          .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
     }
+    source.getAdmin().addReplicationPeer(peerId, builder.build());
   }
 
   @Before
@@ -312,8 +323,12 @@ public void setUpBase() throws Exception {
   }
 
   protected final void removePeer(String peerId) throws Exception {
-    if (peerExist(peerId)) {
-      hbaseAdmin.removeReplicationPeer(peerId);
+    removePeer(peerId, UTIL1);
+  }
+
+  protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception {
+    if (peerExist(peerId, util)) {
+      util.getAdmin().removeReplicationPeer(peerId);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index fb4729d87f83..897166a94000 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -107,15 +107,20 @@ public void testScopeWALEntryFilter() {
     Entry userEntryEmpty = createEntry(null);
 
     // no scopes
-    // now we will not filter out entries without a replication scope since serial replication still
-    // need the sequence id, but the cells will all be filtered out.
+    assertNull(filter.filter(userEntry));
+    // now for serial replication, we will not filter out entries without a replication scope since
+    // serial replication still needs the sequence id, but the cells will all be filtered out.
+    filter.setSerial(true);
     assertTrue(filter.filter(userEntry).getEdit().isEmpty());
+    filter.setSerial(false);
 
     // empty scopes
-    // ditto
     TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     userEntry = createEntry(scopes, a, b);
+    assertNull(filter.filter(userEntry));
+    filter.setSerial(true);
    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
+    filter.setSerial(false);
 
     // different scope
     scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
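Finally, for comparison with the interface-level sketch earlier: the same hypothetical table-skipping filter shrinks considerably when it extends the new WALEntryFilterBase, because clearOrNull encapsulates the serial/non-serial decision that the tests above exercise.

```java
// Same hypothetical filter as before, now leaning on the base class:
// clearOrNull consults the inherited serial flag, emptying the edit for
// serial peers and returning null otherwise.
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class SkipTableWALEntryFilter extends WALEntryFilterBase {

  private final TableName skipTable = TableName.valueOf("skipped"); // hypothetical table

  @Override
  public Entry filter(Entry entry) {
    return entry.getKey().getTableName().equals(skipTable) ? clearOrNull(entry) : entry;
  }
}
```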