Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
Expand All @@ -60,6 +61,8 @@
public final class ReplicationPeerConfigUtil {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
"hbase.replication.peer.base.config";

private ReplicationPeerConfigUtil() {}

Expand Down Expand Up @@ -450,6 +453,41 @@ public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
return builder.build();
}

/**
* Helper method to add base peer configs from Configuration to ReplicationPeerConfig
* if not present in latter.
*
* This merges the user supplied peer configuration
* {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
* provided as property hbase.replication.peer.base.configs in hbase configuration.
* Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
* of conf is retained if already present in ReplicationPeerConfig.
*
* @param conf Configuration
* @return ReplicationPeerConfig containing updated configs.
*/
public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
ReplicationPeerConfig receivedPeerConfig) {
String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
newBuilder(receivedPeerConfig);
Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Remove multiple extraneous new lines in this method.

if (basePeerConfigs.length() != 0) {
Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
.withKeyValueSeparator("=").split(basePeerConfigs);
for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
String configName = entry.getKey();
String configValue = entry.getValue();
// Only override if base config does not exist in existing peer configs
if (!receivedPeerConfigMap.containsKey(configName)) {
copiedPeerConfigBuilder.putConfiguration(configName, configValue);
}
}
}
return copiedPeerConfigBuilder.build();
}

public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
throws ReplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -34,9 +35,12 @@
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
Expand Down Expand Up @@ -215,4 +219,47 @@ public void testNoSyncReplicationState()
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
STORAGE.getNewSyncReplicationStateNode(peerId)));
}

@Test
public void testBaseReplicationPeerConfig() {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
String customPeerConfigUpdatedValue = "testUpdated";

String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
String customPeerConfigSecondValue = "testSecond";
String customPeerConfigSecondUpdatedValue = "testSecondUpdated";

ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);

// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);

Configuration conf = UTIL.getConfiguration();
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";").
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));

ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);

// validates base configs are present in replicationPeerConfig
assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
get(customPeerConfigKey));
assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
get(customPeerConfigSecondKey));

// validates base configs does not override value if config already present
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));

ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig);

assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate.
getConfiguration().get(customPeerConfigKey));
assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate.
getConfiguration().get(customPeerConfigSecondKey));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
Expand Down Expand Up @@ -232,6 +233,7 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena
// this should be a retry, just return
return;
}
peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
SyncReplicationState syncReplicationState =
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
Expand Down Expand Up @@ -546,6 +548,9 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, St
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);

peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
peerStorage.updatePeerConfig(peerId, peerConfig);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -441,6 +443,84 @@ public void testCyclicReplication3() throws Exception {
}
}

/**
* Tests that base replication peer configs are applied on peer creation
* and the configs are overriden if updated as part of updateReplicationPeerConfig()
*
*/
@Test
public void testBasePeerConfigsForPeerMutations()
throws Exception {
LOG.info("testBasePeerConfigsForPeerMutations");
String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
String firstCustomPeerConfigValue = "test";
String firstCustomPeerConfigUpdatedValue = "test_updated";

String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
String secondCustomPeerConfigValue = "testSecond";
String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
try {
baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
startMiniClusters(2);
addPeer("1", 0, 1);
addPeer("2", 0, 1);
Admin admin = utilities[0].getAdmin();

// Validates base configs 1 is present for both peer.
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
getConfiguration().get(firstCustomPeerConfigKey));
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
getConfiguration().get(firstCustomPeerConfigKey));

// override value of configuration 1 for peer "1".
ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
newBuilder(admin.getReplicationPeerConfig("1")).
putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();

// add configuration 2 for peer "2".
ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
newBuilder(admin.getReplicationPeerConfig("2")).
putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();

admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);

// validates configuration is overridden by updateReplicationPeerConfig
Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
getConfiguration().get(firstCustomPeerConfigKey));
Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
getConfiguration().get(secondCustomPeerConfigKey));

// Add second config to base config and perform restart.
utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
.concat("=").concat(secondCustomPeerConfigValue));

utilities[0].shutdownMiniHBaseCluster();
utilities[0].restartHBaseCluster(1);
admin = utilities[0].getAdmin();

// Both retains the value of base configuration 1 value as before restart.
// Peer 1 (Update value), Peer 2 (Base Value)
Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
getConfiguration().get(firstCustomPeerConfigKey));
Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
getConfiguration().get(firstCustomPeerConfigKey));

// Peer 1 gets new base config as part of restart.
Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
getConfiguration().get(secondCustomPeerConfigKey));
// Peer 2 retains the updated value as before restart.
Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
getConfiguration().get(secondCustomPeerConfigKey));
} finally {
shutDownMiniClusters();
baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
}
}

@After
public void tearDown() throws IOException {
configurations = null;
Expand Down