Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.*;
Expand Down Expand Up @@ -280,24 +278,19 @@ public void measure(@NonNull Metric metric, double value) {
* @param level telemetry level.
* @param operationSupplier operation to record this execution as.
* @param operationCode the future to measure the execution of.
* @param operationTimeout Timeout duration (in milliseconds) for operation
* @return an instance of {@link T} that returns the same result as the one passed in.
* @throws IOException if the underlying operation threw an IOException
*/
@Override
public <T> T measureJoin(
@NonNull TelemetryLevel level,
@NonNull OperationSupplier operationSupplier,
@NonNull CompletableFuture<T> operationCode,
long operationTimeout)
@NonNull CompletableFuture<T> operationCode)
throws IOException {
if (operationCode.isDone()) {
return handleCompletableFutureJoin(operationCode, operationTimeout);
return handleCompletableFutureJoin(operationCode);
} else {
return this.measure(
level,
operationSupplier,
() -> handleCompletableFutureJoin(operationCode, operationTimeout));
level, operationSupplier, () -> handleCompletableFutureJoin(operationCode));
}
}

Expand All @@ -306,15 +299,13 @@ public <T> T measureJoin(
*
* @param <T> - return type of the CompletableFuture
* @param future the CompletableFuture to join
* @param operationTimeout Timeout duration (in milliseconds) for operation
* @return the result of the CompletableFuture
* @throws IOException if the underlying future threw an IOException
*/
private <T> T handleCompletableFutureJoin(CompletableFuture<T> future, long operationTimeout)
throws IOException {
private <T> T handleCompletableFutureJoin(CompletableFuture<T> future) throws IOException {
try {
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
return future.get();
} catch (ExecutionException | InterruptedException e) {
Throwable cause = e.getCause();
if (cause instanceof UncheckedIOException) {
throw ((UncheckedIOException) cause).getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,12 @@ <T> CompletableFuture<T> measure(
* @param level telemetry level.
* @param operationSupplier operation to record this execution as.
* @param operationCode the future to measure the execution of.
* @param operationTimeout Timeout duration (in milliseconds) for operation
* @return an instance of {@link T} that returns the same result as the one passed in.
* @throws IOException if the underlying operation threw an IOException
*/
default <T> T measureJoin(
@NonNull TelemetryLevel level,
@NonNull OperationSupplier operationSupplier,
@NonNull CompletableFuture<T> operationCode,
long operationTimeout)
@NonNull CompletableFuture<T> operationCode)
throws IOException {
if (operationCode.isDone()) {
return operationCode.join();
Expand Down Expand Up @@ -151,16 +148,12 @@ default <T> CompletableFuture<T> measureCritical(
* @param <T> - return type of the {@link CompletableFuture}.
* @param operationSupplier operation to record this execution as.
* @param operationCode the future to measure the execution of.
* @param operationTimeout Timeout duration (in milliseconds) for operation
* @return an instance of {@link T} that returns the same result as the one passed in.
* @throws IOException if the underlying operation threw an IOException
*/
default <T> T measureJoinCritical(
OperationSupplier operationSupplier,
CompletableFuture<T> operationCode,
long operationTimeout)
throws IOException {
return measureJoin(TelemetryLevel.CRITICAL, operationSupplier, operationCode, operationTimeout);
OperationSupplier operationSupplier, CompletableFuture<T> operationCode) throws IOException {
return measureJoin(TelemetryLevel.CRITICAL, operationSupplier, operationCode);
}

/**
Expand Down Expand Up @@ -216,16 +209,11 @@ default <T> CompletableFuture<T> measureStandard(
* @param <T> - return type of the {@link CompletableFuture}.
* @param operationSupplier operation to record this execution as.
* @param operationCode the future to measure the execution of.
* @param operationTimeout Timeout duration (in milliseconds) for operation
* @return an instance of {@link T} that returns the same result as the one passed in.
* @throws IOException if the underlying operation threw an IOException
*/
default <T> T measureJoinStandard(
OperationSupplier operationSupplier,
CompletableFuture<T> operationCode,
long operationTimeout)
throws IOException {
return measureJoin(TelemetryLevel.STANDARD, operationSupplier, operationCode, operationTimeout);
OperationSupplier operationSupplier, CompletableFuture<T> operationCode) throws IOException {
return measureJoin(TelemetryLevel.STANDARD, operationSupplier, operationCode);
}

/**
Expand Down Expand Up @@ -281,16 +269,11 @@ default <T> CompletableFuture<T> measureVerbose(
* @param <T> - return type of the {@link CompletableFuture}.
* @param operationSupplier operation to record this execution as.
* @param operationCode the future to measure the execution of.
* @param operationTimeout Timeout duration (in milliseconds) for operation
* @return an instance of {@link T} that returns the same result as the one passed in.
* @throws IOException if the underlying operation threw an IOException
*/
default <T> T measureJoinVerbose(
OperationSupplier operationSupplier,
CompletableFuture<T> operationCode,
long operationTimeout)
throws IOException {
return measureJoin(TelemetryLevel.VERBOSE, operationSupplier, operationCode, operationTimeout);
OperationSupplier operationSupplier, CompletableFuture<T> operationCode) throws IOException {
return measureJoin(TelemetryLevel.VERBOSE, operationSupplier, operationCode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,17 @@ public class OpenStreamInformation {
private final InputPolicy inputPolicy;
@Builder.Default private RequestCallback requestCallback = new DefaultRequestCallbackImpl();
private final EncryptionSecrets encryptionSecrets;

@Builder.Default private final RetryStrategy retryStrategy = new DefaultRetryStrategyImpl();

/** Default set of settings for {@link OpenStreamInformation} */
public static final OpenStreamInformation DEFAULT = OpenStreamInformation.builder().build();

/**
* Default set of settings for {@link OpenStreamInformation}
*
* @return new OpenStreamInformation instance
*/
public static OpenStreamInformation ofDefaults() {
return OpenStreamInformation.builder().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,34 @@
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.FailsafeExecutor;
import java.io.IOException;
import dev.failsafe.Timeout;
import dev.failsafe.TimeoutExceededException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.SneakyThrows;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;

/**
* Retry strategy implementation for seekable input stream operations. Uses Failsafe library to
* execute operations with configurable retry policies.
*
* <p>This strategy will be additive to readTimeout and readRetryCount set on PhysicalIO
* configuration.
* <p>If provided with a timeout this strategy will overwrite readTimeout and readRetryCount set on
* PhysicalIOConfiguration. If not, values from PhysicalIOConfiguration will be used to manage
* storage read timeouts.
*/
public class DefaultRetryStrategyImpl implements RetryStrategy {
private final List<RetryPolicy> retryPolicies;
FailsafeExecutor<Object> failsafeExecutor;
private Timeout<Object> timeoutPolicy;
@Getter private boolean timeoutSet;

/** Creates a retry strategy with no retry policies (no retries). */
public DefaultRetryStrategyImpl() {
this.retryPolicies = new ArrayList<>();
this.failsafeExecutor = Failsafe.none();
}

/**
Expand All @@ -57,7 +62,6 @@ public DefaultRetryStrategyImpl(RetryPolicy outerPolicy, RetryPolicy... policies
if (policies != null && policies.length > 0) {
this.retryPolicies.addAll(Arrays.asList(policies));
}
this.failsafeExecutor = Failsafe.with(getDelegates());
}

/**
Expand All @@ -69,19 +73,18 @@ public DefaultRetryStrategyImpl(List<RetryPolicy> policies) {
Preconditions.checkNotNull(policies);
this.retryPolicies = new ArrayList<>();
this.retryPolicies.addAll(policies);
this.failsafeExecutor = Failsafe.with(getDelegates());
}

/**
* Executes a runnable operation with retry logic.
*
* @param runnable the operation to execute
* @throws IOException if the operation fails after all retries
*/
@Override
public void execute(IORunnable runnable) throws IOException {
@SneakyThrows
public void execute(IORunnable runnable) {
try {
this.failsafeExecutor.run(runnable::apply);
executor().run(runnable::apply);
} catch (Exception ex) {
throw handleExceptionAfterRetry(ex);
}
Expand All @@ -93,12 +96,12 @@ public void execute(IORunnable runnable) throws IOException {
* @param <T> return type of the supplier
* @param supplier the operation that returns a byte array
* @return the result of the supplier operation
* @throws IOException if the operation fails after all retries
*/
@Override
public <T> T get(IOSupplier<T> supplier) throws IOException {
@SneakyThrows
public <T> T get(IOSupplier<T> supplier) {
try {
return this.failsafeExecutor.get(supplier::apply);
return executor().get(supplier::apply);
} catch (Exception ex) {
throw handleExceptionAfterRetry(ex);
}
Expand All @@ -107,16 +110,14 @@ public <T> T get(IOSupplier<T> supplier) throws IOException {
@Override
public RetryStrategy amend(RetryPolicy policy) {
Preconditions.checkNotNull(policy);
this.failsafeExecutor = this.failsafeExecutor.compose(policy.getDelegate());
this.retryPolicies.add(policy);
return this;
}

@Override
public RetryStrategy merge(RetryStrategy strategy) {
Preconditions.checkNotNull(strategy);
for (RetryPolicy policy : strategy.getRetryPolicies()) {
this.failsafeExecutor = this.failsafeExecutor.compose(policy.getDelegate());
}
this.retryPolicies.addAll(strategy.getRetryPolicies());
return this;
}

Expand All @@ -135,24 +136,51 @@ private List<dev.failsafe.Policy<Object>> getDelegates() {
}

/**
* Handles exceptions after retry attempts are exhausted.
* Handles exceptions after retry attempts are exhausted. This is needed to unwrap Failsafe
* exception
*
* @param e the exception that occurred
* @return an IOException to throw
*/
private IOException handleExceptionAfterRetry(Exception e) {
IOException toThrow = new IOException("Failed to execute operation with retries", e);

private Exception handleExceptionAfterRetry(Exception e) {
if (e instanceof FailsafeException) {
Optional<Throwable> cause = Optional.ofNullable(e.getCause());
if (cause.isPresent()) {
if (cause.get() instanceof IOException) {
return (IOException) cause.get();
} else {
toThrow = new IOException("Failed to execute operation with retries", cause.get());
}
return (Exception) cause.get();
}
}
return toThrow;
return e;
}

private FailsafeExecutor<Object> executor() {
FailsafeExecutor<Object> executor;
if (retryPolicies.isEmpty()) {
executor = Failsafe.none();
} else {
executor = Failsafe.with(getDelegates());
}
if (this.timeoutSet) executor = executor.compose(timeoutPolicy);
return executor;
}

/**
* Create a timeout for read from storage operations and with specified retry count. This will
* override settings in PhysicalIOConfiguration (blockreadtimeout and blockreadretrycount) if set.
* If user does not set a timeout in their retry strategy, a timeout will be set based on
* aforementioned configuration. set blockreadtimeout = 0 to disable timeouts
*
* @param timeoutDurationMillis Timeout duration for reading from storage
* @param retryCount Number of times to retry if Timeout Exceeds
*/
public void timeout(long timeoutDurationMillis, int retryCount) {
this.timeoutPolicy =
Timeout.builder(Duration.ofMillis(timeoutDurationMillis)).withInterrupt().build();
RetryPolicy timeoutRetries =
RetryPolicy.builder()
.handle(TimeoutExceededException.class)
.withMaxRetries(retryCount)
.build();
this.retryPolicies.add(timeoutRetries);
this.timeoutSet = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
*/
package software.amazon.s3.analyticsaccelerator.util.retry;

import java.io.IOException;

/** A functional interface that mimics {@link Runnable}, but allows IOException to be thrown. */
public interface IORunnable {
/**
* Functional representation of the code that takes no parameters and returns no value. The code
* is allowed to throw any exception.
*
* @throws IOException on error condition.
* @throws Exception on error condition.
*/
void apply() throws IOException;
void apply() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package software.amazon.s3.analyticsaccelerator.util.retry;

import java.io.IOException;

/**
* A function that mimics {@link java.util.function.Supplier}, but allows IOException to be thrown
* and returns T.
Expand All @@ -29,7 +27,7 @@ public interface IOSupplier<T> {
* {@link T}. The code is allowed to throw any exception.
*
* @return a value of type {@link T}.
* @throws IOException on error condition.
* @throws Exception on error condition.
*/
T apply() throws IOException;
T apply() throws Exception;
}
Loading
Loading