1818
1919package org .apache .hadoop .hbase .replication ;
2020
21+ import static org .apache .hadoop .hbase .HConstants .DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT ;
22+ import static org .apache .hadoop .hbase .HConstants .HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY ;
23+
2124import java .io .IOException ;
2225import java .util .ArrayList ;
2326import java .util .Collections ;
2831
2932import org .apache .hadoop .conf .Configuration ;
3033import org .apache .hadoop .fs .Path ;
34+ import org .apache .hadoop .hbase .Abortable ;
3135import org .apache .hadoop .hbase .HBaseConfiguration ;
36+ import org .apache .hadoop .hbase .ChoreService ;
3237import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
3338import org .apache .hadoop .hbase .client .AsyncRegionServerAdmin ;
3439import org .apache .hadoop .hbase .client .AsyncReplicationServerAdmin ;
3540import org .apache .hadoop .hbase .client .ClusterConnectionFactory ;
3641import org .apache .hadoop .hbase .protobuf .ReplicationProtobufUtil ;
42+ import org .apache .hadoop .hbase .ScheduledChore ;
43+ import org .apache .hadoop .hbase .Server ;
44+ import org .apache .hadoop .hbase .ServerName ;
3745import org .apache .hadoop .hbase .security .User ;
46+ import org .apache .hadoop .hbase .security .UserProvider ;
47+ import org .apache .hadoop .hbase .util .FutureUtils ;
3848import org .apache .hadoop .hbase .wal .WAL ;
39- import org .apache .hadoop .hbase .zookeeper .ZKListener ;
40- import org .apache .yetus .audience .InterfaceAudience ;
41- import org .apache .hadoop .hbase .Abortable ;
42- import org .apache .hadoop .hbase .ServerName ;
4349import org .apache .hadoop .hbase .zookeeper .ZKClusterId ;
50+ import org .apache .hadoop .hbase .zookeeper .ZKListener ;
4451import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
4552import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
53+ import org .apache .yetus .audience .InterfaceAudience ;
4654import org .apache .zookeeper .KeeperException ;
4755import org .apache .zookeeper .KeeperException .AuthFailedException ;
4856import org .apache .zookeeper .KeeperException .ConnectionLossException ;
5260
5361import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
5462import org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
63+ import org .apache .hbase .thirdparty .com .google .protobuf .ServiceException ;
64+
65+ import org .apache .hadoop .hbase .shaded .protobuf .ProtobufUtil ;
66+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .ListReplicationSinkServersRequest ;
67+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .ListReplicationSinkServersResponse ;
68+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .MasterService ;
5569
5670/**
5771 * A {@link BaseReplicationEndpoint} for replication endpoints whose
@@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
6377
6478 private static final Logger LOG = LoggerFactory .getLogger (HBaseReplicationEndpoint .class );
6579
80+ public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
81+ "hbase.replication.fetch.servers.usezk" ;
82+
83+ public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
84+ "hbase.replication.fetch.servers.interval" ;
85+ public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000 ; // 10 mins
86+
6687 private ZKWatcher zkw = null ;
6788 private final Object zkwLock = new Object ();
6889
@@ -94,6 +115,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
94115
95116 private List <ServerName > sinkServers = new ArrayList <>(0 );
96117
118+ private AsyncClusterConnection peerConnection ;
119+ private boolean fetchServersUseZk = false ;
120+ private FetchServersChore fetchServersChore ;
121+ private int shortOperationTimeout ;
122+
97123 /*
98124 * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
99125 * Connection implementations, or initialize it in a different way, so defining createConnection
@@ -129,6 +155,19 @@ protected void disconnect() {
129155 LOG .warn ("{} Failed to close the connection" , ctx .getPeerId ());
130156 }
131157 }
158+ if (fetchServersChore != null ) {
159+ ChoreService choreService = ctx .getServer ().getChoreService ();
160+ if (null != choreService ) {
161+ choreService .cancelChore (fetchServersChore );
162+ }
163+ }
164+ if (peerConnection != null ) {
165+ try {
166+ peerConnection .close ();
167+ } catch (IOException e ) {
168+ LOG .warn ("Attempt to close peerConnection failed." , e );
169+ }
170+ }
132171 }
133172
134173 /**
@@ -159,8 +198,27 @@ public void stop() {
159198 }
160199
161200 @ Override
162- protected void doStart () {
201+ protected synchronized void doStart () {
202+ this .shortOperationTimeout = ctx .getLocalConfiguration ().getInt (
203+ HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY , DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT );
163204 try {
205+ if (ctx .getLocalConfiguration ().getBoolean (FETCH_SERVERS_USE_ZK_CONF_KEY , false )) {
206+ fetchServersUseZk = true ;
207+ } else {
208+ try {
209+ if (ReplicationUtils .isPeerClusterSupportReplicationOffload (getPeerConnection ())) {
210+ fetchServersChore = new FetchServersChore (ctx .getServer (), this );
211+ ctx .getServer ().getChoreService ().scheduleChore (fetchServersChore );
212+ fetchServersUseZk = false ;
213+ } else {
214+ fetchServersUseZk = true ;
215+ }
216+ } catch (Throwable t ) {
217+ fetchServersUseZk = true ;
218+ LOG .warn ("Peer {} try to fetch servers by admin failed. Using zk impl." ,
219+ ctx .getPeerId (), t );
220+ }
221+ }
164222 reloadZkWatcher ();
165223 connectPeerCluster ();
166224 notifyStarted ();
@@ -203,7 +261,9 @@ private void reloadZkWatcher() throws IOException {
203261 }
204262 zkw = new ZKWatcher (ctx .getConfiguration (),
205263 "connection to cluster: " + ctx .getPeerId (), this );
206- zkw .registerListener (new PeerRegionServerListener (this ));
264+ if (fetchServersUseZk ) {
265+ zkw .registerListener (new PeerRegionServerListener (this ));
266+ }
207267 }
208268 }
209269
@@ -228,12 +288,47 @@ public boolean isAborted() {
228288 return false ;
229289 }
230290
291+ /**
292+ * Get the connection to peer cluster
293+ * @return connection to peer cluster
294+ * @throws IOException If anything goes wrong connecting
295+ */
296+ private synchronized AsyncClusterConnection getPeerConnection () throws IOException {
297+ if (peerConnection == null ) {
298+ Configuration conf = ctx .getConfiguration ();
299+ peerConnection = ClusterConnectionFactory .createAsyncClusterConnection (conf , null ,
300+ UserProvider .instantiate (conf ).getCurrent ());
301+ }
302+ return peerConnection ;
303+ }
304+
305+ /**
306+ * Get the list of all the servers that are responsible for replication sink
307+ * from the specified peer master
308+ * @return list of server addresses or an empty list if the slave is unavailable
309+ */
310+ protected List <ServerName > fetchSlavesAddresses () {
311+ try {
312+ AsyncClusterConnection peerConn = getPeerConnection ();
313+ ServerName master = FutureUtils .get (peerConn .getAdmin ().getMaster ());
314+ MasterService .BlockingInterface masterStub = MasterService .newBlockingStub (
315+ peerConn .getRpcClient ()
316+ .createBlockingRpcChannel (master , User .getCurrent (), shortOperationTimeout ));
317+ ListReplicationSinkServersResponse resp = masterStub
318+ .listReplicationSinkServers (null , ListReplicationSinkServersRequest .newBuilder ().build ());
319+ return ProtobufUtil .toServerNameList (resp .getServerNameList ());
320+ } catch (ServiceException | IOException e ) {
321+ LOG .error ("Peer {} fetches servers failed" , ctx .getPeerId (), e );
322+ }
323+ return Collections .emptyList ();
324+ }
325+
231326 /**
232327 * Get the list of all the region servers from the specified peer
233328 *
234329 * @return list of region server addresses or an empty list if the slave is unavailable
235330 */
236- protected List <ServerName > fetchSlavesAddresses () {
331+ protected List <ServerName > fetchSlavesAddressesByZK () {
237332 List <String > children = null ;
238333 try {
239334 synchronized (zkwLock ) {
@@ -256,7 +351,12 @@ protected List<ServerName> fetchSlavesAddresses() {
256351 }
257352
258353 protected synchronized void chooseSinks () {
259- List <ServerName > slaveAddresses = fetchSlavesAddresses ();
354+ List <ServerName > slaveAddresses = Collections .emptyList ();
355+ if (fetchServersUseZk ) {
356+ slaveAddresses = fetchSlavesAddressesByZK ();
357+ } else {
358+ slaveAddresses = fetchSlavesAddresses ();
359+ }
260360 if (slaveAddresses .isEmpty ()) {
261361 LOG .warn ("No sinks available at peer. Will not be able to replicate" );
262362 }
@@ -287,6 +387,14 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
287387 return createSinkPeer (serverName );
288388 }
289389
390+ private SinkPeer createSinkPeer (ServerName serverName ) throws IOException {
391+ if (ReplicationUtils .isPeerClusterSupportReplicationOffload (conn )) {
392+ return new ReplicationServerSinkPeer (serverName , conn .getReplicationServerAdmin (serverName ));
393+ } else {
394+ return new RegionServerSinkPeer (serverName , conn .getRegionServerAdmin (serverName ));
395+ }
396+ }
397+
290398 /**
291399 * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
292400 * failed). If a single SinkPeer is reported as bad more than
@@ -396,11 +504,23 @@ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
396504 }
397505 }
398506
399- private SinkPeer createSinkPeer (ServerName serverName ) throws IOException {
400- if (ReplicationUtils .isPeerClusterSupportReplicationOffload (conn )) {
401- return new ReplicationServerSinkPeer (serverName , conn .getReplicationServerAdmin (serverName ));
402- } else {
403- return new RegionServerSinkPeer (serverName , conn .getRegionServerAdmin (serverName ));
507+ /**
508+ * Chore that will fetch the list of servers from peer master.
509+ */
510+ public static class FetchServersChore extends ScheduledChore {
511+
512+ private HBaseReplicationEndpoint endpoint ;
513+
514+ public FetchServersChore (Server server , HBaseReplicationEndpoint endpoint ) {
515+ super ("Peer-" + endpoint .ctx .getPeerId () + "-FetchServersChore" , server ,
516+ server .getConfiguration ()
517+ .getInt (FETCH_SERVERS_INTERVAL_CONF_KEY , DEFAULT_FETCH_SERVERS_INTERVAL ));
518+ this .endpoint = endpoint ;
519+ }
520+
521+ @ Override
522+ protected void chore () {
523+ endpoint .chooseSinks ();
404524 }
405525 }
406526}
0 commit comments