-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19235. IPC client uses CompletableFuture to support asynchronous operations. #6888
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
1f842eb
9ce4e5e
c30a91d
a901691
3a08429
8862575
7e2c44f
4f322f6
860435e
674204e
b278213
ec6beed
f0f2454
09c2feb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,8 +96,8 @@ public class Client implements AutoCloseable { | |
| private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>(); | ||
| private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER | ||
| = new ThreadLocal<>(); | ||
| private static final ThreadLocal<AsyncGet<? extends Writable, IOException>> | ||
| ASYNC_RPC_RESPONSE = new ThreadLocal<>(); | ||
| private static final ThreadLocal<CompletableFuture<Writable>> ASYNC_RPC_RESPONSE | ||
| = new ThreadLocal<>(); | ||
| private static final ThreadLocal<Boolean> asynchronousMode = | ||
| new ThreadLocal<Boolean>() { | ||
| @Override | ||
|
|
@@ -110,7 +110,50 @@ protected Boolean initialValue() { | |
| @Unstable | ||
| public static <T extends Writable> AsyncGet<T, IOException> | ||
| getAsyncRpcResponse() { | ||
| return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get(); | ||
| CompletableFuture<Writable> responseFuture = ASYNC_RPC_RESPONSE.get(); | ||
| return new AsyncGet<T, IOException>() { | ||
| @Override | ||
| public T get(long timeout, TimeUnit unit) | ||
| throws IOException, TimeoutException, InterruptedException { | ||
| try { | ||
| if (unit == null || timeout < 0) { | ||
| return (T) responseFuture.get(); | ||
| } | ||
| return (T) responseFuture.get(timeout, unit); | ||
| } catch (ExecutionException e) { | ||
| Throwable cause = e.getCause(); | ||
| if (cause == null) { | ||
| throw new IOException(e); | ||
| } | ||
| if (cause instanceof IOException) { | ||
| throw (IOException) cause; | ||
| } else { | ||
| throw new IOException(cause); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isDone() { | ||
| return responseFuture.isDone(); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Retrieves the current response future from the thread-local storage. | ||
| * | ||
| * @return A {@link CompletableFuture} of type T that represents the | ||
| * asynchronous operation. If no response future is present in | ||
| * the thread-local storage, this method returns {@code null}. | ||
| * @param <T> The type of the value completed by the returned | ||
| * {@link CompletableFuture}. It must be a subclass of | ||
| * {@link Writable}. | ||
| * @see CompletableFuture | ||
| * @see Writable | ||
| */ | ||
| public static <T extends Writable> CompletableFuture<T> getResponseFuture() { | ||
| return (CompletableFuture<T>) ASYNC_RPC_RESPONSE.get(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -283,6 +326,7 @@ static class Call { | |
| boolean done; // true when call is done | ||
| private final Object externalHandler; | ||
| private AlignmentContext alignmentContext; | ||
| private CompletableFuture<Object> completableFuture; | ||
|
||
|
|
||
| private Call(RPC.RpcKind rpcKind, Writable param) { | ||
| this.rpcKind = rpcKind; | ||
|
|
@@ -304,6 +348,9 @@ private Call(RPC.RpcKind rpcKind, Writable param) { | |
| } | ||
|
|
||
| this.externalHandler = EXTERNAL_CALL_HANDLER.get(); | ||
| if (Client.isAsynchronousMode()) { | ||
| completableFuture = new CompletableFuture<>(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -322,6 +369,9 @@ protected synchronized void callComplete() { | |
| externalHandler.notify(); | ||
| } | ||
| } | ||
| if (completableFuture != null) { | ||
| completableFuture.complete(this); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1495,36 +1545,16 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, | |
| } | ||
|
|
||
| if (isAsynchronousMode()) { | ||
| final AsyncGet<Writable, IOException> asyncGet | ||
| = new AsyncGet<Writable, IOException>() { | ||
| @Override | ||
| public Writable get(long timeout, TimeUnit unit) | ||
| throws IOException, TimeoutException{ | ||
| boolean done = true; | ||
| try { | ||
| final Writable w = getRpcResponse(call, connection, timeout, unit); | ||
| if (w == null) { | ||
| done = false; | ||
| throw new TimeoutException(call + " timed out " | ||
| + timeout + " " + unit); | ||
| } | ||
| return w; | ||
| } finally { | ||
| if (done) { | ||
| releaseAsyncCall(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isDone() { | ||
| synchronized (call) { | ||
| return call.done; | ||
| } | ||
| CompletableFuture<Writable> result = call.completableFuture.thenApply(o -> { | ||
| try { | ||
| return getRpcResponse(call, connection, -1, null); | ||
| } catch (IOException e) { | ||
| throw new CompletionException(e); | ||
| } finally { | ||
| releaseAsyncCall(); | ||
| } | ||
| }; | ||
|
|
||
| ASYNC_RPC_RESPONSE.set(asyncGet); | ||
| }); | ||
| ASYNC_RPC_RESPONSE.set(result); | ||
| return null; | ||
| } else { | ||
| return getRpcResponse(call, connection, -1, null); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org.apache.hadoop.util.functional.FutureIO should have what you need here already, with
raiseInnerCausedesigned to expand that excution exception in a lot more detail.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@steveloughran Thank you for your suggestion. FutureIO is indeed a good util. we can consider using this util to improve this area in the future.