HBASE-26679 Wait on the future returned by FanOutOneBlockAsyncDFSOutp… #4039
@@ -22,6 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
@@ -30,9 +31,12 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -46,6 +50,7 @@
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
@@ -57,13 +62,16 @@
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
@@ -272,4 +280,142 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
    }
    assertArrayEquals(b, actual);
  }
  /**
   * <pre>
   * This test is for HBASE-26679. Consider two datanodes, dn1 and dn2, where dn2 is a slow DN.
   * The sequence of events before HBASE-26679 is:
   * 1. We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, so there is
   *    one {@link FanOutOneBlockAsyncDFSOutput.Callback} in
   *    {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
   * 2. The ack from dn1 arrives first and triggers Netty to invoke
   *    {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel; there dn1's channel is
   *    removed from {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
   * 3. dn2 responds slowly, and before dn2 sends its ack, dn1 is shut down or hits an exception,
   *    so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel.
   *    Because {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} no longer
   *    contains dn1's channel, the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
   *    {@link FanOutOneBlockAsyncDFSOutput#failed}, {@link FanOutOneBlockAsyncDFSOutput#state}
   *    is set to {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, and both dn1 and dn2 are
   *    closed at the end of {@link FanOutOneBlockAsyncDFSOutput#failed}.
   * 4. {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again for dn2 because it was
   *    closed, but since {@link FanOutOneBlockAsyncDFSOutput#state} is already
   *    {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, the whole
   *    {@link FanOutOneBlockAsyncDFSOutput#failed} method is skipped. So waiting on the future
   *    returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck forever.
   * After HBASE-26679, for step 4 above, even if {@link FanOutOneBlockAsyncDFSOutput#state} is
   * already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we still try to complete
   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
   * </pre>
   */
  @Test
  public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();

    DataNodeProperties firstDataNodeProperties = null;
    try {
      FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
        true, false, (short) 2, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
      final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2);
      Map<Channel, DatanodeInfo> datanodeInfoMap = out.getDatanodeInfoMap();
      Iterator<Map.Entry<Channel, DatanodeInfo>> iterator = datanodeInfoMap.entrySet().iterator();
      assertTrue(iterator.hasNext());
      Map.Entry<Channel, DatanodeInfo> dn1Entry = iterator.next();
      Channel dn1Channel = dn1Entry.getKey();
      DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue();
      final List<String> protobufDecoderNames = new ArrayList<String>();
      dn1Channel.pipeline().forEach((entry) -> {
        if (ProtobufDecoder.class.isInstance(entry.getValue())) {
          protobufDecoderNames.add(entry.getKey());
        }
      });
      assertTrue(protobufDecoderNames.size() == 1);
      dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler",
        new ChannelInboundHandlerAdapter() {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead(ctx, msg);
            dn1AckReceivedCyclicBarrier.await();
          }
        });

      assertTrue(iterator.hasNext());
      Map.Entry<Channel, DatanodeInfo> dn2Entry = iterator.next();
      Channel dn2Channel = dn2Entry.getKey();

      /**
       * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate
       * a slow dn2.
       */
      dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
          }
        }
      });
      byte[] b = new byte[10];
      ThreadLocalRandom.current().nextBytes(b);
      out.write(b, 0, b.length);
      CompletableFuture<Long> future = out.flush(false);
      /**
       * Wait for the ack from dn1.
       */
      dn1AckReceivedCyclicBarrier.await();
      /**
       * The first ack has been received from dn1, so we can stop dn1 now.
       */
      firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo);
      assertTrue(firstDataNodeProperties != null);
      try {
        /**
         * Before HBASE-26679 we would be stuck here; after HBASE-26679 we fail soon with an
         * {@link ExecutionException}.
         */
        future.get();
        fail();
      } catch (ExecutionException e) {
        assertTrue(e != null);
        LOG.info("expected exception caught when getting future", e);
      }
      /**
       * Make sure all the datanode channels are closed.
       */
      datanodeInfoMap.keySet().forEach(ch -> {
        try {
          ch.closeFuture().get();
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      });
    } finally {
      if (firstDataNodeProperties != null) {
        CLUSTER.restartDataNode(firstDataNodeProperties);
      }
    }
  }
  private static DataNodeProperties findAndKillFirstDataNode(DatanodeInfo firstDatanodeInfo) {
    assertTrue(firstDatanodeInfo != null);
    ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes();
    ArrayList<Integer> foundIndexes = new ArrayList<Integer>();
    int index = 0;
    for (DataNode dataNode : dataNodes) {
      if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) {
        foundIndexes.add(index);
      }
      index++;
    }
    assertTrue(foundIndexes.size() == 1);
    return CLUSTER.stopDataNode(foundIndexes.get(0));
  }
}
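
For readers following the Javadoc above, here is a minimal, self-contained sketch of the race and of the behavior the new test expects. It is not the actual FanOutOneBlockAsyncDFSOutput code from this patch; the class, the String channel ids, and the method bodies below are simplified stand-ins invented purely for illustration.

// Illustrative sketch only; NOT the real FanOutOneBlockAsyncDFSOutput implementation.
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

final class FanOutFlushSketch {

  enum State { STREAMING, BROKEN }

  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    // Replica channels (here plain Strings) that have not acked this callback yet.
    final Set<String> unfinishedReplicas = ConcurrentHashMap.newKeySet();

    Callback(String... replicas) {
      for (String r : replicas) {
        unfinishedReplicas.add(r);
      }
    }
  }

  private State state = State.STREAMING;
  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  CompletableFuture<Long> flush(String... replicas) {
    Callback cb = new Callback(replicas);
    waitingAckQueue.add(cb);
    return cb.future;
  }

  // Ack from one datanode: that replica is no longer "unfinished" for pending callbacks.
  void completed(String channel) {
    waitingAckQueue.forEach(cb -> cb.unfinishedReplicas.remove(channel));
  }

  // Error path, invoked once per datanode channel when it fails or is closed.
  void failed(String channel, Throwable error) {
    if (state == State.BROKEN) {
      // Before HBASE-26679 the method effectively stopped here, so a callback that had already
      // received the failing channel's ack was never completed and flush().get() hung forever.
      // After HBASE-26679 the pending futures are still failed, which is the behavior that
      // testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck asserts.
      waitingAckQueue.forEach(cb -> cb.future.completeExceptionally(error));
      waitingAckQueue.clear();
      return;
    }
    state = State.BROKEN;
    // Only callbacks still waiting on this channel are failed here; a callback that already got
    // this channel's ack (dn1 in the scenario above) is skipped.
    waitingAckQueue.removeIf(cb -> {
      if (cb.unfinishedReplicas.contains(channel)) {
        cb.future.completeExceptionally(error);
        return true;
      }
      return false;
    });
    // The real code then closes every datanode channel, which triggers failed() again for dn2.
  }

  public static void main(String[] args) {
    FanOutFlushSketch out = new FanOutFlushSketch();
    CompletableFuture<Long> future = out.flush("dn1", "dn2"); // step 1: one pending callback
    out.completed("dn1");                                     // step 2: ack from dn1 arrives
    out.failed("dn1", new IOException("dn1 went down"));      // step 3: callback skipped, state BROKEN
    out.failed("dn2", new IOException("dn2 channel closed")); // step 4: must still fail the future
    System.out.println("future failed as expected: " + future.isCompletedExceptionally()); // true
  }
}

If the two lines inside the state == State.BROKEN branch are removed, approximating the pre-HBASE-26679 behavior, the future in main() never completes and a caller blocking on future.get() hangs, which is exactly the situation the test above guards against.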