Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public ChainWALEntryFilter(List<WALEntryFilter> filters) {
initCellFilters();
}

/**
 * Forwards the serial replication flag to every filter held by this chain, in chain order.
 * @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
 */
@Override
public void setSerial(boolean serial) {
  for (int i = 0; i < filters.length; i++) {
    filters[i].setSerial(serial);
  }
}

public void initCellFilters() {
ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
for (WALEntryFilter filter : filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
@InterfaceStability.Evolving
public class ClusterMarkingEntryFilter implements WALEntryFilter {
public class ClusterMarkingEntryFilter extends WALEntryFilterBase {
private UUID clusterId;
private UUID peerClusterId;
private ReplicationEndpoint replicationEndpoint;
Expand Down Expand Up @@ -64,6 +64,6 @@ public Entry filter(Entry entry) {
return entry;
}
}
return null;
return clearOrNull(entry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,34 @@
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
* Keeps KVs that are scoped other than local
*/
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
public class ScopeWALEntryFilter extends WALEntryFilterBase implements WALCellFilter {

private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();

@Override
public Entry filter(Entry entry) {
// Do not filter out an entire entry by replication scopes. As now we support serial
// replication, the sequence id of a marker is also needed by upper layer. We will filter out
// all the cells in the filterCell method below if the replication scopes is null or empty.
return entry;
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
if (MapUtils.isNotEmpty(scopes)) {
return entry;
}
return clearOrNull(entry);
}

private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
private static boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
Integer scope = scopes.get(family);
return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
}

@Override
public Cell filterCell(Entry entry, Cell cell) {
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
if (scopes == null || scopes.isEmpty()) {
if (MapUtils.isEmpty(scopes)) {
return null;
}
byte[] family = CellUtil.cloneFamily(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,18 @@ public interface WALEntryFilter {
* the entry to be skipped for replication.
*/
Entry filter(Entry entry);

/**
* Tell the filter whether the peer is a serial replication peer.
* <p>
* For serial replication, usually you should not filter out an entire entry, unless the peer
* config does not contain the table, because we need the region name and sequence id of the entry
* to advance the pushed sequence id, otherwise the replication may be blocked. You can just
* filter out all the cells of the entry to stop it being replicated to peer cluster, or just
* rely on the {@link WALCellFilter#filterCell(Entry, org.apache.hadoop.hbase.Cell)} method to
* filter all the cells out.
* <p>
* The default implementation does nothing; implementations that care about serial replication
* should override it.
* @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
*/
default void setSerial(boolean serial) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Base class for {@link WALEntryFilter}, store the necessary common properties like
* {@link #serial}.
* <p>
* Why need to treat serial replication specially:
* <p>
* Under some special cases, we may filter out some entries but we still need to record the last
* pushed sequence id for these entries. For example, when we set up a bidirectional replication
* A &lt;-&gt; B, if we write to both cluster A and cluster B, cluster A will not replicate the
* entries which are replicated from cluster B, which means we may have holes in the replication
* sequence ids. So if the region is closed abnormally, i.e., we do not have a close event for
* the region, and before the closing, we have some entries from cluster B, then the replication
* from cluster A to cluster B will be stuck if we do not record the last pushed sequence id of
* these entries because we will find out that the previous sequence id range will never finish.
* So we need to record the sequence id for these entries so the last pushed sequence id can
* reach the region barrier.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-29463">HBASE-29463</a>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public abstract class WALEntryFilterBase implements WALEntryFilter {

  /** Serial replication flag for the peer, as last supplied through {@link #setSerial}. */
  protected boolean serial;

  /**
   * Records whether the peer is a serial replication peer.
   * @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
   */
  @Override
  public void setSerial(boolean serial) {
    this.serial = serial;
  }

  /**
   * Call this method when the entry does not need to be replicated.
   * <p>
   * For normal replication this simply returns {@code null}. For serial replication the WALKey
   * is still needed for recording progress, so all the cells of the entry's WALEdit are cleared
   * in place and the same entry instance is returned.
   * @param entry the entry that should not be replicated
   * @return {@code null} when not serial, otherwise {@code entry} with its cells removed
   */
  protected final Entry clearOrNull(Entry entry) {
    if (!serial) {
      return null;
    }
    // Keep the key, drop the payload.
    entry.getEdit().getCells().clear();
    return entry;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
}
filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
this.walEntryFilter = new ChainWALEntryFilter(filters);
this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial());
}

private void tryStartNewShipper(String walGroupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
}
sleepMultiplier = sleep(sleepMultiplier);
}
// arrive here means we can push the entry, record the last sequence id
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
entry.getKey().getSequenceId());
// actually remove the entry.
removeEntryFromStream(entryStream, batch);
removeEntryFromStream(entry, entryStream, batch);
if (addEntryToBatch(batch, entry)) {
break;
}
} else {
// actually remove the entry.
removeEntryFromStream(entryStream, batch);
removeEntryFromStream(null, entryStream, batch);
}
WALEntryStream.HasNext hasNext = entryStream.hasNext();
// always return if we have switched to a new file.
Expand All @@ -125,9 +122,14 @@ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
}
}

private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
private void removeEntryFromStream(Entry entry, WALEntryStream entryStream, WALEntryBatch batch) {
entryStream.next();
firstCellInEntryBeforeFiltering = null;
batch.setLastWalPosition(entryStream.getPosition());
// record last pushed sequence id if needed
if (entry != null) {
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
entry.getKey().getSequenceId());
}
firstCellInEntryBeforeFiltering = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Sets up serial replication in both directions between the two test clusters and checks that
 * replication from cluster1 to cluster2 still makes progress after the region server hosting the
 * table's first region is aborted while the cluster1 -> cluster2 peer is disabled.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestBidirectionSerialReplicationStuck extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBidirectionSerialReplicationStuck.class);

  /** Upper bound, in milliseconds, for each wait in the test. */
  private static final long WAIT_TIMEOUT_MS = 30000;

  private static final byte[] QUALIFIER = Bytes.toBytes("q");

  private static final String ROW_FROM_CLUSTER1_BEFORE = "aaa-1";
  private static final String ROW_FROM_CLUSTER2 = "aaa-2";
  private static final String ROW_FROM_CLUSTER1_AFTER = "aaa-3";

  @Override
  protected boolean isSerialPeer() {
    return true;
  }

  /** Adds a peer in each direction so the two clusters replicate to each other. */
  @Override
  public void setUpBase() throws Exception {
    UTIL1.ensureSomeRegionServersAvailable(2);
    hbaseAdmin.balancerSwitch(false, true);
    addPeer(PEER_ID2, tableName, UTIL1, UTIL2);
    addPeer(PEER_ID2, tableName, UTIL2, UTIL1);
  }

  /** Removes the peers added by {@link #setUpBase()} from both clusters. */
  @Override
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2, UTIL1);
    removePeer(PEER_ID2, UTIL2);
  }

  @Test
  public void testStuck() throws Exception {
    // disable the peer cluster1 -> cluster2
    hbaseAdmin.disableReplicationPeer(PEER_ID2);
    htable1.put(rowPut(ROW_FROM_CLUSTER1_BEFORE, 1));

    // add a row to cluster2 and wait for it to be replicated back to cluster1
    htable2.put(rowPut(ROW_FROM_CLUSTER2, 2));
    UTIL1.waitFor(WAIT_TIMEOUT_MS, () -> htable1.exists(rowGet(ROW_FROM_CLUSTER2)));

    // kill the region server which holds the region which contains our rows
    UTIL1.getRSForFirstRegionInTable(tableName).abort("for testing");
    // wait until the region is online again
    UTIL1.waitFor(WAIT_TIMEOUT_MS, () -> htable1.exists(rowGet(ROW_FROM_CLUSTER2)));

    // put a new row in cluster1
    htable1.put(rowPut(ROW_FROM_CLUSTER1_AFTER, 3));

    // enable peer cluster1 -> cluster2, the new row should be replicated to cluster2
    hbaseAdmin.enableReplicationPeer(PEER_ID2);
    UTIL1.waitFor(WAIT_TIMEOUT_MS, () -> htable2.exists(rowGet(ROW_FROM_CLUSTER1_AFTER)));
  }

  /** Builds a Put of the int {@code value} into the test family and qualifier of {@code row}. */
  private Put rowPut(String row, int value) {
    return new Put(Bytes.toBytes(row)).addColumn(famName, QUALIFIER, Bytes.toBytes(value));
  }

  /** Builds a Get for {@code row}. */
  private Get rowGet(String row) {
    return new Get(Bytes.toBytes(row));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,27 @@ public static void setUpBeforeClass() throws Exception {
}

private boolean peerExist(String peerId) throws IOException {
return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
return peerExist(peerId, UTIL1);
}

/**
 * Checks whether the cluster behind {@code util} lists a replication peer whose id equals
 * {@code peerId}.
 */
private boolean peerExist(String peerId, HBaseTestingUtility util) throws IOException {
  return util.getAdmin().listReplicationPeers().stream().map(peer -> peer.getPeerId())
    .anyMatch(existingId -> peerId.equals(existingId));
}

protected final void addPeer(String peerId, TableName tableName) throws Exception {
if (!peerExist(peerId)) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
.setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
hbaseAdmin.addReplicationPeer(peerId, builder.build());
addPeer(peerId, tableName, UTIL1, UTIL2);
}

/**
 * Adds a replication peer named {@code peerId} on the {@code source} cluster, pointing at the
 * {@code target} cluster, unless {@code source} already has a peer with that id. The peer's
 * serial flag comes from {@code isSerialPeer()}.
 * <p>
 * {@code tableName} is not used by this method.
 */
protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtility source,
  HBaseTestingUtility target) throws Exception {
  if (!peerExist(peerId, source)) {
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(target.getClusterKey()).setSerial(isSerialPeer())
      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()).build();
    source.getAdmin().addReplicationPeer(peerId, peerConfig);
  }
}

@Before
Expand All @@ -290,8 +301,12 @@ public void setUpBase() throws Exception {
}

protected final void removePeer(String peerId) throws Exception {
if (peerExist(peerId)) {
hbaseAdmin.removeReplicationPeer(peerId);
removePeer(peerId, UTIL1);
}

/**
 * Removes the replication peer named {@code peerId} from the cluster behind {@code util}; does
 * nothing when that cluster has no such peer.
 */
protected final void removePeer(String peerId, HBaseTestingUtility util) throws Exception {
  if (!peerExist(peerId, util)) {
    return;
  }
  util.getAdmin().removeReplicationPeer(peerId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,20 @@ public void testScopeWALEntryFilter() {
Entry userEntryEmpty = createEntry(null);

// no scopes
// now we will not filter out entries without a replication scope since serial replication still
// need the sequence id, but the cells will all be filtered out.
assertNull(filter.filter(userEntry));
// For serial replication, we do not filter out entries without a replication scope, since
// serial replication still needs the sequence id; all the cells are filtered out instead.
filter.setSerial(true);
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
filter.setSerial(false);

// empty scopes
// ditto
TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
userEntry = createEntry(scopes, a, b);
assertNull(filter.filter(userEntry));
filter.setSerial(true);
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
filter.setSerial(false);

// different scope
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Expand Down