@@ -40,6 +40,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -51,7 +52,6 @@
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -358,13 +358,13 @@ private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetry
 e);
 }
 }
-// Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
-if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
+// Endpoints implementing HBaseReplicationEndpoint need to check cluster key
+if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) {
Review thread:
  @shahrs87 (Contributor), Oct 29, 2021:
  As for this issue, it doesn't matter whether it is an instance of HBaseInterClusterReplicationEndpoint or HBaseReplicationEndpoint, correct? Since the test is skipping the check completely.

  Member Author:
  The reason I modified this is that the target of HBaseReplicationEndpoint is an HBase cluster, so for all of its implementations, including HBaseInterClusterReplicationEndpoint, we should check clusterKey.

  @shahrs87 (Contributor):
  Sounds reasonable.

 checkClusterKey(peerConfig.getClusterKey());
 }
-// Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
-if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
-checkClusterId(peerConfig.getClusterKey());
+// Check if endpoint can replicate to the same cluster
+if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
+checkSameClusterKey(peerConfig.getClusterKey());
 }
 }

 if (peerConfig.replicateAllUserTables()) {
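
To illustrate the distinction discussed in the thread above, here is a minimal, hypothetical sketch of the two kinds of peer configuration this check now separates. The endpoint class MyNonHBaseEndpoint, the peer ids, and the cluster key value are illustrative only, and admin is assumed to be an existing org.apache.hadoop.hbase.client.Admin instance.

// A peer whose endpoint targets another HBase cluster (the default
// HBaseInterClusterReplicationEndpoint, or any other HBaseReplicationEndpoint
// implementation) still has to carry a cluster key, so checkClusterKey() applies.
ReplicationPeerConfig hbasePeer = ReplicationPeerConfig.newBuilder()
  .setClusterKey("zk1,zk2,zk3:2181:/hbase") // illustrative cluster key
  .build();
admin.addReplicationPeer("to_hbase", hbasePeer);

// A peer whose endpoint does not extend HBaseReplicationEndpoint (for example one
// shipping edits to a non-HBase system) is no longer routed through checkClusterKey(),
// so it can be added without any cluster key.
ReplicationPeerConfig customPeer = ReplicationPeerConfig.newBuilder()
  .setReplicationEndpointImpl(MyNonHBaseEndpoint.class.getName()) // hypothetical endpoint class
  .build();
admin.addReplicationPeer("to_other_system", customPeer);
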
@@ -510,7 +510,7 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
 }
 }

-private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
+private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
Review thread:
  Contributor:
  This method is checking whether the cluster id is the same or not, so IMO the method name is correct. We can change it to checkSameClusterId if you like.

  Member Author:
  The original method name checkClusterId is inappropriate: clusterId and clusterKey are not the same thing in replication, so I kept the same style as the method checkClusterKey above and renamed it checkSameClusterKey.

 String peerClusterId = "";
 try {
 // Create the peer cluster config for get peer cluster id
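
As a rough sketch of the intent behind the renamed method, and of why clusterKey and clusterId are different things: the cluster key locates the peer's ZooKeeper ensemble, the cluster id is the identifier stored behind it, and the check rejects a peer whose cluster id equals the local cluster's. This is not the actual implementation in this patch; lookupPeerClusterId and localClusterId are placeholder names.

private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
  String peerClusterId;
  try {
    // Placeholder helper: resolve the cluster id of the cluster behind this cluster key
    peerClusterId = lookupPeerClusterId(clusterKey);
  } catch (Exception e) {
    throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
  }
  // localClusterId: the id of this cluster (placeholder)
  if (peerClusterId.equals(localClusterId)) {
    throw new DoNotRetryIOException(
      "Invalid clusterKey " + clusterKey + ", replication to the local cluster itself is not allowed");
  }
}
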
New file: TestNonHBaseReplicationEndpoint.java
@@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

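/**
 * Tests that a {@link ReplicationEndpoint} which does not extend
 * {@link HBaseReplicationEndpoint} can be added as a replication peer without specifying a
 * cluster key, and that edits written to a replicated table are shipped to it.
 */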
@Category({ MediumTests.class, ReplicationTests.class })
public class TestNonHBaseReplicationEndpoint {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNonHBaseReplicationEndpoint.class);

private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

private static Admin ADMIN;

private static final TableName tableName = TableName.valueOf("test");
private static final byte[] famName = Bytes.toBytes("f");

private static final AtomicBoolean REPLICATED = new AtomicBoolean();

@BeforeClass
public static void setupBeforeClass() throws Exception {
UTIL.startMiniCluster();
ADMIN = UTIL.getAdmin();
}

@AfterClass
public static void teardownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}

@Before
public void setup() {
REPLICATED.set(false);
}

@Test
public void test() throws IOException {
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
Table table = UTIL.createTable(td, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);

ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setReplicationEndpointImpl(NonHBaseReplicationEndpoint.class.getName())
.setReplicateAllUserTables(false)
.setTableCFsMap(new HashMap<TableName, List<String>>() {{
put(tableName, new ArrayList<>());
}
}).build();

ADMIN.addReplicationPeer("1", peerConfig);
loadData(table);

UTIL.waitFor(10000L, () -> REPLICATED.get());
}

protected static void loadData(Table table) throws IOException {
for (int i = 0; i < 100; i++) {
Put put = new Put(Bytes.toBytes(Integer.toString(i)));
put.addColumn(famName, famName, Bytes.toBytes(i));
table.put(put);
}
}

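/**
 * A minimal ReplicationEndpoint that does not extend HBaseReplicationEndpoint. Its
 * replicate() only flips the REPLICATED flag so the test can verify that edits were shipped.
 */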
public static class NonHBaseReplicationEndpoint implements ReplicationEndpoint {

private boolean running = false;

@Override
public void init(Context context) throws IOException {
}

@Override
public boolean canReplicateToSameCluster() {
return false;
}

@Override
public UUID getPeerUUID() {
return UUID.randomUUID();
}

@Override
public WALEntryFilter getWALEntryfilter() {
return null;
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
REPLICATED.set(true);
return true;
}

@Override
public boolean isRunning() {
return running;
}

@Override
public boolean isStarting() {
return false;
}

@Override
public void start() {
running = true;
}

@Override
public void awaitRunning() {
long interval = 100L;
while (!running) {
Threads.sleep(interval);
}
}

@Override
public void awaitRunning(long timeout, TimeUnit unit) {
long start = System.currentTimeMillis();
long end = start + unit.toMillis(timeout);
long interval = 100L;
while (!running && System.currentTimeMillis() < end) {
Threads.sleep(interval);
}
}

@Override
public void stop() {
running = false;
}

@Override
public void awaitTerminated() {
long interval = 100L;
while (running) {
Threads.sleep(interval);
}
}

@Override
public void awaitTerminated(long timeout, TimeUnit unit) {
long start = System.currentTimeMillis();
long end = start + unit.toMillis(timeout);
long interval = 100L;
while (running && System.currentTimeMillis() < end) {
Threads.sleep(interval);
}
}

@Override
public Throwable failureCause() {
return null;
}

@Override
public void peerConfigUpdated(ReplicationPeerConfig rpc) {
}
}
}