From f9c77341e9ed28abb96a22d0d5af26aa77dff636 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 15 Jan 2020 11:48:29 +0000 Subject: [PATCH 1/3] HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (#1027) Signed-off-by: Bharath Vissapragada Signed-off-by: Josh Elser Signed-off-by: binlijin --- .../HBaseInterClusterReplicationEndpoint.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index db543382d799..86826f3c0962 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -114,6 +114,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean dropOnDeletedTables; private boolean isSerial = false; + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different + * Connection implementations, or initialize it in a different way, so defining createConnection + * as protected for possible overridings. + */ + protected Connection createConnection(Configuration conf) throws IOException { + return ConnectionFactory.createConnection(conf); + } + + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different + * ReplicationSinkManager implementations, or initialize it in a different way, + * so defining createReplicationSinkManager as protected for possible overridings. + */ + protected ReplicationSinkManager createReplicationSinkManager(Connection conn) { + return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(), + this, this.conf); + } + @Override public void init(Context context) throws IOException { super.init(context); @@ -133,12 +152,12 @@ public void init(Context context) throws IOException { // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. - this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf); + this.conn = (ClusterConnection) createConnection(this.conf); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); // ReplicationQueueInfo parses the peerId out of the znode for us - this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf); + this.replicationSinkMgr = createReplicationSinkManager(conn); // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); From 0868dfd10bf5e3727fd0bdb41c21fb3d64fc1b60 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 20 Jan 2020 14:08:02 +0000 Subject: [PATCH 2/3] Addressing last round of reviews --- .../HBaseInterClusterReplicationEndpoint.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 86826f3c0962..a919fda31db1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,17 +116,17 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean isSerial = false; /* - * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different - * Connection implementations, or initialize it in a different way, so defining createConnection - * as protected for possible overridings. + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating + * different Connection implementations, or initialize it in a different way, + * so defining createConnection as protected for possible overridings. */ protected Connection createConnection(Configuration conf) throws IOException { return ConnectionFactory.createConnection(conf); } /* - * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different - * ReplicationSinkManager implementations, or initialize it in a different way, + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating + * different ReplicationSinkManager implementations, or initialize it in a different way, * so defining createReplicationSinkManager as protected for possible overridings. */ protected ReplicationSinkManager createReplicationSinkManager(Connection conn) { @@ -152,7 +153,11 @@ public void init(Context context) throws IOException { // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. - this.conn = (ClusterConnection) createConnection(this.conf); + Connection connection = createConnection(this.conf); + //Since createConnection method may be overridden by extending classes, we need to make sure + //it's indeed returning a ClusterConnection instance. + Preconditions.checkState(connection instanceof ClusterConnection); + this.conn = (ClusterConnection) connection; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); From fea133b6b9ed1163e0949c730056e2209492a281 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 21 Jan 2020 14:46:52 +0000 Subject: [PATCH 3/3] changed import order --- .../regionserver/HBaseInterClusterReplicationEndpoint.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index a919fda31db1..83918c93735b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -59,14 +59,15 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; /**