Skip to content

Commit bea4272

Browse files
authored
HBASE-29463 Bidirectional serial replication will block if a region’s last edit before rs crashed was from the peer cluster (#7172)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
1 parent 8e25d75 commit bea4272

10 files changed

Lines changed: 228 additions & 37 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public ChainWALEntryFilter(List<WALEntryFilter> filters) {
5454
initCellFilters();
5555
}
5656

57+
@Override
58+
public void setSerial(boolean serial) {
59+
for (WALEntryFilter filter : filters) {
60+
filter.setSerial(serial);
61+
}
62+
}
63+
5764
public void initCellFilters() {
5865
ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
5966
for (WALEntryFilter filter : filters) {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*/
3232
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
3333
@InterfaceStability.Evolving
34-
public class ClusterMarkingEntryFilter implements WALEntryFilter {
34+
public class ClusterMarkingEntryFilter extends WALEntryFilterBase {
3535
private UUID clusterId;
3636
private UUID peerClusterId;
3737
private ReplicationEndpoint replicationEndpoint;
@@ -64,6 +64,6 @@ public Entry filter(Entry entry) {
6464
return entry;
6565
}
6666
}
67-
return null;
67+
return clearOrNull(entry);
6868
}
6969
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,26 @@
2828
import org.apache.yetus.audience.InterfaceAudience;
2929

3030
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
31+
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
3132

3233
/**
3334
* Keeps KVs that are scoped other than local
3435
*/
3536
@InterfaceAudience.Private
36-
public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
37+
public class ScopeWALEntryFilter extends WALEntryFilterBase implements WALCellFilter {
3738

3839
private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
3940

4041
@Override
4142
public Entry filter(Entry entry) {
42-
// Do not filter out an entire entry by replication scopes. As now we support serial
43-
// replication, the sequence id of a marker is also needed by upper layer. We will filter out
44-
// all the cells in the filterCell method below if the replication scopes is null or empty.
45-
return entry;
43+
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
44+
if (MapUtils.isNotEmpty(scopes)) {
45+
return entry;
46+
}
47+
return clearOrNull(entry);
4648
}
4749

48-
private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
50+
private static boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
4951
Integer scope = scopes.get(family);
5052
return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
5153
}
@@ -54,7 +56,7 @@ private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] fami
5456
public Cell filterCell(Entry entry, Cell cell) {
5557
ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(cell);
5658
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
57-
if (scopes == null || scopes.isEmpty()) {
59+
if (MapUtils.isEmpty(scopes)) {
5860
return null;
5961
}
6062
byte[] family = CellUtil.cloneFamily(cell);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,18 @@ public interface WALEntryFilter {
5050
* the entry to be skipped for replication.
5151
*/
5252
Entry filter(Entry entry);
53+
54+
/**
55+
* Tell the filter whether the peer is a serial replication peer.
56+
* <p>
57+
* For serial replication, usually you should not filter out an entire entry, unless the peer
58+
* config does not contain the table, because we need the region name and sequence id of the entry
59+
* to advance the pushed sequence id, otherwise the replication may be blocked. You can just
60+
* filter out all the cells of the entry to stop it being replicated to peer cluster,or just rely
61+
* on the {@link WALCellFilter#filterCell(Entry, org.apache.hadoop.hbase.Cell)} method to filter
62+
* all the cells out.
63+
* @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
64+
*/
65+
default void setSerial(boolean serial) {
66+
}
5367
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
21+
import org.apache.hadoop.hbase.wal.WAL.Entry;
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
24+
/**
25+
* Base class for {@link WALEntryFilter}, store the necessary common properties like
26+
* {@link #serial}.
27+
* <p>
28+
* Why need to treat serial replication specially:
29+
* <p>
30+
* Under some special cases, we may filter out some entries but we still need to record the last
31+
* pushed sequence id for these entries. For example, when we setup a bidirection replication A
32+
* &lt;-&gt; B, if we write to both cluster A and cluster B, cluster A will not replicate the
33+
* entries which are replicated from cluster B, which means we may have holes in the replication
34+
* sequence ids. So if the region is closed abnormally, i.e, we do not have a close event for the
35+
* region, and before the closing, we have some entries from cluster B, then the replication from
36+
* cluster A to cluster B will be stuck if we do not record the last pushed sequence id of these
37+
* entries because we will find out that the previous sequence id range will never finish. So we
38+
* need to record the sequence id for these entries so the last pushed sequence id can reach the
39+
* region barrier.
40+
* @see <a href="https://issues.apache.org/jira/browse/HBASE-29463">HBASE-29463</a>
41+
*/
42+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
43+
public abstract class WALEntryFilterBase implements WALEntryFilter {
44+
45+
protected boolean serial;
46+
47+
@Override
48+
public void setSerial(boolean serial) {
49+
this.serial = serial;
50+
}
51+
52+
/**
53+
* Call this method when you do not need to replicate the entry.
54+
* <p>
55+
* For serial replication, since still need to WALKey for recording progress, we clear all the
56+
* cells of the WALEdit. For normal replication, we just return null.
57+
*/
58+
protected final Entry clearOrNull(Entry entry) {
59+
if (serial) {
60+
entry.getEdit().getCells().clear();
61+
return entry;
62+
} else {
63+
return null;
64+
}
65+
}
66+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
334334
}
335335
filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
336336
this.walEntryFilter = new ChainWALEntryFilter(filters);
337+
this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial());
337338
}
338339

339340
private long getStartOffset(String walGroupId) {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
9797
}
9898
sleepMultiplier = sleep(sleepMultiplier);
9999
}
100-
// arrive here means we can push the entry, record the last sequence id
101-
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
102-
entry.getKey().getSequenceId());
103100
// actually remove the entry.
104-
removeEntryFromStream(entryStream, batch);
101+
removeEntryFromStream(entry, entryStream, batch);
105102
if (addEntryToBatch(batch, entry)) {
106103
break;
107104
}
108105
} else {
109106
// actually remove the entry.
110-
removeEntryFromStream(entryStream, batch);
107+
removeEntryFromStream(null, entryStream, batch);
111108
}
112109
WALEntryStream.HasNext hasNext = entryStream.hasNext();
113110
// always return if we have switched to a new file.
@@ -125,9 +122,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
125122
}
126123
}
127124

128-
private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
125+
private void removeEntryFromStream(Entry entry, WALEntryStream entryStream, WALEntryBatch batch) {
129126
entryStream.next();
130-
firstCellInEntryBeforeFiltering = null;
131127
batch.setLastWalPosition(entryStream.getPosition());
128+
// record last pushed sequence id if needed
129+
if (entry != null) {
130+
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
131+
entry.getKey().getSequenceId());
132+
}
133+
firstCellInEntryBeforeFiltering = null;
132134
}
133135
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.hadoop.hbase.HBaseClassTestRule;
21+
import org.apache.hadoop.hbase.client.Get;
22+
import org.apache.hadoop.hbase.client.Put;
23+
import org.apache.hadoop.hbase.testclassification.LargeTests;
24+
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
25+
import org.apache.hadoop.hbase.util.Bytes;
26+
import org.junit.ClassRule;
27+
import org.junit.Test;
28+
import org.junit.experimental.categories.Category;
29+
30+
@Category({ ReplicationTests.class, LargeTests.class })
31+
public class TestBidirectionSerialReplicationStuck extends TestReplicationBase {
32+
33+
@ClassRule
34+
public static final HBaseClassTestRule CLASS_RULE =
35+
HBaseClassTestRule.forClass(TestBidirectionSerialReplicationStuck.class);
36+
37+
@Override
38+
protected boolean isSerialPeer() {
39+
return true;
40+
}
41+
42+
@Override
43+
public void setUpBase() throws Exception {
44+
UTIL1.ensureSomeRegionServersAvailable(2);
45+
hbaseAdmin.balancerSwitch(false, true);
46+
addPeer(PEER_ID2, tableName, UTIL1, UTIL2);
47+
addPeer(PEER_ID2, tableName, UTIL2, UTIL1);
48+
}
49+
50+
@Override
51+
public void tearDownBase() throws Exception {
52+
removePeer(PEER_ID2, UTIL1);
53+
removePeer(PEER_ID2, UTIL2);
54+
}
55+
56+
@Test
57+
public void testStuck() throws Exception {
58+
// disable the peer cluster1 -> cluster2
59+
hbaseAdmin.disableReplicationPeer(PEER_ID2);
60+
byte[] qualifier = Bytes.toBytes("q");
61+
htable1.put(new Put(Bytes.toBytes("aaa-1")).addColumn(famName, qualifier, Bytes.toBytes(1)));
62+
63+
// add a row to cluster2 and wait it replicate back to cluster1
64+
htable2.put(new Put(Bytes.toBytes("aaa-2")).addColumn(famName, qualifier, Bytes.toBytes(2)));
65+
UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
66+
67+
// kill the region server which holds the region which contains our rows
68+
UTIL1.getRSForFirstRegionInTable(tableName).abort("for testing");
69+
// wait until the region is online
70+
UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
71+
72+
// put a new row in cluster1
73+
htable1.put(new Put(Bytes.toBytes("aaa-3")).addColumn(famName, qualifier, Bytes.toBytes(3)));
74+
75+
// enable peer cluster1 -> cluster2, the new row should be replicated to cluster2
76+
hbaseAdmin.enableReplicationPeer(PEER_ID2);
77+
UTIL1.waitFor(30000, () -> htable2.exists(new Get(Bytes.toBytes("aaa-3"))));
78+
}
79+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,12 @@ public static void setUpBeforeClass() throws Exception {
279279
}
280280

281281
private boolean peerExist(String peerId) throws IOException {
282-
return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
282+
return peerExist(peerId, UTIL1);
283+
}
284+
285+
private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
286+
return util.getAdmin().listReplicationPeers().stream()
287+
.anyMatch(p -> peerId.equals(p.getPeerId()));
283288
}
284289

285290
// can be override in tests, in case you need to use zk based uri, or the old style uri
@@ -288,22 +293,28 @@ protected String getClusterKey(HBaseTestingUtil util) throws Exception {
288293
}
289294

290295
protected final void addPeer(String peerId, TableName tableName) throws Exception {
291-
if (!peerExist(peerId)) {
292-
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
293-
.setClusterKey(getClusterKey(UTIL2)).setSerial(isSerialPeer())
294-
.setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
295-
if (isSyncPeer()) {
296-
FileSystem fs2 = UTIL2.getTestFileSystem();
297-
// The remote wal dir is not important as we do not use it in DA state, here we only need to
298-
// confirm that a sync peer in DA state can still replicate data to remote cluster
299-
// asynchronously.
300-
builder.setReplicateAllUserTables(false)
301-
.setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
302-
.setRemoteWALDir(new Path("/RemoteWAL")
303-
.makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
304-
}
305-
hbaseAdmin.addReplicationPeer(peerId, builder.build());
296+
addPeer(peerId, tableName, UTIL1, UTIL2);
297+
}
298+
299+
protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source,
300+
HBaseTestingUtil target) throws Exception {
301+
if (peerExist(peerId, source)) {
302+
return;
303+
}
304+
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
305+
.setClusterKey(getClusterKey(target)).setSerial(isSerialPeer())
306+
.setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
307+
if (isSyncPeer()) {
308+
FileSystem fs2 = target.getTestFileSystem();
309+
// The remote wal dir is not important as we do not use it in DA state, here we only need to
310+
// confirm that a sync peer in DA state can still replicate data to remote cluster
311+
// asynchronously.
312+
builder.setReplicateAllUserTables(false)
313+
.setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
314+
.setRemoteWALDir(new Path("/RemoteWAL")
315+
.makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
306316
}
317+
source.getAdmin().addReplicationPeer(peerId, builder.build());
307318
}
308319

309320
@Before
@@ -312,8 +323,12 @@ public void setUpBase() throws Exception {
312323
}
313324

314325
protected final void removePeer(String peerId) throws Exception {
315-
if (peerExist(peerId)) {
316-
hbaseAdmin.removeReplicationPeer(peerId);
326+
removePeer(peerId, UTIL1);
327+
}
328+
329+
protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception {
330+
if (peerExist(peerId, util)) {
331+
util.getAdmin().removeReplicationPeer(peerId);
317332
}
318333
}
319334

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,20 @@ public void testScopeWALEntryFilter() {
107107
Entry userEntryEmpty = createEntry(null);
108108

109109
// no scopes
110-
// now we will not filter out entries without a replication scope since serial replication still
111-
// need the sequence id, but the cells will all be filtered out.
110+
assertNull(filter.filter(userEntry));
111+
// now for serial replication, we will not filter out entries without a replication scope since
112+
// serial replication still need the sequence id, but the cells will all be filtered out.
113+
filter.setSerial(true);
112114
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
115+
filter.setSerial(false);
113116

114117
// empty scopes
115-
// ditto
116118
TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
117119
userEntry = createEntry(scopes, a, b);
120+
assertNull(filter.filter(userEntry));
121+
filter.setSerial(true);
118122
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
123+
filter.setSerial(false);
119124

120125
// different scope
121126
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

0 commit comments

Comments
 (0)