@@ -1698,17 +1698,20 @@

<property>
<name>fs.s3a.retry.throttle.limit</name>
<value>${fs.s3a.attempts.maximum}</value>
<value>20</value>
<description>
Number of times to retry any throttled request.
</description>
</property>

<property>
<name>fs.s3a.retry.throttle.interval</name>
<value>1000ms</value>
<value>100ms</value>
<description>
Interval between retry attempts on throttled requests.
Initial interval between retry attempts on throttled requests, +/- 50%, chosen at random.
i.e. for an initial value of 3000ms, the initial delay would be in the range 1500ms to 4500ms.
Backoffs are exponential; again, randomness is used to avoid the thundering herd problem.
500ms is the default value used by the AWS S3 Retry policy.
</description>
</property>
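To make the arithmetic above concrete, here is a minimal, hypothetical Java sketch of such a jittered exponential backoff. It is not the S3A retry policy itself: the class and method names are invented, and the doubling per attempt is an assumption (the description says only that backoffs are exponential).

import java.util.concurrent.ThreadLocalRandom;

/** Sketch of a jittered exponential backoff, per the description above. */
public class ThrottleBackoffSketch {

  /**
   * Compute the delay for a given retry attempt: the initial interval,
   * doubled per retry, with +/- 50% random jitter applied.
   */
  public static long throttleDelayMillis(long initialMillis, int retryCount) {
    long base = initialMillis << retryCount;   // exponential growth: initial * 2^retryCount
    double jitter = 0.5 + ThreadLocalRandom.current().nextDouble();  // uniform in [0.5, 1.5)
    return (long) (base * jitter);
  }

  public static void main(String[] args) {
    // for an initial value of 3000ms, retry 0 falls in the range [1500ms, 4500ms)
    for (int retry = 0; retry < 5; retry++) {
      System.out.printf("retry %d: %d ms%n",
          retry, throttleDelayMillis(3000, retry));
    }
  }
}

The jitter matters because many clients throttled at the same instant would otherwise all retry at the same instant and be throttled again.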

@@ -204,9 +204,7 @@ public void testRenameWithNonEmptySubDir() throws Throwable {
assertPathExists("not created in src/sub dir",
new Path(srcSubDir, "subfile.txt"));

boolean rename = fs.rename(srcDir, finalDir);
assertTrue("rename(" + srcDir + ", " + finalDir + ") failed",
rename);
rename(srcDir, finalDir);

// Accept both POSIX rename behavior and CLI rename behavior
if (renameRemoveEmptyDest) {
@@ -418,8 +418,9 @@ public static boolean rm(FileSystem fileSystem,
public static void rename(FileSystem fileSystem, Path src, Path dst)
throws IOException {
rejectRootOperation(src, false);
assertTrue(fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed", src);
assertTrue("rename(" + src + ", " + dst + ") failed",
fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed source dir", src);
}

/**
@@ -174,10 +174,34 @@ private Constants() {
public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";

// number of times we should retry errors
/**
* Number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";

/**
* Default number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;

/**
* Experimental/Unstable feature: should the AWS client library retry
* throttle responses before escalating to the S3A code: {@value}.
*
* When set to false, the S3A connector sees all S3 throttle events,
* and so can update its counters and metrics, and use its own retry
* policy.
* However, this may have adverse effects on some operations where the S3A
* code cannot retry as efficiently as the AWS client library.
*
* This only applies to S3 operations, not to DynamoDB or other services.
*/
@InterfaceStability.Unstable
public static final String EXPERIMENTAL_AWS_INTERNAL_THROTTLING =
"fs.s3a.experimental.aws.internal.throttling";
Review comment:
Where is the default value for this defined?

@steveloughran (Contributor, Author), Feb 10, 2020:
its true, but yes

@steveloughran (Contributor, Author):
Updating the value, also changing the name to "fs.s3a.experimental.aws.s3.throttling" to make clear it's S3 only.
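For reference, a user who wants the S3A connector to see, count, and retry every throttle event itself would set the option to false in their configuration. This is a sketch using the property name as defined in this patch (the rename discussed above would change it; the description text is mine, not from the patch):

<property>
  <name>fs.s3a.experimental.aws.internal.throttling</name>
  <value>false</value>
  <description>
    Disable the AWS client library's own throttle retries, so the S3A
    connector sees all S3 throttle events and applies its own retry
    policy and metrics.
  </description>
</property>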


// seconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
@@ -225,6 +249,33 @@ private Constants() {
public static final String ENABLE_MULTI_DELETE =
"fs.s3a.multiobjectdelete.enable";

/**
* Number of objects to delete in a single multi-object delete call: {@value}.
* Max: 1000.
*
* A bigger value means fewer POST requests when deleting a directory
* tree with many objects.
* However, as you are limited to only a few thousand requests per
* second against a single partition of an S3 bucket,
* a large page size can easily overload the bucket and so trigger
* throttling.
*
* Furthermore, as the reaction to a throttled request is simply to
* retry it, it can take a while for the situation to go away.
* While a large value may give better numbers on tests and benchmarks
* where only a single operation is being executed, once multiple
* applications start working with the same bucket these large
* deletes can be highly disruptive.
*/
public static final String BULK_DELETE_PAGE_SIZE =
"fs.s3a.bulk.delete.page.size";

/**
* Default number of objects to delete in a single multi-object
* delete: {@value}.
*/
public static final int BULK_DELETE_PAGE_SIZE_DEFAULT = 250;
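To illustrate the paging this option controls, here is a hypothetical sketch of splitting a key list into pages of at most the configured size, each of which would become one multi-object DELETE request. The helper is invented for illustration; it is not the S3A code path.

import java.util.ArrayList;
import java.util.List;

/** Sketch: split a list of keys into bulk-delete pages. */
public class BulkDeletePagingSketch {

  /** Partition keys into pages of at most pageSize entries each. */
  public static <T> List<List<T>> pages(List<T> keys, int pageSize) {
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < keys.size(); i += pageSize) {
      // each page maps to a single multi-object delete of <= pageSize keys
      result.add(new ArrayList<>(
          keys.subList(i, Math.min(i + pageSize, keys.size()))));
    }
    return result;
  }
}

With the default of 250, deleting a 10,000-object tree issues 40 POSTs instead of 10 at the S3 maximum of 1000, spreading the load on the bucket partition.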

// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";

@@ -733,8 +784,7 @@ private Constants() {
/**
* Default throttled retry limit: {@value}.
*/
public static final int RETRY_THROTTLE_LIMIT_DEFAULT =
DEFAULT_MAX_ERROR_RETRIES;
public static final int RETRY_THROTTLE_LIMIT_DEFAULT = 20;

/**
* Interval between retry attempts on throttled requests: {@value}.
@@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;

@@ -56,7 +57,13 @@ public AmazonS3 createS3Client(URI name,
final String userAgentSuffix) throws IOException {
Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(getConf(), bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
.createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);

// optionally disable throttle retries within the AWS client, so that
// all throttle failures surface to the S3A code and are counted there
awsConf.setUseThrottleRetries(
conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, true));

if (!StringUtils.isEmpty(userAgentSuffix)) {
awsConf.setUserAgentSuffix(userAgentSuffix);
}
@@ -107,6 +107,7 @@
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.StandardInvokeRetryHandler;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
@@ -170,6 +171,9 @@
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -273,6 +277,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

private ITtlTimeProvider ttlTimeProvider;

/**
* Page size for deletions.
*/
private int pageSize;

/**
* Specific operations used by rename and delete operations.
*/
@@ -440,6 +449,9 @@ public void initialize(URI name, Configuration originalConf)
}

initMultipartUploads(conf);

pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
@@ -1388,7 +1400,8 @@ private long innerRename(Path source, Path dest)
createStoreContext(),
src, srcKey, p.getLeft(),
dst, dstKey, p.getRight(),
operationCallbacks);
operationCallbacks,
pageSize);
return renameOperation.execute();
}

@@ -1483,8 +1496,21 @@ public void finishRename(final Path sourceRenamed, final Path destCreated)
Path destParent = destCreated.getParent();
if (!sourceRenamed.getParent().equals(destParent)) {
LOG.debug("source & dest parents are different; fix up dir markers");
deleteUnnecessaryFakeDirectories(destParent);
maybeCreateFakeParentDirectory(sourceRenamed);
// kick off an async delete
List<CompletableFuture<Void>> ops = new ArrayList<>(2);
ops.add(submit(
boundedThreadPool,
() -> {
deleteUnnecessaryFakeDirectories(destParent);
return null;
}));
ops.add(submit(
boundedThreadPool,
() -> {
maybeCreateFakeParentDirectory(sourceRenamed);
return null;
}));
waitForCompletion(ops);
}
}
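CallableSupplier.submit() and waitForCompletion() above are S3A helpers; a rough, hypothetical equivalent of the pattern in plain java.util.concurrent is shown below, with the two cleanup methods as empty stubs rather than the real S3A calls. Running the marker delete and the fake-parent create in parallel saves one round trip of S3 latency on every cross-directory rename.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: run the two post-rename cleanup operations in parallel. */
public class ParallelCleanupSketch {

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // kick off both cleanup operations on the pool
    List<CompletableFuture<Void>> ops = Arrays.asList(
        CompletableFuture.runAsync(() -> deleteUnnecessaryFakeDirectories(), pool),
        CompletableFuture.runAsync(() -> maybeCreateFakeParentDirectory(), pool));
    // block until both have finished
    CompletableFuture.allOf(ops.toArray(new CompletableFuture[0])).join();
    pool.shutdown();
  }

  private static void deleteUnnecessaryFakeDirectories() { /* stub */ }

  private static void maybeCreateFakeParentDirectory() { /* stub */ }
}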

@@ -1648,10 +1674,11 @@ protected void incrementGauge(Statistic statistic, long count) {
* @param ex exception.
*/
public void operationRetried(Exception ex) {
Statistic stat = isThrottleException(ex)
? STORE_IO_THROTTLED
: IGNORED_ERRORS;
incrementStatistic(stat);
if (isThrottleException(ex)) {
operationThrottled(false);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}

/**
@@ -1684,11 +1711,28 @@ public void operationRetried(
public void metastoreOperationRetried(Exception ex,
int retries,
boolean idempotent) {
operationRetried(ex);
incrementStatistic(S3GUARD_METADATASTORE_RETRY);
if (isThrottleException(ex)) {
operationThrottled(true);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}

/**
* Note that an operation was throttled; this will update
* specific counters/metrics.
* @param metastore was the throttling observed in the S3Guard metastore?
*/
private void operationThrottled(boolean metastore) {
LOG.debug("Request throttled on {}", metastore ? "S3": "DynamoDB");
if (metastore) {
incrementStatistic(S3GUARD_METADATASTORE_THROTTLED);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
1);
} else {
incrementStatistic(STORE_IO_THROTTLED);
instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
}
}

@@ -1927,11 +1971,17 @@ private void blockRootDelete(String key) throws InvalidRequestException {
private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, AmazonClientException, IOException {
incrementWriteOperations();
StandardInvokeRetryHandler retryHandler =
new StandardInvokeRetryHandler(createStoreContext());
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
// error triggering retry
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
return s3.deleteObjects(deleteRequest);
@@ -2254,7 +2304,7 @@ private void noteDeleted(final int count, final boolean deleteFakeDir) {
*/
@VisibleForTesting
@Retries.RetryMixed
void removeKeys(
public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir,
final BulkOperationState operationState)
@@ -2349,7 +2399,7 @@ public boolean delete(Path f, boolean recursive) throws IOException {
innerGetFileStatus(f, true, StatusProbeEnum.ALL),
recursive,
operationCallbacks,
InternalConstants.MAX_ENTRIES_TO_DELETE);
pageSize);
boolean outcome = deleteOperation.execute();
if (outcome) {
try {
@@ -2830,7 +2880,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
S3AFileStatus s3GetFileStatus(final Path path,
final String key,
final Set<StatusProbeEnum> probes,
final Set<Path> tombstones) throws IOException {
@Nullable Set<Path> tombstones) throws IOException {
if (!key.isEmpty()) {
if (probes.contains(StatusProbeEnum.Head) && !key.endsWith("/")) {
try {
@@ -3515,7 +3565,14 @@ void finishedWrite(String key, long length, String eTag, String versionId,
key, length, eTag, versionId);
Path p = keyToQualifiedPath(key);
Preconditions.checkArgument(length >= 0, "content length is negative");
deleteUnnecessaryFakeDirectories(p.getParent());
final boolean isDir = objectRepresentsDirectory(key, length);
// kick off an async delete
final CompletableFuture<?> deletion = submit(
boundedThreadPool,
() -> {
deleteUnnecessaryFakeDirectories(p.getParent());
return null;
});
// this is only set if there is a metastore to update and the
// operationState parameter passed in was null.
BulkOperationState stateToClose = null;
@@ -3534,7 +3591,6 @@ void finishedWrite(String key, long length, String eTag, String versionId,
activeState = stateToClose;
}
S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, activeState);
final boolean isDir = objectRepresentsDirectory(key, length);
S3AFileStatus status = createUploadFileStatus(p,
isDir, length,
getDefaultBlockSize(p), username, eTag, versionId);
@@ -3557,6 +3613,8 @@ void finishedWrite(String key, long length, String eTag, String versionId,
activeState);
}
}
// and catch up with any delete operation.
waitForCompletionIgnoringExceptions(deletion);
} catch (IOException e) {
if (failOnMetadataWriteError) {
throw new MetadataPersistenceException(p.toString(), e);
@@ -123,6 +123,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
private final MutableCounterLong ignoredErrors;
private final MutableQuantiles putLatencyQuantile;
private final MutableQuantiles throttleRateQuantile;
private final MutableQuantiles s3GuardThrottleRateQuantile;
private final MutableCounterLong numberOfFilesCreated;
private final MutableCounterLong numberOfFilesCopied;
private final MutableCounterLong bytesOfFilesCopied;
@@ -248,7 +249,9 @@ public S3AInstrumentation(URI name) {
int interval = 1;
putLatencyQuantile = quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
"ops", "latency", interval);
throttleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
s3GuardThrottleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
"events", "frequency (Hz)", interval);
throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE,
"events", "frequency (Hz)", interval);

registerAsMetricsSource(name);
@@ -617,6 +620,7 @@ public void close() {
// task in a shared thread pool.
putLatencyQuantile.stop();
throttleRateQuantile.stop();
s3GuardThrottleRateQuantile.stop();
metricsSystem.unregisterSource(metricsSourceName);
int activeSources = --metricsSourceActiveCounter;
if (activeSources == 0) {
@@ -230,6 +230,8 @@ public enum Statistic {
"S3Guard metadata store authoritative directories updated from S3"),

STORE_IO_THROTTLED("store_io_throttled", "Requests throttled and retried"),
STORE_IO_THROTTLE_RATE("store_io_throttle_rate",
"Rate of S3 request throttling"),

DELEGATION_TOKENS_ISSUED("delegation_tokens_issued",
"Number of delegation tokens issued");