Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,29 +494,31 @@ public void rotateCache(
if (namenodeContexts == null || namenodeContexts.size() <= 1) {
return namenodeContexts;
}
FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
/*
* If the first nn in the cache is active, the active nn priority cannot be lowered.
* This happens when other threads have already updated the cache.
*/
if (firstNamenodeContext.getState().equals(ACTIVE)) {
return namenodeContexts;

// If there is active nn, rotateCache is not needed
// because the router has already loaded the cache.
for (FederationNamenodeContext namenodeContext : namenodeContexts) {
if (namenodeContext.getState() == ACTIVE) {
return namenodeContexts;
}
}
/*
* If the first nn in the cache at this time is not the nn
* that needs to be lowered in priority, there is no need to rotate.
* This happens when other threads have already rotated the cache.
*/
if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
Collections.rotate(rotatedNnContexts, -1);
String firstNamenodeId = namenodeContexts.get(0).getNamenodeId();
LOG.info("Rotate cache of pair <ns: {}, observer first: {}>, put namenode: {} in the " +
"first position of the cache and namenode: {} in the last position of the cache",
nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId());
return rotatedNnContexts;

// If the last namenode in the cache at this time is the namenode.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the namenode.
Should this be expanded to is the namenode whose priority needs to be lowered?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// No need to rotate cache, because other threads have already rotated the cache.
FederationNamenodeContext lastNamenode = namenodeContexts.get(namenodeContexts.size()-1);
if (lastNamenode.getRpcAddress().equals(namenode.getRpcAddress())) {
return namenodeContexts;
}
return namenodeContexts;

// Move the abnormal namenode to the end of the cache,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abnormal -> inaccessible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your advice, done

// to ensure that the current namenode will not be accessed first next time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the current namenode here the same as the abnormal one in the line above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I have modified the comment

List<FederationNamenodeContext> rotateNamenodeContexts =
(List<FederationNamenodeContext>) namenodeContexts;
rotateNamenodeContexts.remove(namenode);
rotateNamenodeContexts.add(namenode);
LOG.info("Rotate cache of pair<{}, {}> -> {}",
nsId, listObserversFirst, rotateNamenodeContexts);
return rotateNamenodeContexts;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,12 @@ private static IOException toIOException(Exception e) {
* @throws NoNamenodesAvailableException Exception that the retry policy
* generates for no available namenodes.
*/
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
final String nsId) throws IOException {
private RetryDecision shouldRetry(
final IOException ioe, final int retryCount, final String nsId,
final FederationNamenodeContext namenode,
final boolean listObserverFirst) throws IOException {
// check for the case of cluster unavailable state
if (isClusterUnAvailable(nsId)) {
if (isClusterUnAvailable(nsId, namenode, listObserverFirst)) {
// we allow to retry once if cluster is unavailable
if (retryCount == 0) {
return RetryDecision.RETRY;
Expand Down Expand Up @@ -538,7 +540,7 @@ public Object invokeMethod(
ProxyAndInfo<?> client = connection.getClient();
final Object proxy = client.getProxy();

ret = invoke(nsId, 0, method, proxy, params);
ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params);
if (failover &&
FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
// Success on alternate server, update
Expand Down Expand Up @@ -594,13 +596,16 @@ public Object invokeMethod(
se.initCause(ioe);
throw se;
} else if (ioe instanceof NoNamenodesAvailableException) {
IOException cause = (IOException) ioe.getCause();
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes(nsId);
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Rotate cache so that client can retry the next namenode in the cache
this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
if (shouldRotateCache(cause)) {
this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
}
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {
Expand Down Expand Up @@ -708,7 +713,9 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) {
* @return Response from the remote server
* @throws IOException If error occurs.
*/
private Object invoke(String nsId, int retryCount, final Method method,
private Object invoke(
String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst,
int retryCount, final Method method,
final Object obj, final Object... params) throws IOException {
try {
return method.invoke(obj, params);
Expand All @@ -721,14 +728,14 @@ private Object invoke(String nsId, int retryCount, final Method method,
IOException ioe = (IOException) cause;

// Check if we should retry.
RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
if (decision == RetryDecision.RETRY) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpRetries();
}

// retry
return invoke(nsId, ++retryCount, method, obj, params);
return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
Expand Down Expand Up @@ -772,13 +779,22 @@ public static boolean isUnavailableException(IOException ioe) {
* Check if the cluster of given nameservice id is available.
*
* @param nsId nameservice ID.
* @param namenode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* @param listObserverFirst
* @return true if the cluster with given nameservice id is available.
* @throws IOException if error occurs.
*/
private boolean isClusterUnAvailable(String nsId) throws IOException {
private boolean isClusterUnAvailable(
String nsId, FederationNamenodeContext namenode,
boolean listObserverFirst) throws IOException {
// Use observer and the namenode that causes the exception is an observer,
// false is returned so that the oberver can be marked as unavailable,so other observers
// or active namenode which is standby in the cache of the router can be retried.
if (listObserverFirst && namenode.getState() == FederationNamenodeServiceState.OBSERVER) {
return false;
}
List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
.getNamenodesForNameserviceId(nsId, false);

.getNamenodesForNameserviceId(nsId, listObserverFirst);
if (nnState != null) {
for (FederationNamenodeContext nnContext : nnState) {
// Once we find one NN is in active state, we assume this
Expand Down Expand Up @@ -1830,4 +1846,24 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
return lastActiveNNRefreshTimes
.computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0));
}

/**
* Determine whether router rotated cache is required when NoNamenodesAvailableException occurs.
*
* @param ioe cause of the NoNamenodesAvailableException.
* @return true if NoNamenodesAvailableException occurs due to
* {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
* otherwise false.
*/
private boolean shouldRotateCache(IOException ioe) {
if (isUnavailableException(ioe)) {
return true;
}
if (ioe instanceof RemoteException) {
RemoteException re = (RemoteException) ioe;
ioe = re.unwrapRemoteException();
ioe = getCleanException(ioe);
}
return isUnavailableException(ioe);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ public class MiniRouterDFSCluster {
/** Mini cluster. */
private MiniDFSCluster cluster;

protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
protected static final long DEFAULT_CACHE_INTERVAL_MS =
public static final long DEFAULT_CACHE_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
/** Heartbeat interval in milliseconds. */
private long heartbeatInterval;
Expand Down Expand Up @@ -253,6 +253,19 @@ public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOExceptio
return DistributedFileSystem.get(observerReadConf);
}

public FileSystem getFileSystemWithConfiguredFailoverProxyProvider() throws IOException {
conf.set(DFS_NAMESERVICES,
conf.get(DFS_NAMESERVICES)+ ",router-service");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
getFileSystemURI().toString());
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + "router-service", ConfiguredFailoverProxyProvider.class.getName());
DistributedFileSystem.setDefaultUri(conf, "hdfs://router-service");

return DistributedFileSystem.get(conf);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getFileSystemWithConfiguredFailoverProxyProvider and getFileSystemWithObserverReadProxyProvider() share the same configuration, beside the proxy provider. You can separate shared config these one into a separate function, then just set the proxy provider.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {

Expand Down
Loading