Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
}

/**
* Get a list of all the addresses of all the region servers
* for this peer cluster
* Get a list of all the addresses of all the available region servers
* for this peer cluster, or an empty list if no region servers available at peer cluster.
* @return list of addresses
*/
// Synchronize peer cluster connection attempts to avoid races and rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean dropOnDeletedColumnFamilies;
private boolean isSerial = false;
//Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
Expand Down Expand Up @@ -513,8 +515,14 @@ public boolean replicate(ReplicateContext replicateContext) {

int numSinks = replicationSinkMgr.getNumSinks();
if (numSinks == 0) {
LOG.warn("{} No replication sinks found, returning without replicating. "
+ "The source should retry with the same set of edits.", logPeerId());
if((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier*1000)) {
LOG.warn(
"No replication sinks found, returning without replicating. "
+ "The source should retry with the same set of edits. Not logging this again for "
+ "the next {} seconds.", maxRetriesMultiplier);
lastSinkFetchTime = System.currentTimeMillis();
}
sleepForRetries("No sinks available at peer", sleepMultiplier);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
*/
public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers();
if(slaveAddresses.isEmpty()){
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
Expand Down