Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 8989241

Browse files
Fix LRO callables so that the ApiCallContext is always passed through. (#600)
1 parent eca14d4 commit 8989241

8 files changed

Lines changed: 255 additions & 37 deletions

File tree

benchmark/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ project.version = "0.35.1-SNAPSHOT" // {x-version-update:benchmark:current}
22

33
buildscript {
44
repositories {
5-
jcenter()
65
maven {
76
url "https://plugins.gradle.org/m2/"
87
}

gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.api.core.ApiFutures;
3434
import com.google.api.gax.retrying.NonCancellableFuture;
3535
import com.google.api.gax.retrying.RetryingFuture;
36+
import com.google.common.base.Preconditions;
3637
import java.util.concurrent.Callable;
3738
import org.threeten.bp.Duration;
3839

@@ -54,26 +55,25 @@ class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
5455

5556
AttemptCallable(
5657
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
57-
this.callable = callable;
58-
this.request = request;
59-
this.originalCallContext = callContext;
58+
this.callable = Preconditions.checkNotNull(callable);
59+
this.request = Preconditions.checkNotNull(request);
60+
this.originalCallContext = Preconditions.checkNotNull(callContext);
6061
}
6162

6263
public void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
63-
this.externalFuture = externalFuture;
64+
this.externalFuture = Preconditions.checkNotNull(externalFuture);
6465
}
6566

6667
@Override
6768
public ResponseT call() {
6869
ApiCallContext callContext = originalCallContext;
6970

7071
try {
71-
if (callContext != null) {
72-
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
73-
if (!rpcTimeout.isZero()) {
74-
callContext = callContext.withTimeout(rpcTimeout);
75-
}
72+
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
73+
if (!rpcTimeout.isZero()) {
74+
callContext = callContext.withTimeout(rpcTimeout);
7675
}
76+
7777
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
7878
if (externalFuture.isDone()) {
7979
return null;

gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import com.google.api.core.ApiFutures;
3434
import com.google.api.gax.retrying.NonCancellableFuture;
3535
import com.google.api.gax.retrying.RetryingFuture;
36+
import com.google.common.base.Preconditions;
3637
import java.util.concurrent.Callable;
38+
import org.threeten.bp.Duration;
3739

3840
/**
3941
* A callable representing an attempt to check the status of something by issuing a call to a
@@ -46,25 +48,37 @@
4648
*/
4749
class CheckingAttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
4850
private final UnaryCallable<RequestT, ResponseT> callable;
51+
private final ApiCallContext originalCallContext;
4952

5053
private volatile RetryingFuture<ResponseT> externalFuture;
5154

52-
CheckingAttemptCallable(UnaryCallable<RequestT, ResponseT> callable) {
53-
this.callable = callable;
55+
CheckingAttemptCallable(UnaryCallable<RequestT, ResponseT> callable, ApiCallContext callContext) {
56+
this.callable = Preconditions.checkNotNull(callable);
57+
this.originalCallContext = Preconditions.checkNotNull(callContext);
5458
}
5559

5660
public void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
57-
this.externalFuture = externalFuture;
61+
this.externalFuture = Preconditions.checkNotNull(externalFuture);
5862
}
5963

6064
@Override
6165
public ResponseT call() {
66+
ApiCallContext callContext = originalCallContext;
67+
6268
try {
69+
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
70+
if (!rpcTimeout.isZero()) {
71+
callContext = callContext.withTimeout(rpcTimeout);
72+
}
73+
6374
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
6475
if (externalFuture.isDone()) {
6576
return null;
6677
}
67-
ApiFuture<ResponseT> internalFuture = callable.futureCall(null, null);
78+
// NOTE: The callable here is an OperationCheckingCallable, which will compose its own
79+
// request using a resolved operation name and ignore anything that we pass here for the
80+
// request.
81+
ApiFuture<ResponseT> internalFuture = callable.futureCall(null, callContext);
6882
externalFuture.setAttemptFuture(internalFuture);
6983
} catch (Throwable e) {
7084
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));

gax/src/main/java/com/google/api/gax/rpc/OperationCallableImpl.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,21 +73,27 @@ class OperationCallableImpl<RequestT, ResponseT, MetadataT>
7373
* time.
7474
*
7575
* @param request The request to initiate the operation.
76-
* @param context {@link ApiCallContext} to make the call with
76+
* @param callContext {@link ApiCallContext} to make the call with
7777
* @return {@link OperationFuture} for the call result
7878
*/
7979
@Override
8080
public OperationFuture<ResponseT, MetadataT> futureCall(
81-
RequestT request, ApiCallContext context) {
82-
return futureCall(initialCallable.futureCall(request, context));
81+
RequestT request, ApiCallContext callContext) {
82+
ApiFuture<OperationSnapshot> initialFuture = initialCallable.futureCall(request, callContext);
83+
return futureCall(initialFuture, callContext);
8384
}
8485

85-
OperationFutureImpl<ResponseT, MetadataT> futureCall(ApiFuture<OperationSnapshot> initialFuture) {
86+
/** Waits for the initialFuture to resolve and then starts to poll the return operation. */
87+
OperationFutureImpl<ResponseT, MetadataT> futureCall(
88+
ApiFuture<OperationSnapshot> initialFuture, ApiCallContext callContext) {
89+
8690
RecheckingCallable<RequestT, OperationSnapshot> callable =
8791
new RecheckingCallable<>(
8892
new OperationCheckingCallable<RequestT>(longRunningClient, initialFuture), executor);
8993

90-
RetryingFuture<OperationSnapshot> pollingFuture = callable.futureCall(null, null);
94+
// NOTE: OperationCheckingCallable will compose its own request using the resolved
95+
// initialFuture. So the request parameter to futureCall is ignored
96+
RetryingFuture<OperationSnapshot> pollingFuture = callable.futureCall(null, callContext);
9197
return new OperationFutureImpl<>(
9298
pollingFuture, initialFuture, responseTransformer, metadataTransformer);
9399
}
@@ -98,24 +104,26 @@ OperationFutureImpl<ResponseT, MetadataT> futureCall(ApiFuture<OperationSnapshot
98104
* operation finishes.
99105
*
100106
* @param operationName The name of the operation to resume.
101-
* @param context {@link ApiCallContext} to make the call with
107+
* @param callContext {@link ApiCallContext} to make the call with
102108
* @return {@link OperationFuture} for the call result.
103109
*/
104110
@Override
105111
public OperationFuture<ResponseT, MetadataT> resumeFutureCall(
106-
String operationName, ApiCallContext context) {
107-
return futureCall(longRunningClient.getOperationCallable().futureCall(operationName, context));
112+
String operationName, ApiCallContext callContext) {
113+
ApiFuture<OperationSnapshot> firstAttempt =
114+
longRunningClient.getOperationCallable().futureCall(operationName, callContext);
115+
return futureCall(firstAttempt, callContext);
108116
}
109117

110118
/**
111119
* Sends a cancellation request to the server for the operation with name {@code operationName}.
112120
*
113121
* @param operationName The name of the operation to cancel.
114-
* @param context {@link ApiCallContext} to make the call with
122+
* @param callContext {@link ApiCallContext} to make the call with
115123
* @return the future which completes once the operation is canceled on the server side.
116124
*/
117125
@Override
118-
public ApiFuture<Void> cancel(String operationName, ApiCallContext context) {
119-
return longRunningClient.cancelOperationCallable().futureCall(operationName, context);
126+
public ApiFuture<Void> cancel(String operationName, ApiCallContext callContext) {
127+
return longRunningClient.cancelOperationCallable().futureCall(operationName, callContext);
120128
}
121129
}

gax/src/main/java/com/google/api/gax/rpc/OperationCheckingCallable.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,26 @@ class OperationCheckingCallable<RequestT> extends UnaryCallable<RequestT, Operat
5959
/**
6060
* This method is supposed to be called from {@link AttemptCallable#call()}
6161
*
62-
* @param request request
63-
* @param context call context
62+
* @param ignored The ignored request; the actual request will be composed based on the result of
63+
* the {@code initialFuture}.
64+
* @param callContext call context
6465
*/
6566
@Override
66-
public ApiFuture<OperationSnapshot> futureCall(RequestT request, ApiCallContext context) {
67+
public ApiFuture<OperationSnapshot> futureCall(RequestT ignored, ApiCallContext callContext) {
6768
try {
6869
if (!initialFuture.isDone() || initialFuture.isCancelled()) {
6970
return initialFuture;
7071
}
7172
// Since initialFuture is done at this point, the following call should be non-blocking
7273
OperationSnapshot initialOperation = initialFuture.get();
7374

74-
// Note Future.isDone() and Operation.getDone() are two fundamentally different things.
7575
if (initialOperation.isDone()) {
7676
return initialFuture;
7777
}
7878

79-
return longRunningClient.getOperationCallable().futureCall(initialOperation.getName(), null);
79+
return longRunningClient
80+
.getOperationCallable()
81+
.futureCall(initialOperation.getName(), callContext);
8082
} catch (ExecutionException e) {
8183
return ApiFutures.immediateFailedFuture(e.getCause());
8284
} catch (InterruptedException e) {

gax/src/main/java/com/google/api/gax/rpc/RecheckingCallable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* A UnaryCallable that will keep issuing calls to an inner callable until a terminal condition is
3838
* met.
3939
*
40-
* <p>Note: Any request or context passed to this class is ignored.
40+
* <p>Note: Any request passed to this class is ignored.
4141
*
4242
* <p>Package-private for internal use.
4343
*/
@@ -52,9 +52,9 @@ class RecheckingCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, Re
5252
}
5353

5454
@Override
55-
public RetryingFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputContext) {
55+
public RetryingFuture<ResponseT> futureCall(RequestT ignored, ApiCallContext inputContext) {
5656
CheckingAttemptCallable<RequestT, ResponseT> checkingAttemptCallable =
57-
new CheckingAttemptCallable<>(callable);
57+
new CheckingAttemptCallable<>(callable, inputContext);
5858

5959
RetryingFuture<ResponseT> retryingFuture = executor.createFuture(checkingAttemptCallable);
6060
checkingAttemptCallable.setExternalFuture(retryingFuture);
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2016 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.rpc;
31+
32+
import static com.google.common.truth.Truth.assertThat;
33+
34+
import com.google.api.core.SettableApiFuture;
35+
import com.google.api.gax.retrying.RetrySettings;
36+
import com.google.api.gax.retrying.RetryingFuture;
37+
import com.google.api.gax.retrying.TimedAttemptSettings;
38+
import com.google.api.gax.rpc.testing.FakeCallContext;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.mockito.ArgumentCaptor;
43+
import org.mockito.Mock;
44+
import org.mockito.Mockito;
45+
import org.mockito.invocation.InvocationOnMock;
46+
import org.mockito.junit.MockitoJUnitRunner;
47+
import org.mockito.stubbing.Answer;
48+
import org.threeten.bp.Duration;
49+
50+
@RunWith(MockitoJUnitRunner.class)
51+
public class CheckingAttemptCallableTest {
52+
@Mock UnaryCallable<String, String> mockInnerCallable;
53+
ArgumentCaptor<ApiCallContext> capturedCallContext;
54+
@Mock RetryingFuture<String> mockExternalFuture;
55+
TimedAttemptSettings currentAttemptSettings;
56+
57+
@Before
58+
public void setUp() {
59+
capturedCallContext = ArgumentCaptor.forClass(ApiCallContext.class);
60+
Mockito.when(mockInnerCallable.futureCall(Mockito.<String>any(), capturedCallContext.capture()))
61+
.thenReturn(SettableApiFuture.<String>create());
62+
63+
currentAttemptSettings =
64+
TimedAttemptSettings.newBuilder()
65+
.setGlobalSettings(RetrySettings.newBuilder().build())
66+
.setAttemptCount(0)
67+
.setFirstAttemptStartTimeNanos(0)
68+
.setRetryDelay(Duration.ofSeconds(1))
69+
.setRandomizedRetryDelay(Duration.ofSeconds(1))
70+
.setRpcTimeout(Duration.ZERO)
71+
.build();
72+
73+
Mockito.when(mockExternalFuture.getAttemptSettings())
74+
.thenAnswer(
75+
new Answer<TimedAttemptSettings>() {
76+
@Override
77+
public TimedAttemptSettings answer(InvocationOnMock invocation) throws Throwable {
78+
return currentAttemptSettings;
79+
}
80+
});
81+
}
82+
83+
@Test
84+
public void testRpcTimeout() {
85+
CheckingAttemptCallable<String, String> callable =
86+
new CheckingAttemptCallable<>(mockInnerCallable, FakeCallContext.createDefault());
87+
callable.setExternalFuture(mockExternalFuture);
88+
89+
// Make sure that the rpc timeout is set
90+
Duration timeout = Duration.ofSeconds(10);
91+
currentAttemptSettings = currentAttemptSettings.toBuilder().setRpcTimeout(timeout).build();
92+
93+
callable.call();
94+
95+
assertThat(capturedCallContext.getValue().getTimeout()).isEqualTo(timeout);
96+
97+
// Make sure that subsequent attempts can extend the time out
98+
Duration longerTimeout = Duration.ofSeconds(20);
99+
currentAttemptSettings =
100+
currentAttemptSettings.toBuilder().setRpcTimeout(longerTimeout).build();
101+
callable.call();
102+
assertThat(capturedCallContext.getValue().getTimeout()).isEqualTo(longerTimeout);
103+
}
104+
}

0 commit comments

Comments
 (0)