From 3ef0fdf5438458633889dda3778d8673b33cb592 Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Mon, 12 Dec 2022 15:45:28 -0800
Subject: [PATCH 1/7] HBASE-27529 Attach WAL extended attributes to mutations at replication sink

---
 .../regionserver/ReplicationSink.java         |   6 +
 ...tReplicationWithWALExtendedAttributes.java | 258 ++++++++++++++++++
 2 files changed, 264 insertions(+)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index cba8256163d5..e02bfde6fbac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -219,6 +219,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
         Cell previousCell = null;
         Mutation mutation = null;
         int count = entry.getAssociatedCellCount();
+        List<WALProtos.Attribute> attributeList = entry.getKey().getExtendedAttributesList();
         for (int i = 0; i < count; i++) {
           // Throw index out of bounds if our cell count is off
           if (!cells.advance()) {
@@ -265,6 +266,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
             mutation.setClusterIds(clusterIds);
             mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
               HConstants.EMPTY_BYTE_ARRAY);
+            if (attributeList != null) {
+              for (WALProtos.Attribute attribute : attributeList) {
+                mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
+              }
+            }
             addToHashMultiMap(rowMap, table, clusterIds, mutation);
           }
           if (CellUtil.isDelete(cell)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
new file mode 100644
index 000000000000..021d4235f114
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationWithWALExtendedAttributes { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class); + + private static Configuration conf1 = HBaseConfiguration.create(); + + private static Admin replicationAdmin; + + private static Connection connection1; + + private static Table htable1; + private static Table htable2; + + private static HBaseTestingUtil utility1; + private static HBaseTestingUtil utility2; + private static final long SLEEP_TIME = 500; + private static final int NB_RETRIES = 10; + + private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final byte[] ROW = Bytes.toBytes("row"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf1.setInt("replication.source.size.capacity", 10240); + conf1.setLong("replication.source.sleepforretries", 100); + conf1.setInt("hbase.regionserver.maxlogs", 10); + 
conf1.setLong("hbase.master.logcleaner.ttl", 10); + conf1.setInt("zookeeper.recovery.retry", 1); + conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setInt("replication.stats.thread.period.seconds", 5); + conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessorForWALAnnotationAtSource.class.getName()); + + utility1 = new HBaseTestingUtil(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + // Have to reget conf1 in case zk cluster location different + // than default + conf1 = utility1.getConfiguration(); + LOG.info("Setup first Zk"); + + // Base conf2 on conf1 so it gets the right zk cluster. + Configuration conf2 = HBaseConfiguration.create(conf1); + conf2.setInt("hfile.format.version", 3); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); + conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessorForWALAnnotationAtSink.class.getName()); + + utility2 = new HBaseTestingUtil(conf2); + utility2.setZkCluster(miniZK); + + LOG.info("Setup second Zk"); + utility1.startMiniCluster(2); + utility2.startMiniCluster(2); + + connection1 = ConnectionFactory.createConnection(conf1); + replicationAdmin = connection1.getAdmin(); + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); + replicationAdmin.addReplicationPeer("2", rpc); + + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .build(); + try (Connection conn = ConnectionFactory.createConnection(conf1); + Admin admin = conn.getAdmin()) { + admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); + } + try (Connection conn = ConnectionFactory.createConnection(conf2); + Admin admin = conn.getAdmin()) { + admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); + } + htable1 = utility1.getConnection().getTable(TABLE_NAME); + htable2 = utility2.getConnection().getTable(TABLE_NAME); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + Closeables.close(replicationAdmin, true); + Closeables.close(connection1, true); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + @Test + public void testReplicationWithWALExtendedAttributes() throws Exception { + Put put = new Put(ROW); + put.addColumn(FAMILY, ROW, ROW); + + htable1 = utility1.getConnection().getTable(TABLE_NAME); + htable1.put(put); + + Put put2 = new Put(ROW2); + put2.addColumn(FAMILY, ROW2, ROW2); + + htable1.batch(Collections.singletonList(put2), new Object[1]); + + assertGetValues(new Get(ROW), ROW); + assertGetValues(new Get(ROW2), ROW2); + } + + private static void assertGetValues(Get get, byte[] value) + throws IOException, InterruptedException { + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for put replication"); + } + Result res = htable2.get(get); + if 
(res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(value, res.value());
+        break;
+      }
+    }
+  }
+
+  public static class TestCoprocessorForWALAnnotationAtSource
+    implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+      WALEdit edit) throws IOException {
+      key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
+      key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
+    }
+  }
+
+  public static class TestCoprocessorForWALAnnotationAtSink
+    implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
+      throws IOException {
+      String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1"));
+      String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2"));
+      if (attrVal1 == null || attrVal2 == null) {
+        throw new IOException("Failed to retrieve WAL annotations");
+      }
+      if (
+        attrVal1.equals("Value of Extended attribute 01")
+          && attrVal2.equals("Value of Extended attribute 02")
+      ) {
+        return;
+      }
+      throw new IOException("Failed to retrieve WAL annotations..");
+    }
+
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1"));
+      String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2"));
+      if (attrVal1 == null || attrVal2 == null) {
+        throw new IOException("Failed to retrieve WAL annotations");
+      }
+      if (
+        attrVal1.equals("Value of Extended attribute 01")
+          && attrVal2.equals("Value of Extended attribute 02")
+      ) {
+        return;
+      }
+      throw new IOException("Failed to retrieve WAL annotations..");
+    }
+  }
+}

From 9789d4d0c097bfe96b9f23d4248586ae1bef74e4 Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Tue, 13 Dec 2022 10:14:39 -0800
Subject: [PATCH 2/7] configurable change

---
 .../regionserver/ReplicationSink.java         | 23 +++++++++++++++----
 ...tReplicationWithWALExtendedAttributes.java |  2 ++
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index e02bfde6fbac..e18379365489 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -107,6 +107,14 @@ public class ReplicationSink {
   private SourceFSConfigurationProvider provider;
   private WALEntrySinkFilter walEntrySinkFilter;
 
+  /**
+   * If enabled at the sink cluster's site config, extended WAL attributes are attached as
+   * Mutation attributes. This is useful for a source cluster coprocessor to provide
+   * coprocessor-specific metadata as WAL annotations and have it attached back to the Mutations
+   * generated from WAL entries at the sink side.
+   */
+  public static final String HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS =
+    "hbase.replication.sink.attributes.wal.to.mutations";
+
   /**
    * Row size threshold for multi requests above which a warning is logged
    */
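As of this patch the behavior is opt-in through the sink cluster's site configuration. A minimal sketch of enabling it programmatically (assuming the Configuration belongs to the sink regionserver; note that PATCH 4/7 below reworks this flag into a coprocessor hook):

    // Sketch: opt in to attaching WAL extended attributes to sink-side Mutations.
    // The key is the HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS constant above.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(ReplicationSink.HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, true);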
@@ -266,10 +274,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
             mutation.setClusterIds(clusterIds);
             mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
               HConstants.EMPTY_BYTE_ARRAY);
-            if (attributeList != null) {
-              for (WALProtos.Attribute attribute : attributeList) {
-                mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
-              }
+            if (this.conf.getBoolean(HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, false)) {
+              attachWALExtendedAttributesToMutation(mutation, attributeList);
             }
             addToHashMultiMap(rowMap, table, clusterIds, mutation);
           }
@@ -321,6 +327,15 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
     }
   }
 
+  private static void attachWALExtendedAttributesToMutation(Mutation mutation,
+    List<WALProtos.Attribute> attributeList) {
+    if (attributeList != null) {
+      for (WALProtos.Attribute attribute : attributeList) {
+        mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
+      }
+    }
+  }
+
   /*
    * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not.
    * If false, then ignore this cell. If set to true, de-serialize value into
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
index 021d4235f114..eb723451f641 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -125,6 +126,7 @@ public static void setUpBeforeClass() throws Exception {
     conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
     conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
       TestCoprocessorForWALAnnotationAtSink.class.getName());
+    conf2.setBoolean(ReplicationSink.HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, true);
 
     utility2 = new HBaseTestingUtil(conf2);
     utility2.setZkCluster(miniZK);

From de0228fd99258605be0da9e26b738b04327ba3da Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Fri, 16 Dec 2022 16:49:54 -0800
Subject: [PATCH 3/7] minor change

---
 .../hadoop/hbase/replication/regionserver/ReplicationSink.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index e18379365489..72e5611bc004 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -227,7 +227,6 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
         Cell previousCell = null;
         Mutation mutation = null;
         int count = entry.getAssociatedCellCount();
-        List<WALProtos.Attribute> attributeList = entry.getKey().getExtendedAttributesList();
         for (int i = 0; i < count; i++) {
           // Throw index out of bounds if our cell count is off
           if (!cells.advance()) {
@@ -275,6 +274,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
             mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
               HConstants.EMPTY_BYTE_ARRAY);
             if (this.conf.getBoolean(HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, false)) {
+              List<WALProtos.Attribute> attributeList =
+                entry.getKey().getExtendedAttributesList();
               attachWALExtendedAttributesToMutation(mutation, attributeList);
             }
             addToHashMultiMap(rowMap, table, clusterIds, mutation);

From 8699e507a2da5c0abfd05971b2c24e1b7c549610 Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Wed, 4 Jan 2023 16:46:05 -0800
Subject: [PATCH 4/7] addressing review

---
 .../coprocessor/RegionServerObserver.java     | 17 ++++++++
 .../RegionServerCoprocessorHost.java          | 13 +++++++
 .../ReplicationSinkServiceImpl.java           |  8 +++-
 .../regionserver/ReplicationSink.java         | 26 ++-----------
 ...tReplicationWithWALExtendedAttributes.java | 39 ++++++++++++++++++-
 .../regionserver/TestReplicationSink.java     | 30 +++++++-------
 .../regionserver/TestWALEntrySinkFilter.java  |  2 +-
 7 files changed, 94 insertions(+), 41 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index dc37ac324eb5..16caf7873229 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -19,10 +19,13 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
 /**
  * Defines coprocessor hooks for interacting with operations on the
  * {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
@@ -137,4 +140,18 @@ default void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
     throws IOException {
   }
+
+  /**
+   * This will be called before replication sink mutations are executed on the sink table as part
+   * of a batch call.
+   * @param ctx      the environment to interact with the framework and region server.
+   * @param walEntry WAL entry from which the mutation is formed.
+   * @param mutation mutation to be applied at the sink cluster.
+   * @throws IOException if something goes wrong.
+   */
+  default void preReplicationSinkBatchMutate(
+    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+    Mutation mutation) throws IOException {
+
+  }
 }
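To make the new hook concrete: a sink-side observer can copy each WAL extended attribute onto the corresponding Mutation, which is the pattern the updated test in this patch exercises. A minimal sketch under that assumption (the class name is hypothetical; the types are the same ones this patch imports):

    // Hypothetical observer: mirrors WAL extended attributes onto sink Mutations so
    // that sink-side region coprocessors can read source-side WAL annotations.
    public final class WALAttributeCopyingObserver
      implements RegionServerCoprocessor, RegionServerObserver {

      @Override
      public Optional<RegionServerObserver> getRegionServerObserver() {
        return Optional.of(this);
      }

      @Override
      public void preReplicationSinkBatchMutate(
        ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
        Mutation mutation) throws IOException {
        // Copy every extended attribute carried by the WAL entry key onto the Mutation.
        for (WALProtos.Attribute attribute : walEntry.getKey().getExtendedAttributesList()) {
          mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
        }
      }
    }

Such an observer would be registered on the sink via hbase.coprocessor.regionserver.classes (CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY), as the test changes in this patch do.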
+ */ + default void preReplicationSinkBatchMutate( + ObserverContext ctx, AdminProtos.WALEntry walEntry, + Mutation mutation) throws IOException { + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 1100addb0c70..4366c61865e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.SharedConnection; import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -40,6 +41,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; + @InterfaceAudience.Private public class RegionServerCoprocessorHost extends CoprocessorHost { @@ -166,6 +169,16 @@ public void call(RegionServerObserver observer) throws IOException { }); } + public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) + throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { + @Override + public void call(RegionServerObserver observer) throws IOException { + observer.preReplicationSinkBatchMutate(this, walEntry, mutation); + } + }); + } + public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) throws IOException { if (this.coprocEnvironments.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java index 7021bd27cfe7..84818fd7d87f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink; @@ -57,8 +59,12 @@ public class ReplicationSinkServiceImpl implements ReplicationSinkService { public void replicateLogEntries(List entries, CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException { + RegionServerCoprocessorHost rsServerHost = null; + if (server instanceof HRegionServer) { + rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + } this.replicationSink.replicateEntries(entries, cells, replicationClusterId, - sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath); + sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, rsServerHost); } @Override diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 72e5611bc004..93da19095698 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; @@ -107,14 +108,6 @@ public class ReplicationSink { private SourceFSConfigurationProvider provider; private WALEntrySinkFilter walEntrySinkFilter; - /** - * If enabled at sink cluster site config, extended WAL attributes would be attached as Mutation - * attributes. This is useful for source cluster coproc to provide coproc specific metadata as WAL - * annotations and have them attached back to Mutations generated from WAL entries at sink side. - */ - public static final String HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS = - "hbase.replication.sink.attributes.wal.to.mutations"; - /** * Row size threshold for multi requests above which a warning is logged */ @@ -195,7 +188,7 @@ private void decorateConf() { */ public void replicateEntries(List entries, final CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, - String sourceHFileArchiveDirPath) throws IOException { + String sourceHFileArchiveDirPath, RegionServerCoprocessorHost rsServerHost) throws IOException { if (entries.isEmpty()) { return; } @@ -273,10 +266,8 @@ public void replicateEntries(List entries, final CellScanner cells, mutation.setClusterIds(clusterIds); mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, HConstants.EMPTY_BYTE_ARRAY); - if (this.conf.getBoolean(HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, false)) { - List attributeList = - entry.getKey().getExtendedAttributesList(); - attachWALExtendedAttributesToMutation(mutation, attributeList); + if (rsServerHost != null) { + rsServerHost.preReplicationSinkBatchMutate(entry, mutation); } addToHashMultiMap(rowMap, table, clusterIds, mutation); } @@ -328,15 +319,6 @@ public void replicateEntries(List entries, final CellScanner cells, } } - private static void attachWALExtendedAttributesToMutation(Mutation mutation, - List attributeList) { - if (attributeList != null) { - for (WALProtos.Attribute attribute : attributeList) { - mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray()); - } - } - } - /* * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. * If false, then ignore this cell. 
If set to true, de-serialize value into diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java index eb723451f641..85af290ffe63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -46,8 +47,10 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -64,6 +67,9 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + @Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationWithWALExtendedAttributes { @@ -126,7 +132,8 @@ public static void setUpBeforeClass() throws Exception { conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, TestCoprocessorForWALAnnotationAtSink.class.getName()); - conf2.setBoolean(ReplicationSink.HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, true); + conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + TestReplicationSinkRegionServerEndpoint.class.getName()); utility2 = new HBaseTestingUtil(conf2); utility2.setZkCluster(miniZK); @@ -257,4 +264,32 @@ public void preBatchMutate(ObserverContext c, throw new IOException("Failed to retrieve WAL annotations.."); } } + + public static final class TestReplicationSinkRegionServerEndpoint + implements RegionServerCoprocessor, RegionServerObserver { + + @Override + public Optional getRegionServerObserver() { + return Optional.of(this); + } + + @Override + public void preReplicationSinkBatchMutate( + ObserverContext ctx, AdminProtos.WALEntry walEntry, + Mutation mutation) throws IOException { + RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation); + List attributeList = walEntry.getKey().getExtendedAttributesList(); + attachWALExtendedAttributesToMutation(mutation, attributeList); + } + + private void attachWALExtendedAttributesToMutation(Mutation mutation, + List attributeList) { + if (attributeList != null) { + for (WALProtos.Attribute attribute : attributeList) { + mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray()); + } + } + } + } + } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index 07e2d7ae819a..cdfe897e3b9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -167,7 +167,7 @@ public void testBatchSink() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); @@ -184,7 +184,7 @@ public void testMixedPutDelete() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, - baseNamespaceDir, hfileArchiveDir); + baseNamespaceDir, hfileArchiveDir, null); entries = new ArrayList<>(BATCH_SIZE); cells = new ArrayList<>(); @@ -194,7 +194,7 @@ public void testMixedPutDelete() throws Exception { } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length); @@ -208,7 +208,7 @@ public void testLargeEditsPutDelete() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, - baseNamespaceDir, hfileArchiveDir); + baseNamespaceDir, hfileArchiveDir, null); ResultScanner resultScanner = table1.getScanner(new Scan()); int totalRows = 0; @@ -224,7 +224,7 @@ public void testLargeEditsPutDelete() throws Exception { i % 2 != 0 ? 
KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, - baseNamespaceDir, hfileArchiveDir); + baseNamespaceDir, hfileArchiveDir, null); resultScanner = table1.getScanner(new Scan()); totalRows = 0; while (resultScanner.next() != null) { @@ -245,7 +245,7 @@ public void testMixedPutTables() throws Exception { } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Scan scan = new Scan(); ResultScanner scanRes = table2.getScanner(scan); for (Result res : scanRes) { @@ -268,7 +268,7 @@ public void testMixedDeletes() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); entries = new ArrayList<>(3); cells = new ArrayList<>(); entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); @@ -276,7 +276,7 @@ public void testMixedDeletes() throws Exception { entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); @@ -299,7 +299,7 @@ public void testApplyDeleteBeforePut() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Get get = new Get(Bytes.toBytes(1)); Result res = table1.get(get); assertEquals(0, res.size()); @@ -315,7 +315,7 @@ public void testRethrowRetriesExhaustedException() throws Exception { } try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Assert.fail("Should re-throw TableNotFoundException."); } catch (TableNotFoundException e) { } @@ -329,7 +329,7 @@ public void testRethrowRetriesExhaustedException() throws Exception { admin.disableTable(TABLE_NAME1); try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Assert.fail("Should re-throw RetriesExhaustedWithDetailsException."); } catch (RetriesExhaustedException e) { } finally { @@ -410,7 +410,7 @@ public void testReplicateEntriesForHFiles() throws Exception { } // 7. Replicate the bulk loaded entry SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); try (ResultScanner scanner = table1.getScanner(new Scan())) { // 8. 
Assert data is replicated assertEquals(numRows, scanner.next(numRows).length); @@ -433,7 +433,7 @@ public void testFailedReplicationSinkMetrics() throws IOException { cells.clear(); // cause IndexOutOfBoundsException try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Assert.fail("Should re-throw ArrayIndexOutOfBoundsException."); } catch (ArrayIndexOutOfBoundsException e) { errorCount++; @@ -448,7 +448,7 @@ public void testFailedReplicationSinkMetrics() throws IOException { } try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Assert.fail("Should re-throw TableNotFoundException."); } catch (TableNotFoundException e) { errorCount++; @@ -466,7 +466,7 @@ public void testFailedReplicationSinkMetrics() throws IOException { admin.disableTable(TABLE_NAME1); try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); Assert.fail("Should re-throw IOException."); } catch (IOException e) { errorCount++; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index c7ac87f08219..d23fb5e0fb77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -159,7 +159,7 @@ public boolean advance() throws IOException { } }; // Call our sink. - sink.replicateEntries(entries, cellScanner, null, null, null); + sink.replicateEntries(entries, cellScanner, null, null, null, null); // Check what made it through and what was filtered. assertTrue(FILTERED.get() > 0); assertTrue(UNFILTERED.get() > 0); From 9223fb4de04281fe0204afdfcbee1cffe07edf84 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Wed, 4 Jan 2023 17:02:18 -0800 Subject: [PATCH 5/7] addendum --- .../hadoop/hbase/replication/regionserver/ReplicationSink.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 93da19095698..ca0f492c6b87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -179,11 +179,14 @@ private void decorateConf() { /** * Replicate this array of entries directly into the local cluster using the native client. Only * operates against raw protobuf type saving on a conversion from pb to pojo. + * @param entries WAL entries to be replicated. + * @param cells cell scanner for iteration. 
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that point to the source cluster hfile archive directory
+  * @param rsServerHost               regionserver coprocessor host.
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,

From e8bed4709b40329c9273e8a75b8320bab60129e2 Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Tue, 10 Jan 2023 12:01:44 -0800
Subject: [PATCH 6/7] addendum

---
 .../hbase/coprocessor/RegionServerObserver.java   | 15 +++++++++++++++
 .../regionserver/RegionServerCoprocessorHost.java | 10 ++++++++++
 .../replication/regionserver/ReplicationSink.java | 12 ++++++++++++
 .../TestReplicationWithWALExtendedAttributes.java |  9 +++++++++
 4 files changed, 46 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index 16caf7873229..236667b4be7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -154,4 +154,19 @@ default void preReplicationSinkBatchMutate(
     Mutation mutation) throws IOException {
 
   }
+
+  /**
+   * This will be called after replication sink mutations are executed on the sink table as part
+   * of a batch call.
+   * @param ctx      the environment to interact with the framework and region server.
+   * @param walEntry WAL entry from which the mutation is formed.
+   * @param mutation mutation to be applied at the sink cluster.
+   * @throws IOException if something goes wrong.
+   */
+  default void postReplicationSinkBatchMutate(
+    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+    Mutation mutation) throws IOException {
+
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 4366c61865e6..af1e923760d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -179,6 +179,16 @@ public void call(RegionServerObserver observer) throws IOException {
     });
   }
 
+  public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
+    throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
+      @Override
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
+      }
+    });
+  }
+
   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
     throws IOException {
     if (this.coprocEnvironments.isEmpty()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index ca0f492c6b87..c15c6208f5d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -204,6 +204,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
     Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
     Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
+    Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
+      new Pair<>(new ArrayList<>(), new ArrayList<>());
     for (WALEntry entry : entries) {
       TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
       if (this.walEntrySinkFilter != null) {
@@ -271,6 +273,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
               HConstants.EMPTY_BYTE_ARRAY);
             if (rsServerHost != null) {
               rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
+              mutationsToWalEntriesPairs.getFirst().add(mutation);
+              mutationsToWalEntriesPairs.getSecond().add(entry);
             }
             addToHashMultiMap(rowMap, table, clusterIds, mutation);
           }
@@ -294,6 +298,14 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
       LOG.debug("Finished replicating mutations.");
     }
 
+    if (rsServerHost != null) {
+      List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
+      List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
+      for (int i = 0; i < mutations.size(); i++) {
+        rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
+      }
+    }
+
     if (bulkLoadsPerClusters != null) {
       for (Entry<List<String>,
         Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
index 85af290ffe63..a41d47df64d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
@@ -282,6 +282,15 @@ public void preReplicationSinkBatchMutate(
       attachWALExtendedAttributesToMutation(mutation, attributeList);
     }
 
+    @Override
+    public void postReplicationSinkBatchMutate(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
+      Mutation mutation) throws IOException {
+      RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation);
+      LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList());
+      LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
+    }
+
     private void attachWALExtendedAttributesToMutation(Mutation mutation,
       List<WALProtos.Attribute> attributeList) {
       if (attributeList != null) {

From 40d86db1bcdfb6a741f26c645cecfbaf280ee7a8 Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Fri, 13 Jan 2023 15:17:41 -0800
Subject: [PATCH 7/7] addendum

---
 .../ReplicationSinkServiceImpl.java           | 12 +++----
 .../regionserver/ReplicationSink.java         |  9 +++--
 .../regionserver/TestReplicationSink.java     | 35 ++++++++++---------
 .../regionserver/TestWALEntrySinkFilter.java  |  4 +--
 4 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
index 84818fd7d87f..c8141b683406 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
@@ -59,12 +59,8 @@ public class ReplicationSinkServiceImpl implements ReplicationSinkService {
   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
     String replicationClusterId, String sourceBaseNamespaceDirPath,
     String sourceHFileArchiveDirPath) throws IOException {
-    RegionServerCoprocessorHost rsServerHost = null;
-    if (server instanceof HRegionServer) {
-      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
-    }
     this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
-      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, rsServerHost);
+      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
   }
 
   @Override
@@ -78,7 +74,11 @@ public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir
   @Override
   public void startReplicationService() throws IOException {
-    this.replicationSink = new ReplicationSink(this.conf);
+    RegionServerCoprocessorHost rsServerHost = null;
+    if (server instanceof HRegionServer) {
+      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+    }
+    this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
     this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
       "ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index c15c6208f5d2..8610a6d43bd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -114,13 +114,17 @@ public class ReplicationSink {
   private final int rowSizeWarnThreshold;
   private boolean replicationSinkTrackerEnabled;
 
+  private final RegionServerCoprocessorHost rsServerHost;
+
   /**
    * Create a sink for replication
    * @param conf conf object
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf) throws IOException {
+  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
+    throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    this.rsServerHost = rsServerHost;
     rowSizeWarnThreshold =
       conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
@@ -186,12 +190,11 @@ private void decorateConf() {
    * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
    *                                   directory
    * @param sourceHFileArchiveDirPath  Path that point to the source cluster hfile archive directory
-   * @param rsServerHost               regionserver coprocessor host.
* @throws IOException If failed to replicate the data */ public void replicateEntries(List entries, final CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, - String sourceHFileArchiveDirPath, RegionServerCoprocessorHost rsServerHost) throws IOException { + String sourceHFileArchiveDirPath) throws IOException { if (entries.isEmpty()) { return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index cdfe897e3b9a..dc634632c946 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -129,7 +130,9 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName()); TEST_UTIL.startMiniCluster(3); - SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration())); + RegionServerCoprocessorHost rsCpHost = + TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost(); + SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); @@ -167,7 +170,7 @@ public void testBatchSink() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); @@ -184,7 +187,7 @@ public void testMixedPutDelete() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, - baseNamespaceDir, hfileArchiveDir, null); + baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<>(BATCH_SIZE); cells = new ArrayList<>(); @@ -194,7 +197,7 @@ public void testMixedPutDelete() throws Exception { } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length); @@ -208,7 +211,7 @@ public void testLargeEditsPutDelete() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, - baseNamespaceDir, hfileArchiveDir, null); + 
baseNamespaceDir, hfileArchiveDir); ResultScanner resultScanner = table1.getScanner(new Scan()); int totalRows = 0; @@ -224,7 +227,7 @@ public void testLargeEditsPutDelete() throws Exception { i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, - baseNamespaceDir, hfileArchiveDir, null); + baseNamespaceDir, hfileArchiveDir); resultScanner = table1.getScanner(new Scan()); totalRows = 0; while (resultScanner.next() != null) { @@ -245,7 +248,7 @@ public void testMixedPutTables() throws Exception { } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table2.getScanner(scan); for (Result res : scanRes) { @@ -268,7 +271,7 @@ public void testMixedDeletes() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<>(3); cells = new ArrayList<>(); entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); @@ -276,7 +279,7 @@ public void testMixedDeletes() throws Exception { entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); @@ -299,7 +302,7 @@ public void testApplyDeleteBeforePut() throws Exception { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Get get = new Get(Bytes.toBytes(1)); Result res = table1.get(get); assertEquals(0, res.size()); @@ -315,7 +318,7 @@ public void testRethrowRetriesExhaustedException() throws Exception { } try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Assert.fail("Should re-throw TableNotFoundException."); } catch (TableNotFoundException e) { } @@ -329,7 +332,7 @@ public void testRethrowRetriesExhaustedException() throws Exception { admin.disableTable(TABLE_NAME1); try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Assert.fail("Should re-throw RetriesExhaustedWithDetailsException."); } catch (RetriesExhaustedException e) { } finally { @@ -410,7 +413,7 @@ public void testReplicateEntriesForHFiles() throws Exception { } // 7. Replicate the bulk loaded entry SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); try (ResultScanner scanner = table1.getScanner(new Scan())) { // 8. 
Assert data is replicated assertEquals(numRows, scanner.next(numRows).length); @@ -433,7 +436,7 @@ public void testFailedReplicationSinkMetrics() throws IOException { cells.clear(); // cause IndexOutOfBoundsException try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Assert.fail("Should re-throw ArrayIndexOutOfBoundsException."); } catch (ArrayIndexOutOfBoundsException e) { errorCount++; @@ -448,7 +451,7 @@ public void testFailedReplicationSinkMetrics() throws IOException { } try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Assert.fail("Should re-throw TableNotFoundException."); } catch (TableNotFoundException e) { errorCount++; @@ -466,7 +469,7 @@ public void testFailedReplicationSinkMetrics() throws IOException { admin.disableTable(TABLE_NAME1); try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, null); + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Assert.fail("Should re-throw IOException."); } catch (IOException e) { errorCount++; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index d23fb5e0fb77..d66aef492ffe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -113,7 +113,7 @@ public void testWALEntryFilter() throws IOException { IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL, DevNullAsyncClusterConnection.class, AsyncClusterConnection.class); - ReplicationSink sink = new ReplicationSink(conf); + ReplicationSink sink = new ReplicationSink(conf, null); // Create some dumb walentries. List entries = new ArrayList<>(); AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); @@ -159,7 +159,7 @@ public boolean advance() throws IOException { } }; // Call our sink. - sink.replicateEntries(entries, cellScanner, null, null, null, null); + sink.replicateEntries(entries, cellScanner, null, null, null); // Check what made it through and what was filtered. assertTrue(FILTERED.get() > 0); assertTrue(UNFILTERED.get() > 0);
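Taken together, the end state of the series: the config flag from PATCH 2/7 is gone, the pre/post replication-sink hooks fire inside ReplicationSink.replicateEntries(), and the coprocessor host is injected once at construction rather than per call. A condensed sketch of the final wiring, mirroring ReplicationSinkServiceImpl above (variable scoping simplified, not a drop-in):

    // Resolve the regionserver coprocessor host once, at service start.
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    ReplicationSink sink = new ReplicationSink(conf, rsServerHost);

    // replicateEntries() keeps its original five-argument signature; the
    // pre/postReplicationSinkBatchMutate hooks run when rsServerHost is non-null.
    sink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);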