-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-20902 when WAL sync failed, we should bypass the failed DN that previously used #205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
6ad0a40
a39ff69
a27093a
35b6876
d055d11
55afae6
934edcf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,7 @@ | |
| import org.apache.hadoop.fs.UnresolvedLinkException; | ||
| import org.apache.hadoop.fs.permission.FsPermission; | ||
| import org.apache.hadoop.hbase.client.ConnectionUtils; | ||
| import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; | ||
| import org.apache.hadoop.hbase.util.CancelableProgressable; | ||
| import org.apache.hadoop.hbase.util.FSUtils; | ||
| import org.apache.hadoop.hdfs.DFSClient; | ||
|
|
@@ -682,7 +683,7 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC | |
| DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); | ||
| boolean connectToDnViaHostname = | ||
| conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); | ||
| int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); | ||
| int timeoutMs = conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS); | ||
| ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); | ||
| blockCopy.setNumBytes(locatedBlock.getBlockSize()); | ||
| ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() | ||
|
|
@@ -740,7 +741,7 @@ public NameNodeException(Throwable cause) { | |
| } | ||
| } | ||
|
|
||
| private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, | ||
| private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, Path oldPath, | ||
| boolean overwrite, boolean createParent, short replication, long blockSize, | ||
| EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException { | ||
| Configuration conf = dfs.getConf(); | ||
|
|
@@ -751,6 +752,24 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d | |
| int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, | ||
| DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES); | ||
| DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY; | ||
| if (oldPath != null) { | ||
| String oldPathStr = oldPath.toUri().getPath(); | ||
| long len = namenode.getFileInfo(oldPathStr).getLen(); | ||
| for(LocatedBlock block : namenode.getBlockLocations(oldPathStr, Math.max(0, len - 1), len) | ||
| .getLocatedBlocks()) { | ||
| for(DatanodeInfo dn : block.getLocations()) { | ||
| excludesNodes = ArrayUtils.add(excludesNodes, dn); | ||
| } | ||
| } | ||
| if (LOG.isDebugEnabled()) { | ||
| StringBuilder sb = new StringBuilder("create new output because old wal sync failed, old path is: "); | ||
| sb.append(oldPathStr).append(", newPath excludesNodes are :"); | ||
|
||
| for(DatanodeInfo info : excludesNodes) { | ||
|
||
| sb.append(info.getInfoAddr()).append(";"); | ||
|
||
| } | ||
|
||
| LOG.debug(sb.toString()); | ||
|
||
| } | ||
| } | ||
| for (int retry = 0;; retry++) { | ||
| HdfsFileStatus stat; | ||
| try { | ||
|
|
@@ -837,15 +856,15 @@ public void operationComplete(Future<Channel> future) throws Exception { | |
| * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it | ||
| * inside an {@link EventLoop}. | ||
| */ | ||
| public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, | ||
| public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, Path oldPath, | ||
| boolean overwrite, boolean createParent, short replication, long blockSize, | ||
| EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException { | ||
| return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { | ||
|
|
||
| @Override | ||
| public FanOutOneBlockAsyncDFSOutput doCall(Path p) | ||
| throws IOException, UnresolvedLinkException { | ||
| return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, | ||
| return createOutput(dfs, p.toUri().getPath(), oldPath, overwrite, createParent, replication, | ||
| blockSize, eventLoopGroup, channelClass); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here it always adds nodes to the exclude list but never checks and removes them, even after the DN recovers, right? So it's possible that one day all DNs are excluded and the OutputStream will fail with the error
"could only be replicated to 0 nodes"?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your review, @carp84. There seem to be some differences of opinion with HBASE-22301; I will fix the checkstyle issues first.
excludesNodes is not a global variable; each FanOutOneBlockAsyncDFSOutput uses a different instance. When a new FanOutOneBlockAsyncDFSOutput is created, its initial excludesNodes is an empty array (see the code in FanOutOneBlockAsyncDFSOutputHelper#createOutput).