Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean dropOnDeletedColumnFamilies;
private boolean isSerial = false;
private long lastSinkFetchTime;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JVM will initialize this to zero which is of consequence to the log message (will make sure that you get the LOG.warn the first time).

Explicitly initialize it here so with a comment so we know that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
Expand Down Expand Up @@ -513,8 +514,14 @@ public boolean replicate(ReplicateContext replicateContext) {

int numSinks = replicationSinkMgr.getNumSinks();
if (numSinks == 0) {
LOG.warn("{} No replication sinks found, returning without replicating. "
+ "The source should retry with the same set of edits.", logPeerId());
if((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier*1000)) {
LOG.warn(
"No replication sinks found, returning without replicating. "
+ "The source should retry with the same set of edits. Not logging this again for "
+ "the next " + maxRetriesMultiplier + " seconds.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, parameterized logging

lastSinkFetchTime = System.currentTimeMillis();
}
sleepForRetries("No sinks available at peer", sleepMultiplier);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -150,9 +151,14 @@ public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
*/
public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers();
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
if(slaveAddresses==null){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like HBaseReplicationEndpoint ever returns null. Guarding against custom endpoint implementations?

We should expose getRegionServers on a base-class or interface and explicitly say that we expect a non-null answer. Follow-on..

If easy, this would be good to write a quick unit test to cover this method.

Copy link
Contributor Author

@wchevreuil wchevreuil Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like HBaseReplicationEndpoint ever returns null. Guarding against custom endpoint implementations?

Yeah, that's indeed the case.

We should expose getRegionServers on a base-class or interface and explicitly say that we expect a non-null answer. Follow-on..
If easy, this would be good to write a quick unit test to cover this method.

So we can leave getRegionServers at HBaseReplicationEndpoint, emphasize it should never return null. I'm not sure a UT for this would be effective, though. Custom implementations of HBaseReplicationEndpoint returning null would still blow it at runtime.

So maybe we can safely log warning for if(slaveAddresses.size()==0) ?

Yeah, can just leave that.

LOG.warn("No sinks available at peer. Will not be able to replicate");
sinks = new ArrayList<ServerName>();
} else {
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
}
lastUpdateToPeers = System.currentTimeMillis();
badReportCounts.clear();
}
Expand Down