-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-17232. RBF: Fix NoNamenodesAvailableException for a long time, when use observer. #6208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
6bbd7ed
d94604c
f29fcf9
4535f16
528f9c7
bd1d203
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -457,14 +457,17 @@ private static IOException toIOException(Exception e) { | |
| * @param ioe IOException reported. | ||
| * @param retryCount Number of retries. | ||
| * @param nsId Nameservice ID. | ||
| * @param namenode namenode context. | ||
| * @param listObserverFirst Observer read case, observer NN will be ranked first. | ||
| * @return Retry decision. | ||
| * @throws NoNamenodesAvailableException Exception that the retry policy | ||
| * generates for no available namenodes. | ||
| * @throws IOException An IO Error occurred. | ||
| */ | ||
| private RetryDecision shouldRetry(final IOException ioe, final int retryCount, | ||
| final String nsId) throws IOException { | ||
| private RetryDecision shouldRetry( | ||
| final IOException ioe, final int retryCount, final String nsId, | ||
| final FederationNamenodeContext namenode, | ||
| final boolean listObserverFirst) throws IOException { | ||
| // check for the case of cluster unavailable state | ||
| if (isClusterUnAvailable(nsId)) { | ||
| if (isClusterUnAvailable(nsId, namenode, listObserverFirst)) { | ||
| // we allow to retry once if cluster is unavailable | ||
| if (retryCount == 0) { | ||
| return RetryDecision.RETRY; | ||
|
|
@@ -538,7 +541,7 @@ public Object invokeMethod( | |
| ProxyAndInfo<?> client = connection.getClient(); | ||
| final Object proxy = client.getProxy(); | ||
|
|
||
| ret = invoke(nsId, 0, method, proxy, params); | ||
| ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params); | ||
| if (failover && | ||
| FederationNamenodeServiceState.OBSERVER != namenode.getState()) { | ||
| // Success on alternate server, update | ||
|
|
@@ -594,13 +597,16 @@ public Object invokeMethod( | |
| se.initCause(ioe); | ||
| throw se; | ||
| } else if (ioe instanceof NoNamenodesAvailableException) { | ||
| IOException cause = (IOException) ioe.getCause(); | ||
| if (this.rpcMonitor != null) { | ||
| this.rpcMonitor.proxyOpNoNamenodes(nsId); | ||
| } | ||
| LOG.error("Cannot get available namenode for {} {} error: {}", | ||
| nsId, rpcAddress, ioe.getMessage()); | ||
| // Rotate cache so that client can retry the next namenode in the cache | ||
| this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver); | ||
| if (shouldRotateCache(cause)) { | ||
| this.namenodeResolver.rotateCache(nsId, namenode, useObserver); | ||
| } | ||
| // Throw RetriableException so that client can retry | ||
| throw new RetriableException(ioe); | ||
| } else { | ||
|
|
@@ -708,7 +714,9 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) { | |
| * @return Response from the remote server | ||
| * @throws IOException If error occurs. | ||
| */ | ||
| private Object invoke(String nsId, int retryCount, final Method method, | ||
| private Object invoke( | ||
| String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst, | ||
| int retryCount, final Method method, | ||
| final Object obj, final Object... params) throws IOException { | ||
| try { | ||
| return method.invoke(obj, params); | ||
|
|
@@ -721,14 +729,14 @@ private Object invoke(String nsId, int retryCount, final Method method, | |
| IOException ioe = (IOException) cause; | ||
|
|
||
| // Check if we should retry. | ||
| RetryDecision decision = shouldRetry(ioe, retryCount, nsId); | ||
| RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst); | ||
| if (decision == RetryDecision.RETRY) { | ||
| if (this.rpcMonitor != null) { | ||
| this.rpcMonitor.proxyOpRetries(); | ||
| } | ||
|
|
||
| // retry | ||
| return invoke(nsId, ++retryCount, method, obj, params); | ||
| return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params); | ||
| } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { | ||
| // failover, invoker looks for standby exceptions for failover. | ||
| if (ioe instanceof StandbyException) { | ||
|
|
@@ -772,13 +780,22 @@ public static boolean isUnavailableException(IOException ioe) { | |
| * Check if the cluster of given nameservice id is available. | ||
| * | ||
| * @param nsId nameservice ID. | ||
| * @param namenode namenode context. | ||
| * @param listObserverFirst Observer read case, observer NN will be ranked first. | ||
| * @return true if the cluster with given nameservice id is available. | ||
| * @throws IOException if error occurs. | ||
| */ | ||
| private boolean isClusterUnAvailable(String nsId) throws IOException { | ||
| private boolean isClusterUnAvailable( | ||
| String nsId, FederationNamenodeContext namenode, | ||
| boolean listObserverFirst) throws IOException { | ||
| // Use observer and the namenode that causes the exception is an observer, | ||
| // false is returned so that the observer can be marked as unavailable, so other observers | ||
| // or active namenode which is standby in the cache of the router can be retried. | ||
|
||
| if (listObserverFirst && namenode.getState() == FederationNamenodeServiceState.OBSERVER) { | ||
| return false; | ||
| } | ||
| List<? extends FederationNamenodeContext> nnState = this.namenodeResolver | ||
| .getNamenodesForNameserviceId(nsId, false); | ||
|
|
||
| .getNamenodesForNameserviceId(nsId, listObserverFirst); | ||
| if (nnState != null) { | ||
| for (FederationNamenodeContext nnContext : nnState) { | ||
| // Once we find one NN is in active state, we assume this | ||
|
|
@@ -1830,4 +1847,24 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) { | |
| return lastActiveNNRefreshTimes | ||
| .computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0)); | ||
| } | ||
|
|
||
| /** | ||
| * Determine whether the router cache should be rotated when a NoNamenodesAvailableException occurs. | ||
| * | ||
| * @param ioe cause of the NoNamenodesAvailableException. | ||
| * @return true if NoNamenodesAvailableException occurs due to | ||
| * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception}, | ||
| * otherwise false. | ||
| */ | ||
| private boolean shouldRotateCache(IOException ioe) { | ||
| if (isUnavailableException(ioe)) { | ||
| return true; | ||
| } | ||
| if (ioe instanceof RemoteException) { | ||
| RemoteException re = (RemoteException) ioe; | ||
| ioe = re.unwrapRemoteException(); | ||
| ioe = getCleanException(ioe); | ||
| } | ||
| return isUnavailableException(ioe); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -132,9 +132,9 @@ public class MiniRouterDFSCluster { | |
| /** Mini cluster. */ | ||
| private MiniDFSCluster cluster; | ||
|
|
||
| protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS = | ||
| public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = | ||
| TimeUnit.SECONDS.toMillis(5); | ||
| protected static final long DEFAULT_CACHE_INTERVAL_MS = | ||
| public static final long DEFAULT_CACHE_INTERVAL_MS = | ||
| TimeUnit.SECONDS.toMillis(5); | ||
| /** Heartbeat interval in milliseconds. */ | ||
| private long heartbeatInterval; | ||
|
|
@@ -253,6 +253,19 @@ public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOExceptio | |
| return DistributedFileSystem.get(observerReadConf); | ||
| } | ||
|
|
||
| public FileSystem getFileSystemWithConfiguredFailoverProxyProvider() throws IOException { | ||
| conf.set(DFS_NAMESERVICES, | ||
| conf.get(DFS_NAMESERVICES)+ ",router-service"); | ||
| conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1"); | ||
| conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1", | ||
| getFileSystemURI().toString()); | ||
| conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX | ||
| + "." + "router-service", ConfiguredFailoverProxyProvider.class.getName()); | ||
| DistributedFileSystem.setDefaultUri(conf, "hdfs://router-service"); | ||
|
|
||
| return DistributedFileSystem.get(conf); | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
|
|
||
| public DFSClient getClient(UserGroupInformation user) | ||
| throws IOException, URISyntaxException, InterruptedException { | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.