Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean isSerial = false;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: instantiating..

* Connection implementations, or initialize it in a different way, so defining createConnection
* as protected for possible overridings.
*/
protected Connection createConnection(Configuration conf) throws IOException {
return ConnectionFactory.createConnection(conf);
}

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* ReplicationSinkManager implementations, or initialize it in a different way,
* so defining createReplicationSinkManager as protected for possible overridings.
*/
protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: redundant cast to conn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compilation fails without this cast, as ReplicationSinkManager expects ClusterConnection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I meant pass ClusterConnection directly, but I guess that brings us back to your other comment as to why you want to use generic "Connection" object.

this, this.conf);
}

@Override
public void init(Context context) throws IOException {
super.init(context);
Expand All @@ -133,12 +152,12 @@ public void init(Context context) throws IOException {
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
this.conn = (ClusterConnection) createConnection(this.conf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need force createConnection() to return a connection of type ClusterConnection? Otherwise, theoretically overrides can return any type of Connection (may or may not be ClusterConnection) and the type cast here fails. Just a thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that kind of restrict the extensibility, which is what we are trying to improve here, but addressing that would require an extra refactoring effort, just as @joshelser mentioned on the master PR. For example, in master branch, we are forced to return AsyncClusterConnection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that kind of restrict the extensibility, which is what we are trying to improve here,

Hmm.. May be I'm missing something, what does your use case look like here where you have an override that does a generic Connection rather than (some type of) ClusterConnection? I'm curious how it restricts extensibility because to use this, it has to be a ClusterConnection, otherwise it fails at runtime, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does your use case look like here where you have an override that does a generic Connection rather than (some type of) ClusterConnection?

I have a HBaseInterClusterReplicationEndpoint extension where I'm still using ConnectionFactory for creating connections, but I use different factory method than what is used by HBaseInterClusterReplicationEndpoint. I don't really care about which connection implementation the factory is returning, I just rely on whatever is the default (in 2.x, it's ClusterConnection). I noticed, though, that default connection type changed between 2.x and 3.0, from ClusterConnection to AsycnClusterConnection. So, if we keep create methods referencing generic Connection only, I won't need to change my HBaseInterClusterReplicationEndpoint extension if I upgrade to hbase 3.x (provided, of course, that HBaseInterClusterReplicationEndpoint.conn variable also changes to same default returned by ConnectionFactory).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see.. I think I get the problem. Basically you are just relying on ConnectionFactory and not worried about the type it is returning. Ya, refactoring it will be a bigger patch.

nit: How about adding a preconditions check.. Preconditions.checkState(returnedConn instanceof ClusterConnection) and back it up with a comment why it could fail?

this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
Expand Down