Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,20 @@ class BasicRetryingFuture<ResponseT> extends AbstractFuture<ResponseT>
private final Callable<ResponseT> callable;

private final RetryAlgorithm<ResponseT> retryAlgorithm;
private final RetryingContext retryingContext;

private volatile TimedAttemptSettings attemptSettings;

private volatile ApiFuture<ResponseT> latestCompletedAttemptResult;
private volatile ApiFuture<ResponseT> attemptResult;

BasicRetryingFuture(Callable<ResponseT> callable, RetryAlgorithm<ResponseT> retryAlgorithm) {
BasicRetryingFuture(
Callable<ResponseT> callable,
RetryAlgorithm<ResponseT> retryAlgorithm,
RetryingContext context) {
this.callable = checkNotNull(callable);
this.retryAlgorithm = checkNotNull(retryAlgorithm);
this.retryingContext = checkNotNull(context);

this.attemptSettings = retryAlgorithm.createFirstAttempt();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@
* <p>This class is thread-safe.
*/
class CallbackChainRetryingFuture<ResponseT> extends BasicRetryingFuture<ResponseT> {
private final RetryingExecutor<ResponseT> retryingExecutor;
private final ScheduledRetryingExecutor<ResponseT> retryingExecutor;

This comment was marked as spam.

This comment was marked as spam.

private volatile AttemptCompletionListener attemptFutureCompletionListener;

CallbackChainRetryingFuture(
Callable<ResponseT> callable,
RetryAlgorithm<ResponseT> retryAlgorithm,
RetryingExecutor<ResponseT> retryingExecutor) {
super(callable, retryAlgorithm);
ScheduledRetryingExecutor<ResponseT> retryingExecutor,
RetryingContext context) {
super(callable, retryAlgorithm, context);
this.retryingExecutor = checkNotNull(retryingExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.Callable;
Expand All @@ -46,7 +47,7 @@
*
* @param <ResponseT> response type
*/
public class DirectRetryingExecutor<ResponseT> implements RetryingExecutor<ResponseT> {
public class DirectRetryingExecutor<ResponseT> implements RetryingExecutorWithContext<ResponseT> {

private final RetryAlgorithm<ResponseT> retryAlgorithm;

Expand All @@ -70,15 +71,30 @@ public DirectRetryingExecutor(RetryAlgorithm<ResponseT> retryAlgorithm) {
*/
@Override
public RetryingFuture<ResponseT> createFuture(Callable<ResponseT> callable) {
return new BasicRetryingFuture<>(callable, retryAlgorithm);
return createFuture(callable, NoopRetryingContext.create());
}

/**
* Creates a {@link RetryingFuture}, which is a facade, returned to the client code to wait for
* any retriable operation to complete. The future is bounded to {@code this} executor instance.
*
* @param callable the actual callable, which should be executed in a retriable context
* @return retrying future facade
*/
@BetaApi("The surface for passing per operation state is not yet stable")
@Override
public RetryingFuture<ResponseT> createFuture(
Callable<ResponseT> callable, RetryingContext context) {
return new BasicRetryingFuture<>(callable, retryAlgorithm, context);
}

/**
* Submits an attempt for execution in the current thread, causing the current thread to sleep for
* the specified by the {@link RetryingFuture#getAttemptSettings()} amount of time. As result,
* this method completes execution only after the specified {@code retryingFuture} completes.
*
* @param retryingFuture the future previously returned by {@link #createFuture(Callable)}
* @param retryingFuture the future previously returned by {@link #createFuture(Callable,
* RetryingContext)}
* @return returns completed {@code retryingFuture}
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.retrying;

// TODO(igorbernstein2): Remove this class once RetryingExecutor#createFuture(Callable) is
// deprecated and removed.
/**
* Backwards compatibility class to aid in transition to adding operation state to {@link
* RetryingFuture} implementations.
*/
class NoopRetryingContext implements RetryingContext {
public static RetryingContext create() {
return new NoopRetryingContext();
}
}
40 changes: 40 additions & 0 deletions gax/src/main/java/com/google/api/gax/retrying/RetryingContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.retrying;

import com.google.api.core.BetaApi;

/**
* Context for a retryable operation.
*
* <p>It provides state to individual {@link RetryingFuture}s via the {@link RetryingExecutor}.
*/
@BetaApi("The surface for passing per operation state is not yet stable")
public interface RetryingContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.retrying;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;

/**
* A {@link RetryingExecutor} that accepts a per-operation context.
*
* @see RetryingExecutor
*/
// TODO(igorbernstein2): Consider replacing this with a default implementation in RetryingExecutor
// once support for java 7 is dropped
@BetaApi("The surface for per invocation state is unstable and will probably change in the future")
@InternalExtensionOnly
public interface RetryingExecutorWithContext<ResponseT> extends RetryingExecutor<ResponseT> {
RetryingFuture<ResponseT> createFuture(
@Nonnull Callable<ResponseT> callable, @Nonnull RetryingContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.ListenableFutureToApiFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
Expand All @@ -53,7 +54,8 @@
*
* @param <ResponseT> response type
*/
public class ScheduledRetryingExecutor<ResponseT> implements RetryingExecutor<ResponseT> {
public class ScheduledRetryingExecutor<ResponseT>
implements RetryingExecutorWithContext<ResponseT> {

private final RetryAlgorithm<ResponseT> retryAlgorithm;
private final ListeningScheduledExecutorService scheduler;
Expand Down Expand Up @@ -81,13 +83,30 @@ public ScheduledRetryingExecutor(
*/
@Override
public RetryingFuture<ResponseT> createFuture(Callable<ResponseT> callable) {
return new CallbackChainRetryingFuture<>(callable, retryAlgorithm, this);
return createFuture(callable, NoopRetryingContext.create());
}

/**
* Creates a {@link RetryingFuture}, which is a facade, returned to the client code to wait for
* any retriable operation to complete. The returned future is bounded to {@code this} executor
* instance.
*
* @param callable the actual callable, which should be executed in a retriable context
* @param context the context for this operation
* @return retrying future facade
*/
@BetaApi("The surface for passing per operation state is not yet stable")
@Override
public RetryingFuture<ResponseT> createFuture(
Callable<ResponseT> callable, RetryingContext context) {
return new CallbackChainRetryingFuture<>(callable, retryAlgorithm, this, context);
}

/**
* Submits an attempt for execution in a different thread.
*
* @param retryingFuture the future previously returned by {@link #createFuture(Callable)}
* @param retryingFuture the future previously returned by {@link #createFuture(Callable,
* RetryingContext)}
* @return submitted attempt future
*/
@Override
Expand Down
3 changes: 2 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.retrying.RetryingContext;
import com.google.auth.Credentials;
import java.util.List;
import java.util.Map;
Expand All @@ -48,7 +49,7 @@
* <p>This is transport specific and each transport has an implementation with its own options.
*/
@InternalExtensionOnly
public interface ApiCallContext {
public interface ApiCallContext extends RetryingContext {

/** Returns a new ApiCallContext with the given credentials set. */
ApiCallContext withCredentials(Credentials credentials);
Expand Down
3 changes: 1 addition & 2 deletions gax/src/main/java/com/google/api/gax/rpc/Callables.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.StreamingRetryAlgorithm;

Expand Down Expand Up @@ -64,7 +63,7 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(
new ApiResultRetryAlgorithm<ResponseT>(),
new ExponentialRetryAlgorithm(
callSettings.getRetrySettings(), clientContext.getClock()));
RetryingExecutor<ResponseT> retryingExecutor =
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
return new RetryingCallable<>(
clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationFutureImpl;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;

/**
Expand All @@ -50,14 +50,14 @@ class OperationCallableImpl<RequestT, ResponseT, MetadataT>
extends OperationCallable<RequestT, ResponseT, MetadataT> {

private final UnaryCallable<RequestT, OperationSnapshot> initialCallable;
private final RetryingExecutor<OperationSnapshot> executor;
private final RetryingExecutorWithContext<OperationSnapshot> executor;
private final LongRunningClient longRunningClient;
private final ApiFunction<OperationSnapshot, ResponseT> responseTransformer;
private final ApiFunction<OperationSnapshot, MetadataT> metadataTransformer;

OperationCallableImpl(
UnaryCallable<RequestT, OperationSnapshot> initialCallable,
RetryingExecutor<OperationSnapshot> executor,
RetryingExecutorWithContext<OperationSnapshot> executor,
LongRunningClient longRunningClient,
OperationCallSettings<RequestT, ResponseT, MetadataT> operationCallSettings) {
this.initialCallable = checkNotNull(initialCallable);
Expand Down
10 changes: 6 additions & 4 deletions gax/src/main/java/com/google/api/gax/rpc/RecheckingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
package com.google.api.gax.rpc;

import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.common.base.Preconditions;

Expand All @@ -43,10 +43,11 @@
*/
class RecheckingCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RetryingExecutor<ResponseT> executor;
private final RetryingExecutorWithContext<ResponseT> executor;

RecheckingCallable(
UnaryCallable<RequestT, ResponseT> callable, RetryingExecutor<ResponseT> executor) {
UnaryCallable<RequestT, ResponseT> callable,
RetryingExecutorWithContext<ResponseT> executor) {
this.callable = Preconditions.checkNotNull(callable);
this.executor = Preconditions.checkNotNull(executor);
}
Expand All @@ -56,7 +57,8 @@ public RetryingFuture<ResponseT> futureCall(RequestT ignored, ApiCallContext inp
CheckingAttemptCallable<RequestT, ResponseT> checkingAttemptCallable =
new CheckingAttemptCallable<>(callable, inputContext);

RetryingFuture<ResponseT> retryingFuture = executor.createFuture(checkingAttemptCallable);
RetryingFuture<ResponseT> retryingFuture =
executor.createFuture(checkingAttemptCallable, inputContext);
checkingAttemptCallable.setExternalFuture(retryingFuture);
checkingAttemptCallable.call();

Expand Down
Loading