
Commit 84e7cdb

HBASE-27222 Purge FutureReturnValueIgnored warnings from error prone (#4634)

Signed-off-by: Andrew Purtell <[email protected]>
(cherry picked from commit 8b091c4)

1 parent 1fd772c commit 84e7cdb
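
Note: error prone's FutureReturnValueIgnored check fires whenever a call returns a Future and the caller discards it. The recurring fix in the diffs below is to route such calls through small helpers that hand the future to a logging listener. A hedged illustration of the pattern (the snippet is illustrative, not taken from the patch; NettyFutureUtils.safeWriteAndFlush is assumed to have the shape its call sites below imply):

import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

class FutureReturnValueExample {
  void send(Channel ch, ByteBuf buf) {
    // Before: the returned ChannelFuture is dropped, so error prone reports
    // FutureReturnValueIgnored at this call.
    // ch.writeAndFlush(buf);

    // After: the helper consumes the future and logs any failure.
    NettyFutureUtils.safeWriteAndFlush(ch, buf);
  }
}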

File tree

22 files changed: +212 additions, -82 deletions

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

Lines changed: 26 additions & 24 deletions
@@ -22,23 +22,26 @@
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
 
 import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +51,8 @@
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -63,14 +68,13 @@
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
@@ -252,7 +256,7 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
     // disable further write, and fail all pending ack.
     state = State.BROKEN;
     failWaitingAckQueue(channel, errorSupplier);
-    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
+    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
   }
 
   private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
@@ -329,7 +333,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
         ByteBuf buf = alloc.buffer(len);
         heartbeat.putInBuffer(buf.nioBuffer(0, len));
         buf.writerIndex(len);
-        ctx.channel().writeAndFlush(buf);
+        safeWriteAndFlush(ctx.channel(), buf);
       }
       return;
     }
@@ -440,9 +444,9 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
     // TODO: we should perhaps measure time taken per DN here;
     // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
     datanodeInfoMap.keySet().forEach(ch -> {
-      ch.write(headerBuf.retainedDuplicate());
-      ch.write(checksumBuf.retainedDuplicate());
-      ch.writeAndFlush(dataBuf.retainedDuplicate());
+      safeWrite(ch, headerBuf.retainedDuplicate());
+      safeWrite(ch, checksumBuf.retainedDuplicate());
+      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
     });
     checksumBuf.release();
     headerBuf.release();
@@ -562,31 +566,31 @@ private void endBlock() throws IOException {
     headerBuf.writerIndex(headerLen);
     CompletableFuture<Long> future = new CompletableFuture<>();
     waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
-    datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
+    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
     headerBuf.release();
-    try {
-      future.get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(cause);
+    FutureUtils.get(future);
+  }
+
+  private void closeDataNodeChannelsAndAwait() {
+    List<ChannelFuture> futures = new ArrayList<>();
+    for (Channel ch : datanodeInfoMap.keySet()) {
+      futures.add(ch.close());
+    }
+    for (ChannelFuture future : futures) {
+      consume(future.awaitUninterruptibly());
     }
   }
 
   /**
    * The close method when error occurred. Now we just call recoverFileLease.
    */
   @Override
-  @SuppressWarnings("FutureReturnValueIgnored")
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
     if (buf != null) {
       buf.release();
       buf = null;
     }
-    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
-    datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    closeDataNodeChannelsAndAwait();
     endFileLease(client, fileId);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
@@ -597,12 +601,10 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
    * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
    */
   @Override
-  @SuppressWarnings("FutureReturnValueIgnored")
   public void close() throws IOException {
     endBlock();
     state = State.CLOSED;
-    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
-    datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    closeDataNodeChannelsAndAwait();
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
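
NettyFutureUtils itself is not part of this commit, so the helper bodies do not appear in the diff. A minimal sketch consistent with the call sites above, narrowed to ChannelFuture to keep the generics simple; this is an assumption inferred from usage, not the project's actual implementation:

import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class NettyFutureUtilsSketch {

  private static final Logger LOG = LoggerFactory.getLogger(NettyFutureUtilsSketch.class);

  private NettyFutureUtilsSketch() {
  }

  // Accept the future but log a failure, so the return value is never
  // silently dropped; this is what satisfies FutureReturnValueIgnored.
  public static void consume(ChannelFuture future) {
    future.addListener((ChannelFutureListener) f -> {
      if (!f.isSuccess()) {
        LOG.warn("Async operation fails", f.cause());
      }
    });
  }

  public static void safeWrite(ChannelOutboundInvoker channel, Object msg) {
    consume(channel.write(msg));
  }

  public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object msg) {
    consume(channel.writeAndFlush(msg));
  }

  public static void safeClose(ChannelOutboundInvoker channel) {
    consume(channel.close());
  }
}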

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java

Lines changed: 9 additions & 6 deletions
@@ -19,6 +19,9 @@
 
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -409,7 +412,7 @@ private static void requestWriteBlock(Channel channel, StorageType storageType,
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
-    channel.writeAndFlush(buffer);
+    safeWriteAndFlush(channel, buffer);
   }
 
   private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
@@ -418,7 +421,7 @@ private static void initialize(Configuration conf, Channel channel, DatanodeInfo
     throws IOException {
     Promise<Void> saslPromise = channel.eventLoop().newPromise();
     trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
-    saslPromise.addListener(new FutureListener<Void>() {
+    addListener(saslPromise, new FutureListener<Void>() {
 
       @Override
       public void operationComplete(Future<Void> future) throws Exception {
@@ -462,7 +465,7 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
       Promise<Channel> promise = eventLoopGroup.next().newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoopGroup).channel(channelClass)
+      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
 
          @Override
@@ -471,7 +474,7 @@ protected void initChannel(Channel ch) throws Exception {
            // channel connected. Leave an empty implementation here because netty does not allow
            // a null handler.
          }
-        }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
+        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {
 
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
@@ -593,12 +596,12 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
     if (!succ) {
       if (futureList != null) {
         for (Future<Channel> f : futureList) {
-          f.addListener(new FutureListener<Channel>() {
+          addListener(f, new FutureListener<Channel>() {
 
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
              if (future.isSuccess()) {
-                future.getNow().close();
+                safeClose(future.getNow());
              }
            }
          });

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java

Lines changed: 5 additions & 4 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
@@ -448,12 +449,12 @@ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
       size += CodedOutputStream.computeRawVarint32Size(size);
       ByteBuf buf = ctx.alloc().buffer(size);
       proto.writeDelimitedTo(new ByteBufOutputStream(buf));
-      ctx.write(buf);
+      safeWrite(ctx, buf);
     }
 
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
       sendSaslMessage(ctx, new byte[0]);
       ctx.flush();
       step++;
@@ -642,7 +643,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
         cBuf.addComponent(buf);
         cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
       } else {
-        ctx.write(msg);
+        safeWrite(ctx, msg);
       }
     }
 
@@ -656,7 +657,7 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
         ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
         buf.writeInt(wrapped.length);
         buf.writeBytes(wrapped);
-        ctx.write(buf);
+        safeWrite(ctx, buf);
       }
       ctx.flush();
     }

hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java

Lines changed: 4 additions & 3 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.consume;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -93,9 +94,9 @@ public static void setUp() throws Exception {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException, InterruptedException {
+  public static void tearDown() throws Exception {
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
     }
     shutdownMiniDFSCluster();
   }
@@ -262,7 +263,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
     byte[] b = new byte[50 * 1024 * 1024];
     Bytes.random(b);
     out.write(b);
-    out.flush(false);
+    consume(out.flush(false));
     assertEquals(b.length, out.flush(false).get().longValue());
     out.close();
     assertEquals(b.length, FS.getFileStatus(f).getLen());
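
The tearDown change above is subtle: Future.sync() returns the future itself, so the call expression still discards a Future and stays flagged, whereas Future.get() returns the operation's result and does not. A small standalone illustration (class name hypothetical, shaded netty assumed on the classpath):

import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;

public class ShutdownSketch {
  public static void main(String[] args) throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup();
    // group.shutdownGracefully().sync(); // sync() hands back the Future: still flagged
    group.shutdownGracefully().get(); // get() waits and returns the result: not flagged
  }
}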

hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java

Lines changed: 2 additions & 3 deletions
@@ -20,7 +20,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -103,12 +102,12 @@ public static void setUp() throws Exception {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException, InterruptedException {
+  public static void tearDown() throws Exception {
     if (OUT != null) {
       OUT.recoverAndClose(null);
     }
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
     }
     shutdownMiniDFSCluster();
   }

hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java

Lines changed: 2 additions & 2 deletions
@@ -53,9 +53,9 @@ public class TestLocalAsyncOutput {
   private static StreamSlowMonitor MONITOR;
 
   @AfterClass
-  public static void tearDownAfterClass() throws IOException {
+  public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.cleanupTestDir();
-    GROUP.shutdownGracefully();
+    GROUP.shutdownGracefully().get();
     MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
   }
 
hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java

Lines changed: 2 additions & 2 deletions
@@ -194,9 +194,9 @@ public static void setUpBeforeClass() throws Exception {
   }
 
   @AfterClass
-  public static void tearDownAfterClass() throws IOException, InterruptedException {
+  public static void tearDownAfterClass() throws Exception {
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
     }
     if (KDC != null) {
       KDC.stop();

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java

Lines changed: 11 additions & 4 deletions
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -183,7 +186,6 @@ public MulticastListener() {
 
     @Override
     public void connect(Configuration conf) throws IOException {
-
       String mcAddress =
         conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
       String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
@@ -218,16 +220,21 @@ public void connect(Configuration conf) throws IOException {
       }
 
       LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
-      channel.joinGroup(ina, ni, null, channel.newPromise());
+      try {
+        consume(channel.joinGroup(ina, ni, null).sync());
+      } catch (InterruptedException e) {
+        close();
+        throw ExceptionUtil.asInterrupt(e);
+      }
     }
 
     @Override
     public void close() {
       if (channel != null) {
-        channel.close();
+        safeClose(channel);
        channel = null;
      }
-      group.shutdownGracefully();
+      consume(group.shutdownGracefully());
     }
 
     /**
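
The connect() change combines both tools: sync() blocks until the multicast join completes and rethrows a failed join, while consume(...) accepts the ChannelFuture that sync() returns so the value is not silently dropped. A minimal sketch of the pattern, assuming the NettyFutureUtils.consume imported in the diff; the wrapper class is hypothetical:

import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;

import java.net.InetAddress;
import java.net.NetworkInterface;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;

public final class JoinGroupSketch {
  private JoinGroupSketch() {
  }

  // Blocks until the join finishes; a failed join surfaces as an exception
  // from sync(), and the returned future is consumed rather than ignored.
  public static void joinGroup(DatagramChannel channel, InetAddress group,
    NetworkInterface ni) throws InterruptedException {
    consume(channel.joinGroup(group, ni, null).sync());
  }
}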

hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java

Lines changed: 2 additions & 2 deletions
@@ -121,7 +121,7 @@ public void testWrapConnectionException() throws Exception {
   }
 
   @Test
-  public void testExecute() throws IOException {
+  public void testExecute() throws Exception {
     EventLoop eventLoop = new DefaultEventLoop();
     MutableInt executed = new MutableInt(0);
     MutableInt numStackTraceElements = new MutableInt(0);
@@ -156,7 +156,7 @@ public void run() {
       });
       FutureUtils.get(future);
     } finally {
-      eventLoop.shutdownGracefully();
+      eventLoop.shutdownGracefully().get();
     }
   }
 }

hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java

Lines changed: 11 additions & 0 deletions
@@ -93,6 +93,17 @@ public static <T> void addListener(CompletableFuture<T> future,
     }, executor);
   }
 
+  /**
+   * Log the error if the future indicates any failure.
+   */
+  public static void consume(CompletableFuture<?> future) {
+    addListener(future, (r, e) -> {
+      if (e != null) {
+        LOG.warn("Async operation fails", e);
+      }
+    });
+  }
+
   /**
    * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
    * the callbacks in the given {@code executor}.
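
The new FutureUtils.consume is the CompletableFuture counterpart of the Netty-side helpers: a caller that intentionally fires and forgets can hand the future off instead of dropping it, and a failure still lands in the log. A hypothetical caller (class name and the supplier body are illustrative only):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.util.FutureUtils;

public class ConsumeSketch {
  public static void main(String[] args) {
    CompletableFuture<Long> flushFuture = CompletableFuture.supplyAsync(() -> {
      // stand-in for an async flush whose result this caller does not need
      return 42L;
    });
    FutureUtils.consume(flushFuture); // failures are logged, not lost
    flushFuture.join(); // demo only: wait so the JVM does not exit early
  }
}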

0 commit comments

Comments
 (0)