From 0787a7c09fa64697bd644b569cd0c892dcd6072d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 24 Jan 2024 16:29:37 +0000 Subject: [PATCH 1/5] HADOOP-18679. Add API for bulk/paged object deletion A more minimal design that is easier to use and implement. Caller creates a BulkOperation; they get the page size of it and then submit batches to delete of less than that size. The outcome of each call contains a list of failures. S3A implementation to show how straightforward it is. Even with the single entry page size, it is still more efficient to use this as it doesn't try to recreate a parent dir or perform any probes to see if it is a directory: it maps straight to a DELETE call. Change-Id: Ibe8737e7933fe03d39070e7138a710b50c3d60c2 --- .../java/org/apache/hadoop/fs/BulkDelete.java | 143 ++++++++++++++++++ .../apache/hadoop/fs/BulkDeleteSource.java | 55 +++++++ .../hadoop/fs/CommonPathCapabilities.java | 6 + .../fs/statistics/StoreStatisticNames.java | 3 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 65 +++++++- .../org/apache/hadoop/fs/s3a/Statistic.java | 4 + .../fs/s3a/impl/BulkDeleteOperation.java | 117 ++++++++++++++ .../s3a/impl/MultiObjectDeleteException.java | 20 ++- 8 files changed, 406 insertions(+), 7 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java new file mode 100644 index 0000000000000..4f55b25d9c7ee --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +/** + * API for bulk deletion of objects/files, + * but not directories. + * After use, call {@code close()} to release any resources and + * to guarantee store IOStatistics are updated. + *

+ * Callers MUST have no expectation that parent directories will exist after the + * operation completes; if an object store needs to explicitly look for and create + * directory markers, that step will be omitted. + *

+ *
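+ * A sketch of the intended calling pattern, where {@code bulkDeleteSource}
+ * is a filesystem cast to {@link BulkDeleteSource} and {@code partition}
+ * stands for any helper which splits the path list into sublists no larger
+ * than the page size:
+ * <pre>{@code
+ * try (BulkDelete d = bulkDeleteSource.createBulkDelete(base)) {
+ *   for (List<Path> batch : partition(paths, d.pageSize())) {
+ *     BulkDeleteOutcome outcome = d.bulkDelete(batch);
+ *     // outcome.getFailures() lists the paths which were not deleted
+ *   }
+ * }
+ * }</pre>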

+ * Be aware that on some stores (AWS S3) each object listed in a bulk delete counts + * against the write IOPS limit; large page sizes are counterproductive here. + * @see HADOOP-16823. + * Large DeleteObject requests are their own Thundering Herd + *

+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDelete extends IOStatisticsSource, Closeable { + + /** + * The maximum number of objects/files to delete in a single request. + * @return a number greater than or equal to zero. + */ + int pageSize(); + + /** + * Base path of a bulk delete operation. + * All paths submitted in {@link #bulkDelete(List)} must be under this path. + */ + Path basePath(); + + /** + * Delete a list of files/objects. + *

+ * @param paths list of paths which must be absolute and under the base path. + * provided in {@link #basePath()}. + * @throws IOException IO problems including networking, authentication and more. + * @throws IllegalArgumentException if a path argument is invalid. + */ + BulkDeleteOutcome bulkDelete(List paths) + throws IOException, IllegalArgumentException; + + /** + * The outcome: a list of paths which failed to delete. + * An empty list means all files were successfully deleted. + * There are no guarantees about the ordering of the list. + * Reasons for failures are not provided. + * File Not Found is not a failure. + */ + class BulkDeleteOutcome { + + /** + * List of paths which failed to delete. + */ + private final List failures; + + /** + * Constructor. + * @param failures list of failures. This must be non-null. + */ + public BulkDeleteOutcome(final List failures) { + this.failures = failures; + } + + /** + * Get the list of failures. + * @return a possibly empty list of failures. + */ + public List getFailures() { + return failures; + } + } + + class BulkDeleteOutcomeElement { + private final Path path; + private final String error; + private final Exception exception; + + public BulkDeleteOutcomeElement( + final Path path, + final String error, + final Exception exception) { + this.path = path; + this.error = error; + this.exception = exception; + } + + public Path getPath() { + return path; + } + + public String getError() { + return error; + } + + public Exception getException() { + return exception; + } + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java new file mode 100644 index 0000000000000..9d7dc66b41842 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface for bulk deletion. + * Filesystems which support bulk deletion should implement this interface + * and MUST also declare their support in the path capability + * {@link CommonPathCapabilities#BULK_DELETE}. + * Exporting the interface does not guarantee that the operation is supported; + * returning a {@link BulkDelete} object from the call {@link #createBulkDelete(Path)} + * is. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDeleteSource { + + /** + * Create a bulk delete operation. 
+ * There is no network IO at this point, simply the creation of + * a bulk delete object. + * A path must be supplied to ensure that on viewfs and similar filesystems, + * @param path path to delete under. + * @return the bulk delete. + * @throws UnsupportedOperationException the filesystem does not support delete under that path. + * @throws IllegalArgumentException path not valid. + * @throws IOException problems resolving paths + */ + default BulkDelete createBulkDelete(Path path) + throws UnsupportedOperationException, IllegalArgumentException, IOException { + throw new UnsupportedOperationException("Bulk delete not supported"); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index 9ec07cbe966e9..2005f0ae3be31 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -181,4 +181,10 @@ private CommonPathCapabilities() { */ public static final String DIRECTORY_LISTING_INCONSISTENT = "fs.capability.directory.listing.inconsistent"; + + /** + * Capability string to probe for bulk delete: {@value}. + */ + public static final String BULK_DELETE = "fs.capability.bulk.delete"; + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index 19ee9d1414ecf..d7d4bc45dba08 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -46,6 +46,9 @@ public final class StoreStatisticNames { /** {@value}. */ public static final String OP_APPEND = "op_append"; + /** {@value}. */ + public static final String OP_BULK_DELETE = "op_bulk-delete"; + /** {@value}. 
*/ public static final String OP_COPY_FROM_LOCAL_FILE = "op_copy_from_local_file"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3aec03766dacf..1b75643afd887 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -103,6 +103,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.BulkDeleteSource; import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -119,6 +121,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; import org.apache.hadoop.fs.s3a.impl.AWSCannedACL; import org.apache.hadoop.fs.s3a.impl.AWSHeaders; +import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation; import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper; @@ -280,7 +283,8 @@ @InterfaceStability.Evolving public class S3AFileSystem extends FileSystem implements StreamCapabilities, AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource, - AuditSpanSource, ActiveThreadSpanSource { + AuditSpanSource, ActiveThreadSpanSource, + BulkDeleteSource { /** * Default blocksize as used in blocksize and FS status queries. @@ -3394,6 +3398,13 @@ private void removeKeysS3( // exit fast if there are no keys to delete return; } + if (keysToDelete.size() == 1) { + // single object is a single delete call. + // this is more informative in server logs and may be more efficient.. + deleteObject(keysToDelete.get(0).key()); + noteDeleted(1, deleteFakeDir); + return; + } for (ObjectIdentifier objectIdentifier : keysToDelete) { blockRootDelete(objectIdentifier.key()); } @@ -5457,7 +5468,11 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE: return true; - // multi object delete flag + // this is always true, even if multi object + // delete is disabled -the page size is simply reduced to 1. + case CommonPathCapabilities.BULK_DELETE: + return true; + case ENABLE_MULTI_DELETE: return enableMultiObjectsDelete; @@ -5740,4 +5755,50 @@ public boolean isMultipartUploadEnabled() { return isMultipartUploadEnabled; } + @Override + public BulkDelete createBulkDelete(final Path path) + throws IllegalArgumentException, IOException { + + final Path p = makeQualified(path); + final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null); + return new BulkDeleteOperation( + createStoreContext(), + createBulkDeleteCallbacks(span), + p, + enableMultiObjectsDelete ? 1: pageSize, + span); + } + + /** + * Override point for mocking. + * @param span span for operations. + * @return an instance of the Bulk Delette callbacks. + */ + protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks( + final AuditSpanS3A span) { + return new BulkDeleteOperationCallbacksImpl(span); + } + + /** + * Callbacks for the bulk delete operation. + */ + protected class BulkDeleteOperationCallbacksImpl implements + BulkDeleteOperation.BulkDeleteOperationCallbacks { + + /** span for operations. 
*/ + private final AuditSpan span; + + protected BulkDeleteOperationCallbacksImpl(AuditSpan span) { + this.span = span; + } + + @Override + @Retries.RetryTranslated + public void bulkDelete(final List keys) + throws MultiObjectDeleteException, IOException, IllegalArgumentException { + span.activate(); + once("bulkDelete", "", () -> + S3AFileSystem.this.removeKeys(keys, false)); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index ce3af3de803a4..7102623996e95 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -103,6 +103,10 @@ public enum Statistic { StoreStatisticNames.OP_ACCESS, "Calls of access()", TYPE_DURATION), + INVOCATION_BULK_DELETE( + StoreStatisticNames.OP_BULK_DELETE, + "Calls of bulk delete()", + TYPE_COUNTER), INVOCATION_COPY_FROM_LOCAL_FILE( StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE, "Calls of copyFromLocalFile()", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java new file mode 100644 index 0000000000000..f4d0fea16e7b9 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; + +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.store.audit.AuditSpan; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * S3A Implementation of the {@link BulkDelete} interface. 
+ */ +public class BulkDeleteOperation extends AbstractStoreOperation implements BulkDelete { + + private final BulkDeleteOperationCallbacks callbacks; + + private final Path basePath; + + private final int pageSize; + + public BulkDeleteOperation(final StoreContext storeContext, + final BulkDeleteOperationCallbacks callbacks, + final Path basePath, + final int pageSize, + final AuditSpan span) { + super(storeContext, span); + this.callbacks = requireNonNull(callbacks); + this.basePath = requireNonNull(basePath); + checkArgument(pageSize > 0, "Page size must be greater than 0"); + this.pageSize = pageSize; + } + + @Override + public int pageSize() { + return pageSize; + } + + @Override + public Path basePath() { + return basePath; + } + + @Override + public BulkDeleteOutcome bulkDelete(final List paths) + throws IOException, IllegalArgumentException { + requireNonNull(paths); + checkArgument(paths.size() <= pageSize, + "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize); + + final List objects = paths.stream().map(p -> { + checkArgument(p.isAbsolute(), "Path %s is not absolute", p); + final String k = getStoreContext().pathToKey(p); + return ObjectIdentifier.builder().key(k).build(); + }).collect(Collectors.toList()); + try { + callbacks.bulkDelete(objects); + } catch (MultiObjectDeleteException e) { + final List outcomeElements = e.errors() + .stream() + .map(error -> new BulkDeleteOutcomeElement( + getStoreContext().keyToPath(error.key()), + MultiObjectDeleteException.errorToString(error), + null)) + .collect(Collectors.toList()); + return new BulkDeleteOutcome(outcomeElements); + } + return new BulkDeleteOutcome(Collections.emptyList()); + } + + @Override + public void close() throws IOException { + + } + + public interface BulkDeleteOperationCallbacks { + + /** + * Attempt a bulk delete operation. + * @param keys key list + * @throws MultiObjectDeleteException one or more of the keys could not + * be deleted in a multiple object delete operation. + * @throws AwsServiceException amazon-layer failure. + * @throws IOException other IO Exception. + * @throws IllegalArgumentException illegal arguments + */ + @Retries.RetryTranslated + void bulkDelete(final List keys) + throws MultiObjectDeleteException, IOException, IllegalArgumentException; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java index 72ead1fb151fc..14ad559ead293 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java @@ -118,11 +118,7 @@ public IOException translateException(final String message) { String exitCode = ""; for (S3Error error : errors()) { String code = error.code(); - String item = String.format("%s: %s%s: %s%n", code, error.key(), - (error.versionId() != null - ? (" (" + error.versionId() + ")") - : ""), - error.message()); + String item = errorToString(error); LOG.info(item); result.append(item); if (exitCode == null || exitCode.isEmpty() || ACCESS_DENIED.equals(code)) { @@ -136,4 +132,18 @@ public IOException translateException(final String message) { return new AWSS3IOException(result.toString(), this); } } + + /** + * Convert an error to a string. 
+ * @param error error from a delete request + * @return string value + */ + public static String errorToString(final S3Error error) { + String code = error.code(); + return String.format("%s: %s%s: %s%n", code, error.key(), + (error.versionId() != null + ? (" (" + error.versionId() + ")") + : ""), + error.message()); + } } From 89826f881721139f982c0a9409e677871c424148 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 9 Feb 2024 13:47:27 +0000 Subject: [PATCH 2/5] HADOOP-18679. Bulk Delete: utility methods Add methods in FileUtil to take an FS, cast to a BulkDeleteSource then perform the pageSize/bulkDelete operations. This is to make reflection based access straightforward: no new interfaces or types to work with, just two new methods with type-erased lists. Change-Id: I2d7b1bf8198422de635253fc087ca551a8bc6609 --- .../java/org/apache/hadoop/fs/BulkDelete.java | 4 +- .../apache/hadoop/fs/BulkDeleteSource.java | 11 +++- .../java/org/apache/hadoop/fs/FileUtil.java | 62 +++++++++++++++++++ 3 files changed, 73 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java index 4f55b25d9c7ee..94dbb303863c2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java @@ -26,6 +26,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import static java.util.Objects.requireNonNull; + /** * API for bulk deletion of objects/files, * but not directories. @@ -101,7 +103,7 @@ class BulkDeleteOutcome { * @param failures list of failures. This must be non-null. */ public BulkDeleteOutcome(final List failures) { - this.failures = failures; + this.failures = requireNonNull(failures); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java index 9d7dc66b41842..d4cffb39bdf7c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java @@ -36,20 +36,25 @@ @InterfaceStability.Unstable public interface BulkDeleteSource { + /** + * Exception message for the case where bulk delete is not supported. + */ + String BULK_DELETE_NOT_SUPPORTED = "Bulk delete not supported"; + /** * Create a bulk delete operation. * There is no network IO at this point, simply the creation of * a bulk delete object. - * A path must be supplied to ensure that on viewfs and similar filesystems, + * A path must be supplied to assist in link resolution. * @param path path to delete under. * @return the bulk delete. - * @throws UnsupportedOperationException the filesystem does not support delete under that path. + * @throws UnsupportedOperationException bulk delete under that path is not supported. * @throws IllegalArgumentException path not valid. 
* @throws IOException problems resolving paths */ default BulkDelete createBulkDelete(Path path) throws UnsupportedOperationException, IllegalArgumentException, IOException { - throw new UnsupportedOperationException("Bulk delete not supported"); + throw new UnsupportedOperationException(BULK_DELETE_NOT_SUPPORTED); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index fa87bb48aaa69..049a76da2da83 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -55,6 +55,7 @@ import java.util.jar.Attributes; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import org.apache.commons.collections.map.CaseInsensitiveMap; @@ -2108,4 +2109,65 @@ public static void maybeIgnoreMissingDirectory(FileSystem fs, LOG.info("Ignoring missing directory {}", path); LOG.debug("Directory missing", e); } + + /** + * Get the maximum number of objects/files to delete in a single request. + * @param fs filesystem + * @param path path to delete under. + * @return a number greater than or equal to zero. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IllegalArgumentException path not valid. + * @throws IOException problems resolving paths + */ + public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOException { + try (BulkDelete bulk = toBulkDeleteSource(fs).createBulkDelete(path)) { + return bulk.pageSize(); + } + } + + /** + * Convert a filesystem to a bulk delete source. + * @param fs filesystem + * @return cast fs. + * @throws UnsupportedOperationException FS doesn't implement the interface. + */ + private static BulkDeleteSource toBulkDeleteSource(final FileSystem fs) { + if (!(fs instanceof BulkDeleteSource)) { + throw new UnsupportedOperationException(BulkDeleteSource.BULK_DELETE_NOT_SUPPORTED); + } + return (BulkDeleteSource) fs; + } + + /** + * Delete a list of files/objects. + *
    + *
+   * <ul>
+   *   <li>Files must be under the path provided in {@code base}.</li>
+   *   <li>The size of the list must be equal to or less than the page size.</li>
+   *   <li>Directories are not supported; the outcome of attempting to delete
+   *   directories is undefined (ignored; undetected, listed as failures...).</li>
+   *   <li>The operation is not atomic.</li>
+   *   <li>The operation is treated as idempotent: network failures may
+   *   trigger resubmission of the request -any new objects created under a
+   *   path in the list may then be deleted.</li>
+   *   <li>There is no guarantee that any parent directories exist after this call.</li>
+   * </ul>
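+   * <p>
+   * A sketch of use through these static methods, where {@code batch} is a
+   * list of absolute paths under {@code base} no longer than the page size:
+   * <pre>{@code
+   * int page = FileUtil.bulkDeletePageSize(fs, base);
+   * List<Path> failed = FileUtil.bulkDelete(fs, base, batch);
+   * }</pre>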
+   * @param fs filesystem
+   * @param base path to delete under.
+   * @param paths list of paths which must be absolute and under the base path.
+   * @return a list of all the paths which couldn't be deleted for a reason other than "not found".
+   * @throws UnsupportedOperationException bulk delete under that path is not supported.
+   * @throws IOException IO problems including networking, authentication and more.
+   * @throws IllegalArgumentException if a path argument is invalid.
+   */
+  public static List<Path> bulkDelete(FileSystem fs, Path base, List<Path> paths)
+      throws IOException {
+    try (BulkDelete bulk = toBulkDeleteSource(fs).createBulkDelete(base)) {
+      final BulkDelete.BulkDeleteOutcome outcome = bulk.bulkDelete(paths);
+      return outcome.getFailures().stream()
+          .map(BulkDelete.BulkDeleteOutcomeElement::getPath)
+          .collect(Collectors.toList());
+    }
+  }
+
 }

From 593b79162b56d595b15ee79c0bc8e25cf2e618cf Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Thu, 15 Feb 2024 16:05:13 +0000
Subject: [PATCH 3/5] HADOOP-18679. ?: values the wrong way round

Change-Id: Ib098c07cc1f7747ed1a3131b252656c96c520a75
---
 .../src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 1b75643afd887..6ed13a1d44317 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -5765,7 +5765,7 @@ public BulkDelete createBulkDelete(final Path path)
         createStoreContext(),
         createBulkDeleteCallbacks(span),
         p,
-        enableMultiObjectsDelete ? 1: pageSize,
+        enableMultiObjectsDelete ? pageSize : 1,
         span);
   }

From ea19f4305ab3608860dd2dde2afe0aba7915b746 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 13 Mar 2024 18:59:52 +0000
Subject: [PATCH 4/5] HADOOP-18679. Bulk Delete: initial S3AStore interface/impl

Using this PR to start with the initial design, implementation and
services offered by having lower-level interaction with S3 pushed down
into an S3AStore class, with interface/impl split.

The bulk delete callbacks now talk to the store, *not* s3afs, with some
minor changes in behaviour (IllegalArgumentException is raised if root
paths / are to be deleted)

Mock tests are failing; I expected that: they are always brittle.

What next? Get this in and then move lower level fs ops over a method
calling s3client at a time, or in groups, as appropriate.

The metrics of success are:
* all callback classes created in S3A FS can work through the store
* no s3client direct invocation in S3AFS

Change-Id: Ib5bc58991533fd5d13e78426e88b884b6ae5205c
---
 .../java/org/apache/hadoop/fs/BulkDelete.java |   4 +-
 .../apache/hadoop/util/functional/Tuples.java |  87 ++++
 .../apache/hadoop/fs/s3a/S3AFileSystem.java   | 165 +++----
 .../apache/hadoop/fs/s3a/S3AInternals.java    |  12 +-
 .../org/apache/hadoop/fs/s3a/S3AStore.java    | 119 +++++
 .../fs/s3a/impl/BulkDeleteOperation.java      |  15 +-
 .../BulkDeleteOperationCallbacksImpl.java     |  94 ++++
 .../hadoop/fs/s3a/impl/S3AStoreBuilder.java   | 104 +++++
 .../hadoop/fs/s3a/impl/S3AStoreImpl.java      | 412 ++++++++++++++++++
 .../fs/s3a/impl/StoreContextFactory.java      |  32 ++
 .../tools/hadoop-aws/aws_sdk_upgrade.md       |   1 +
 .../hadoop/fs/s3a/AbstractS3AMockTest.java    |   3 +-
 .../hadoop/fs/s3a/TestS3ADeleteOnExit.java    |   3 +-
 13 files changed, 921 insertions(+), 130 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
index 94dbb303863c2..af454bc49a8a8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
@@ -38,9 +38,9 @@
  * operation completes; if an object store needs to explicitly look for and create
  * directory markers, that step will be omitted.
  *

- *

* Be aware that on some stores (AWS S3) each object listed in a bulk delete counts - * against the write IOPS limit; large page sizes are counterproductive here. + * against the write IOPS limit; large page sizes are counterproductive here, as + * are attempts at parallel submissions across multiple threads. * @see HADOOP-16823. * Large DeleteObject requests are their own Thundering Herd *

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java new file mode 100644 index 0000000000000..ed80c1daca726 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.functional; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Tuple support. + * This allows for tuples to be passed around as part of the public API without + * committing to a third-party library tuple implementation. + */ +@InterfaceStability.Unstable +public final class Tuples { + + private Tuples() { + } + + /** + * Create a 2-tuple. + * @param key element 1 + * @param value element 2 + * @return a tuple. + * @param element 1 type + * @param element 2 type + */ + public static Map.Entry pair(final K key, final V value) { + return new Tuple<>(key, value); + } + + /** + * Simple tuple class: uses the Map.Entry interface as other + * implementations have done, so the API is available across + * all java versions. 
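+   * <p>
+   * For example, {@code Tuples.pair("bytes", 42L)} returns an immutable
+   * {@code Map.Entry<String, Long>}; calling {@code setValue()} on it
+   * throws {@code UnsupportedOperationException}.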
+ * @param key + * @param value + */ + private static final class Tuple implements Map.Entry { + + private final K key; + + private final V value; + + private Tuple(final K key, final V value) { + this.key = key; + this.value = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(final V value) { + throw new UnsupportedOperationException("Tuple is immutable"); + } + + @Override + public String toString() { + return "(" + key + ", " + value + ')'; + } + + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 6ed13a1d44317..6b823a53cd020 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -81,7 +81,6 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.services.s3.model.S3Error; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.StorageClass; import software.amazon.awssdk.services.s3.model.UploadPartRequest; @@ -122,7 +121,7 @@ import org.apache.hadoop.fs.s3a.impl.AWSCannedACL; import org.apache.hadoop.fs.s3a.impl.AWSHeaders; import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation; -import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; +import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper; import org.apache.hadoop.fs.s3a.impl.ContextAccessors; @@ -143,9 +142,11 @@ import org.apache.hadoop.fs.s3a.impl.RenameOperation; import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl; import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder; +import org.apache.hadoop.fs.s3a.impl.S3AStoreBuilder; import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; +import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; @@ -244,7 +245,6 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; @@ -258,11 +258,11 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; -import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.RateLimitingFactory.unlimitedRate; import static org.apache.hadoop.util.functional.RemoteIterators.foreach; import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator; @@ -284,7 +284,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource, AuditSpanSource, ActiveThreadSpanSource, - BulkDeleteSource { + BulkDeleteSource, StoreContextFactory { /** * Default blocksize as used in blocksize and FS status queries. @@ -297,6 +297,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private String username; + /** + * Store back end. + */ + private S3AStore store; + private S3Client s3Client; /** Async client is used for transfer manager. */ @@ -676,9 +681,6 @@ public void initialize(URI name, Configuration originalConf) // the encryption algorithms) bindAWSClient(name, delegationTokensEnabled); - // This initiates a probe against S3 for the bucket existing. - doBucketProbing(); - inputPolicy = S3AInputPolicy.getPolicy( conf.getTrimmed(INPUT_FADVISE, Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT), @@ -725,9 +727,6 @@ public void initialize(URI name, Configuration originalConf) directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf, this::allowAuthoritative); LOG.debug("Directory marker retention policy is {}", directoryPolicy); - - initMultipartUploads(conf); - pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, BULK_DELETE_PAGE_SIZE_DEFAULT, 0); checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, @@ -751,6 +750,25 @@ public void initialize(URI name, Configuration originalConf) optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL, OPTIMIZED_COPY_FROM_LOCAL_DEFAULT); LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal); + + // now create the store + store = new S3AStoreBuilder() + .withS3Client(s3Client) + .withDurationTrackerFactory(getDurationTrackerFactory()) + .withStoreContextFactory(this) + .withAuditSpanSource(getAuditManager()) + .withInstrumentation(getInstrumentation()) + .withStatisticsContext(statisticsContext) + .withStorageStatistics(getStorageStatistics()) + .withReadRateLimiter(unlimitedRate()) + .withWriteRateLimiter(unlimitedRate()) + .build(); + + // The filesystem is now ready to perform operations against + // S3 + // This initiates a probe against S3 for the bucket existing. + doBucketProbing(); + initMultipartUploads(conf); } catch (SdkException e) { // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); @@ -1412,6 +1430,11 @@ public S3Client getAmazonS3Client(String reason) { return s3Client; } + @Override + public S3AStore getStore() { + return store; + } + /** * S3AInternals method. * {@inheritDoc}. 
@@ -3059,29 +3082,10 @@ public void incrementWriteOperations() { @Retries.RetryRaw protected void deleteObject(String key) throws SdkException, IOException { - blockRootDelete(key); incrementWriteOperations(); - try (DurationInfo ignored = - new DurationInfo(LOG, false, - "deleting %s", key)) { - invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key), - DELETE_CONSIDERED_IDEMPOTENT, - () -> { - incrementStatistic(OBJECT_DELETE_OBJECTS); - trackDurationOfInvocation(getDurationTrackerFactory(), - OBJECT_DELETE_REQUEST.getSymbol(), - () -> s3Client.deleteObject(getRequestFactory() - .newDeleteObjectRequestBuilder(key) - .build())); - return null; - }); - } catch (AwsServiceException ase) { - // 404 errors get swallowed; this can be raised by - // third party stores (GCS). - if (!isObjectNotFound(ase)) { - throw ase; - } - } + store.deleteObject(getRequestFactory() + .newDeleteObjectRequestBuilder(key) + .build()); } /** @@ -3107,19 +3111,6 @@ void deleteObjectAtPath(Path f, deleteObject(key); } - /** - * Reject any request to delete an object where the key is root. - * @param key key to validate - * @throws InvalidRequestException if the request was rejected due to - * a mistaken attempt to delete the root directory. - */ - private void blockRootDelete(String key) throws InvalidRequestException { - if (key.isEmpty() || "/".equals(key)) { - throw new InvalidRequestException("Bucket "+ bucket - +" cannot be deleted"); - } - } - /** * Perform a bulk object delete operation against S3. * Increments the {@code OBJECT_DELETE_REQUESTS} and write @@ -3146,38 +3137,11 @@ private void blockRootDelete(String key) throws InvalidRequestException { private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest) throws MultiObjectDeleteException, SdkException, IOException { incrementWriteOperations(); - BulkDeleteRetryHandler retryHandler = - new BulkDeleteRetryHandler(createStoreContext()); - int keyCount = deleteRequest.delete().objects().size(); - try (DurationInfo ignored = - new DurationInfo(LOG, false, "DELETE %d keys", - keyCount)) { - DeleteObjectsResponse response = - invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, - (text, e, r, i) -> { - // handle the failure - retryHandler.bulkDeleteRetried(deleteRequest, e); - }, - // duration is tracked in the bulk delete counters - trackDurationOfOperation(getDurationTrackerFactory(), - OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> { - incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount); - return s3Client.deleteObjects(deleteRequest); - })); - - if (!response.errors().isEmpty()) { - // one or more of the keys could not be deleted. 
- // log and then throw - List errors = response.errors(); - LOG.debug("Partial failure of delete, {} errors", errors.size()); - for (S3Error error : errors) { - LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message()); - } - throw new MultiObjectDeleteException(errors); - } - - return response; + DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue(); + if (!response.errors().isEmpty()) { + throw new MultiObjectDeleteException(response.errors()); } + return response; } /** @@ -3386,14 +3350,6 @@ private void removeKeysS3( List keysToDelete, boolean deleteFakeDir) throws MultiObjectDeleteException, AwsServiceException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Initiating delete operation for {} objects", - keysToDelete.size()); - for (ObjectIdentifier objectIdentifier : keysToDelete) { - LOG.debug(" \"{}\" {}", objectIdentifier.key(), - objectIdentifier.versionId() != null ? objectIdentifier.versionId() : ""); - } - } if (keysToDelete.isEmpty()) { // exit fast if there are no keys to delete return; @@ -3405,9 +3361,6 @@ private void removeKeysS3( noteDeleted(1, deleteFakeDir); return; } - for (ObjectIdentifier objectIdentifier : keysToDelete) { - blockRootDelete(objectIdentifier.key()); - } try { if (enableMultiObjectsDelete) { if (keysToDelete.size() <= pageSize) { @@ -5654,6 +5607,7 @@ public S3AMultipartUploaderBuilder createMultipartUploader( * new store context instances should be created as appropriate. * @return the store context of this FS. */ + @Override @InterfaceAudience.Private public StoreContext createStoreContext() { return new StoreContextBuilder().setFsURI(getUri()) @@ -5761,44 +5715,23 @@ public BulkDelete createBulkDelete(final Path path) final Path p = makeQualified(path); final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null); + final int size = enableMultiObjectsDelete ? pageSize : 1; return new BulkDeleteOperation( createStoreContext(), - createBulkDeleteCallbacks(span), + createBulkDeleteCallbacks(p, size, span), p, - enableMultiObjectsDelete ? pageSize : 1, + size, span); } /** - * Override point for mocking. + * Create the callbacks for the bulk delete operation. * @param span span for operations. - * @return an instance of the Bulk Delette callbacks. + * @return an instance of the Bulk Delete callbacks. */ protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks( - final AuditSpanS3A span) { - return new BulkDeleteOperationCallbacksImpl(span); + Path path, int pageSize, AuditSpanS3A span) { + return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span); } - /** - * Callbacks for the bulk delete operation. - */ - protected class BulkDeleteOperationCallbacksImpl implements - BulkDeleteOperation.BulkDeleteOperationCallbacks { - - /** span for operations. 
*/ - private final AuditSpan span; - - protected BulkDeleteOperationCallbacksImpl(AuditSpan span) { - this.span = span; - } - - @Override - @Retries.RetryTranslated - public void bulkDelete(final List keys) - throws MultiObjectDeleteException, IOException, IllegalArgumentException { - span.activate(); - once("bulkDelete", "", () -> - S3AFileSystem.this.removeKeys(keys, false)); - } - } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java index b4116068565c2..3f3178c7e6e28 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java @@ -33,6 +33,9 @@ /** * This is an unstable interface for access to S3A Internal state, S3 operations * and the S3 client connector itself. + *
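+ * A sketch of diagnostic use; the cast holds only for S3A filesystems, and
+ * {@code getS3AInternals()} is assumed to be the accessor on
+ * {@code S3AFileSystem}:
+ * <pre>{@code
+ * S3AStore store = ((S3AFileSystem) fs).getS3AInternals().getStore();
+ * }</pre>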

+ * Note for maintainers: this is documented in {@code aws_sdk_upgrade.md}; update + * on changes. */ @InterfaceStability.Unstable @InterfaceAudience.LimitedPrivate("testing/diagnostics") @@ -52,13 +55,19 @@ public interface S3AInternals { * set to false. *

* Mocking note: this is the same S3Client as is used by the owning - * filesystem; changes to this client will be reflected by changes + * filesystem and S3AStore; changes to this client will be reflected by changes * in the behavior of that filesystem. * @param reason a justification for requesting access. * @return S3Client */ S3Client getAmazonS3Client(String reason); + /** + * Get the store for low-level operations. + * @return the store the S3A FS is working through. + */ + S3AStore getStore(); + /** * Get the region of a bucket. * Invoked from StoreContext; consider an entry point. @@ -131,4 +140,5 @@ public interface S3AInternals { @AuditEntryPoint @Retries.RetryTranslated long abortMultipartUploads(Path path) throws IOException; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java new file mode 100644 index 0000000000000..c029364743cce --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; + +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException; +import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +/** + * Interface for the S3A Store; + * S3 client interactions should be via this; mocking + * is possible for unit tests. + */ +@InterfaceAudience.LimitedPrivate("Extensions") +@InterfaceStability.Unstable +public interface S3AStore extends IOStatisticsSource { + + /** + * Acquire write capacity for operations. + * This should be done within retry loops. + * @param capacity capacity to acquire. + * @return time spent waiting for output. + */ + Duration acquireWriteCapacity(int capacity); + + /** + * Acquire read capacity for operations. + * This should be done within retry loops. + * @param capacity capacity to acquire. 
+ * @return time spent waiting for output. + */ + Duration acquireReadCapacity(int capacity); + + StoreContext getStoreContext(); + + DurationTrackerFactory getDurationTrackerFactory(); + + S3AStatisticsContext getStatisticsContext(); + + RequestFactory getRequestFactory(); + + /** + * Perform a bulk object delete operation against S3. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics + *

+ * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number + * of objects deleted in the request. + *

+ * Retry policy: retry untranslated; delete considered idempotent. + * If the request is throttled, this is logged in the throttle statistics, + * with the counter set to the number of keys, rather than the number + * of invocations of the delete operation. + * This is because S3 considers each key as one mutating operation on + * the store when updating its load counters on a specific partition + * of an S3 bucket. + * If only the request was measured, this operation would under-report. + * @param deleteRequest keys to delete on the s3-backend + * @return the AWS response + * @throws MultiObjectDeleteException one or more of the keys could not + * be deleted. + * @throws SdkException amazon-layer failure. + */ + @Retries.RetryRaw + Map.Entry deleteObjects(DeleteObjectsRequest deleteRequest) + throws MultiObjectDeleteException, SdkException, IOException; + + /** + * Delete an object. + * Increments the {@code OBJECT_DELETE_REQUESTS} statistics. + *

+ * Retry policy: retry untranslated; delete considered idempotent. + * 404 errors other than bucket not found are swallowed; + * this can be raised by third party stores (GCS). + * If an exception is caught and swallowed, the response will be empty; + * otherwise it will be the response from the delete operation. + * @param request request to make + * @return the total duration and response. + * @throws SdkException problems working with S3 + * @throws IllegalArgumentException if the request was rejected due to + * a mistaken attempt to delete the root directory. + */ + @Retries.RetryRaw + Map.Entry> deleteObject( + DeleteObjectRequest request) throws SdkException; + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java index f4d0fea16e7b9..61e666cacbfef 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java @@ -79,14 +79,14 @@ public BulkDeleteOutcome bulkDelete(final List paths) final String k = getStoreContext().pathToKey(p); return ObjectIdentifier.builder().key(k).build(); }).collect(Collectors.toList()); - try { - callbacks.bulkDelete(objects); - } catch (MultiObjectDeleteException e) { - final List outcomeElements = e.errors() + + final List errors = callbacks.bulkDelete(objects); + if (!errors.isEmpty()) { + + final List outcomeElements = errors .stream() .map(error -> new BulkDeleteOutcomeElement( - getStoreContext().keyToPath(error.key()), - MultiObjectDeleteException.errorToString(error), + getStoreContext().keyToPath(error), error, null)) .collect(Collectors.toList()); return new BulkDeleteOutcome(outcomeElements); @@ -104,6 +104,7 @@ public interface BulkDeleteOperationCallbacks { /** * Attempt a bulk delete operation. * @param keys key list + * @return * @throws MultiObjectDeleteException one or more of the keys could not * be deleted in a multiple object delete operation. * @throws AwsServiceException amazon-layer failure. @@ -111,7 +112,7 @@ public interface BulkDeleteOperationCallbacks { * @throws IllegalArgumentException illegal arguments */ @Retries.RetryTranslated - void bulkDelete(final List keys) + List bulkDelete(final List keys) throws MultiObjectDeleteException, IOException, IllegalArgumentException; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java new file mode 100644 index 0000000000000..4c4c1dfe44aff --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.store.audit.AuditSpan; + +import static org.apache.hadoop.fs.s3a.Invoker.once; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Callbacks for the bulk delete operation. + */ +public class BulkDeleteOperationCallbacksImpl implements + BulkDeleteOperation.BulkDeleteOperationCallbacks { + + private final String path; + + private final int pageSize; + + /** span for operations. */ + private final AuditSpan span; + + private final S3AStore store; + + + public BulkDeleteOperationCallbacksImpl(final S3AStore store, + String path, int pageSize, AuditSpan span) { + this.span = span; + this.pageSize = pageSize; + this.path = path; + this.store = store; + } + + @Override + @Retries.RetryTranslated + public List bulkDelete(final List keysToDelete) + throws MultiObjectDeleteException, IOException, IllegalArgumentException { + span.activate(); + final int size = keysToDelete.size(); + checkArgument(size <= pageSize, + "Too many paths to delete in one operation: %s", size); + if (size == 0) { + return Collections.emptyList(); + } + if (size == 1) { + store.deleteObject(store.getRequestFactory() + .newDeleteObjectRequestBuilder(keysToDelete.get(0).key()) + .build()); + // no failures, so return an empty list + return Collections.emptyList(); + } + + final DeleteObjectsResponse response = once("bulkDelete", path, () -> + store.deleteObjects(store.getRequestFactory() + .newBulkDeleteRequestBuilder(keysToDelete) + .build())).getValue(); + final List errors = response.errors(); + if (errors.isEmpty()) { + // all good. + return Collections.emptyList(); + } else { + return errors.stream() + .map(S3Error::key) + .collect(Collectors.toList()); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java new file mode 100644 index 0000000000000..40f649a7378b6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import software.amazon.awssdk.services.s3.S3Client; + +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.util.RateLimiting; + +/** + * Builder for the S3AStore. + */ +public class S3AStoreBuilder { + + private StoreContextFactory storeContextFactory; + + private S3Client s3Client; + + private DurationTrackerFactory durationTrackerFactory; + + private S3AInstrumentation instrumentation; + + private S3AStatisticsContext statisticsContext; + + private S3AStorageStatistics storageStatistics; + + private RateLimiting readRateLimiter; + + private RateLimiting writeRateLimiter; + + private AuditSpanSource auditSpanSource; + + public S3AStoreBuilder withStoreContextFactory(final StoreContextFactory storeContextFactory) { + this.storeContextFactory = storeContextFactory; + return this; + } + + public S3AStoreBuilder withS3Client(final S3Client s3Client) { + this.s3Client = s3Client; + return this; + } + + public S3AStoreBuilder withDurationTrackerFactory(final DurationTrackerFactory durationTrackerFactory) { + this.durationTrackerFactory = durationTrackerFactory; + return this; + } + + public S3AStoreBuilder withInstrumentation(final S3AInstrumentation instrumentation) { + this.instrumentation = instrumentation; + return this; + } + + public S3AStoreBuilder withStatisticsContext(final S3AStatisticsContext statisticsContext) { + this.statisticsContext = statisticsContext; + return this; + } + + public S3AStoreBuilder withStorageStatistics(final S3AStorageStatistics storageStatistics) { + this.storageStatistics = storageStatistics; + return this; + } + + public S3AStoreBuilder withReadRateLimiter(final RateLimiting readRateLimiter) { + this.readRateLimiter = readRateLimiter; + return this; + } + + public S3AStoreBuilder withWriteRateLimiter(final RateLimiting writeRateLimiter) { + this.writeRateLimiter = writeRateLimiter; + return this; + } + + public S3AStoreBuilder withAuditSpanSource(final AuditSpanSource auditSpanSource) { + this.auditSpanSource = auditSpanSource; + return this; + } + + public S3AStore build() { + return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation, + statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java new file mode 100644 index 0000000000000..6d60b71a95cb2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; +import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUEST; +import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_RETRY; +import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLED; +import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Store Layer. + * This is where lower level storage operations are intended + * to move. 
+ */ +public class S3AStoreImpl implements S3AStore { + + private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class); + + private final StoreContextFactory storeContextFactory; + + private final S3Client s3Client; + + private final String bucket; + + private final RequestFactory requestFactory; + + /** Async client is used for transfer manager. */ + private S3AsyncClient s3AsyncClient; + + private final DurationTrackerFactory durationTrackerFactory; + + /** The core instrumentation. */ + private final S3AInstrumentation instrumentation; + + /** Accessors to statistics for this FS. */ + private final S3AStatisticsContext statisticsContext; + + /** Storage Statistics Bonded to the instrumentation. */ + private final S3AStorageStatistics storageStatistics; + + private final RateLimiting readRateLimiter; + + private final RateLimiting writeRateLimiter; + + private final StoreContext storeContext; + + private final Invoker invoker; + + private final AuditSpanSource auditSpanSource; + + S3AStoreImpl(StoreContextFactory storeContextFactory, + S3Client s3Client, + DurationTrackerFactory durationTrackerFactory, + S3AInstrumentation instrumentation, + S3AStatisticsContext statisticsContext, + S3AStorageStatistics storageStatistics, + RateLimiting readRateLimiter, + RateLimiting writeRateLimiter, + AuditSpanSource auditSpanSource) { + this.storeContextFactory = requireNonNull(storeContextFactory); + this.s3Client = requireNonNull(s3Client); + this.durationTrackerFactory = requireNonNull(durationTrackerFactory); + this.instrumentation = requireNonNull(instrumentation); + this.statisticsContext = requireNonNull(statisticsContext); + this.storageStatistics = requireNonNull(storageStatistics); + this.readRateLimiter = requireNonNull(readRateLimiter); + this.writeRateLimiter = requireNonNull(writeRateLimiter); + this.auditSpanSource = requireNonNull(auditSpanSource); + this.storeContext = requireNonNull(storeContextFactory.createStoreContext()); + this.invoker = storeContext.getInvoker(); + this.bucket = storeContext.getBucket(); + this.requestFactory = storeContext.getRequestFactory(); + } + + @Override + public Duration acquireWriteCapacity(final int capacity) { + return writeRateLimiter.acquire(capacity); + } + + @Override + public Duration acquireReadCapacity(final int capacity) { + return readRateLimiter.acquire(capacity); + + } + + /** + * Create the store context. + * @return a new store context. + */ + private StoreContext createStoreContext() { + return storeContextFactory.createStoreContext(); + } + + @Override + public StoreContext getStoreContext() { + return storeContext; + } + + private S3Client getS3Client() { + return s3Client; + } + + @Override + public DurationTrackerFactory getDurationTrackerFactory() { + return durationTrackerFactory; + } + + private S3AInstrumentation getInstrumentation() { + return instrumentation; + } + + @Override + public S3AStatisticsContext getStatisticsContext() { + return statisticsContext; + } + + private S3AStorageStatistics getStorageStatistics() { + return storageStatistics; + } + + @Override + public RequestFactory getRequestFactory() { + return requestFactory; + } + + /** + * Increment a statistic by 1. + * This increments both the instrumentation and storage statistics. + * @param statistic The operation to increment + */ + protected void incrementStatistic(Statistic statistic) { + incrementStatistic(statistic, 1); + } + + /** + * Increment a statistic by a specific value. 
+ * This increments both the instrumentation and storage statistics. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementStatistic(Statistic statistic, long count) { + statisticsContext.incrementCounter(statistic, count); + } + + /** + * Decrement a gauge by a specific value. + * @param statistic The operation to decrement + * @param count the count to decrement + */ + protected void decrementGauge(Statistic statistic, long count) { + statisticsContext.decrementGauge(statistic, count); + } + + /** + * Increment a gauge by a specific value. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementGauge(Statistic statistic, long count) { + statisticsContext.incrementGauge(statistic, count); + } + + /** + * Callback when an operation was retried. + * Increments the statistics of ignored errors or throttled requests, + * depending up on the exception class. + * @param ex exception. + */ + public void operationRetried(Exception ex) { + if (isThrottleException(ex)) { + LOG.debug("Request throttled"); + incrementStatistic(STORE_IO_THROTTLED); + statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1); + } else { + incrementStatistic(STORE_IO_RETRY); + incrementStatistic(IGNORED_ERRORS); + } + } + + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + public void operationRetried(String text, Exception ex, int retries, boolean idempotent) { + operationRetried(ex); + } + + /** + * Get the instrumentation's IOStatistics. + * @return statistics + */ + @Override + public IOStatistics getIOStatistics() { + return instrumentation.getIOStatistics(); + } + + /** + * Start an operation; this informs the audit service of the event + * and then sets it as the active span. + * @param operation operation name. + * @param path1 first path of operation + * @param path2 second path of operation + * @return a span for the audit + * @throws IOException failure + */ + public AuditSpanS3A createSpan(String operation, @Nullable String path1, @Nullable String path2) + throws IOException { + + return auditSpanSource.createSpan(operation, path1, path2); + } + + /** + * Reject any request to delete an object where the key is root. + * @param key key to validate + * @throws IllegalArgumentException if the request was rejected due to + * a mistaken attempt to delete the root directory. + */ + private void blockRootDelete(String key) throws IllegalArgumentException { + checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be deleted", bucket); + } + + /** + * Perform a bulk object delete operation against S3. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics + *
+ * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number + * of objects deleted in the request. + *
+ * Retry policy: retry untranslated; delete considered idempotent. + * If the request is throttled, this is logged in the throttle statistics, + * with the counter set to the number of keys, rather than the number + * of invocations of the delete operation. + * This is because S3 considers each key as one mutating operation on + * the store when updating its load counters on a specific partition + * of an S3 bucket. + * If only the request was measured, this operation would under-report. + * @param deleteRequest keys to delete on the s3-backend + * @return the AWS response + * @throws IllegalArgumentException if the request was rejected due to + * a mistaken attempt to delete the root directory + * @throws SdkException amazon-layer failure. + */ + @Override + @Retries.RetryRaw + public Map.Entry deleteObjects(final DeleteObjectsRequest deleteRequest) + throws SdkException { + + DeleteObjectsResponse response; + BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext()); + + final List keysToDelete = deleteRequest.delete().objects(); + int keyCount = keysToDelete.size(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating delete operation for {} objects", keysToDelete.size()); + keysToDelete.stream().forEach(objectIdentifier -> { + LOG.debug(" \"{}\" {}", objectIdentifier.key(), + objectIdentifier.versionId() != null ? objectIdentifier.versionId() : ""); + }); + } + // block root calls + keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete); + + try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) { + response = + invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> { + // handle the failure + retryHandler.bulkDeleteRetried(deleteRequest, e); + }, + // duration is tracked in the bulk delete counters + trackDurationOfOperation(getDurationTrackerFactory(), + OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> { + acquireWriteCapacity(keyCount); + incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount); + return s3Client.deleteObjects(deleteRequest); + })); + if (!response.errors().isEmpty()) { + // one or more of the keys could not be deleted. + // log and then throw + List errors = response.errors(); + if (LOG.isDebugEnabled()) { + LOG.debug("Partial failure of delete, {} errors", errors.size()); + for (S3Error error : errors) { + LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message()); + } + } + } + d.close(); + return Tuples.pair(d.asDuration(), response); + + } catch (IOException e) { + // this is part of the retry signature, nothing else. + // convert to unchecked. + throw new UncheckedIOException(e); + } + } + + /** + * Delete an object. + * Increments the {@code OBJECT_DELETE_REQUESTS} statistics. + *
+ * Retry policy: retry untranslated; delete considered idempotent. + * 404 errors other than bucket not found are swallowed; + * this can be raised by third party stores (GCS). + * If an exception is caught and swallowed, the response will be empty; + * otherwise it will be the response from the delete operation. + * @param request request to make + * @return the total duration and response. + * @throws SdkException problems working with S3 + * @throws IllegalArgumentException if the request was rejected due to + * a mistaken attempt to delete the root directory. + */ + @Override + @Retries.RetryRaw + public Map.Entry> deleteObject(final DeleteObjectRequest request) + throws SdkException { + + String key = request.key(); + blockRootDelete(key); + DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key); + try { + DeleteObjectResponse response = + invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key), + DELETE_CONSIDERED_IDEMPOTENT, trackDurationOfOperation(getDurationTrackerFactory(), + OBJECT_DELETE_REQUEST.getSymbol(), () -> { + incrementStatistic(OBJECT_DELETE_OBJECTS); + acquireWriteCapacity(1); + return s3Client.deleteObject(request); + })); + d.close(); + return Tuples.pair(d.asDuration(), Optional.of(response)); + } catch (AwsServiceException ase) { + // 404 errors get swallowed; this can be raised by + // third party stores (GCS). + if (!isObjectNotFound(ase)) { + throw ase; + } + d.close(); + return Tuples.pair(d.asDuration(), Optional.empty()); + } catch (IOException e) { + // this is part of the retry signature, nothing else. + // convert to unchecked. + throw new UncheckedIOException(e); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java new file mode 100644 index 0000000000000..355288619d30a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public interface StoreContextFactory { + + /** + * Build an immutable store context, including picking + * up the current audit span. + * @return the store context. 
+ */ + StoreContext createStoreContext(); +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md index e2c095e5317a4..abd58bffc6201 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md @@ -324,6 +324,7 @@ They have also been updated to return V2 SDK classes. public interface S3AInternals { S3Client getAmazonS3V2Client(String reason); + S3AStore getStore(); @Retries.RetryTranslated @AuditEntryPoint String getBucketLocation() throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java index 734bcfd9c5d30..f43710cf25eb0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java @@ -35,8 +35,7 @@ /** - * Abstract base class for S3A unit tests using a mock S3 client and a null - * metadata store. + * Abstract base class for S3A unit tests using a mock S3 client. */ public abstract class AbstractS3AMockTest { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java index a4162f212179b..28a443f04cda9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java @@ -61,9 +61,8 @@ public boolean deleteOnExit(Path f) throws IOException { // processDeleteOnExit. @Override protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException { - boolean result = super.deleteWithoutCloseCheck(f, recursive); deleteOnDnExitCount--; - return result; + return true; } } From 1420c999b79fea8d95a9789d250542f0d3cf2c40 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 14 Mar 2024 16:00:16 +0000 Subject: [PATCH 5/5] HADOOP-18679. Bulk Delete getting the API simpler Changing results of method calls, using Tuples.pair() to return Map.Entry() instances as immutable tuples. 
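
For illustration, a caller of the revised FileUtil helper can now act on the
failures directly. A sketch against the API as revised in this series; "fs",
"base", "paths" and the logger are assumed to exist:

    List<Map.Entry<Path, String>> failures =
        FileUtil.bulkDelete(fs, base, paths);
    failures.forEach(f ->
        LOG.warn("Could not delete {}: {}", f.getKey(), f.getValue()));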
Change-Id: Ibdd5a5b11fe0a57b293b9cb623272e272c8bab69 --- .../java/org/apache/hadoop/fs/BulkDelete.java | 61 +--- .../apache/hadoop/fs/BulkDeleteSource.java | 7 +- .../java/org/apache/hadoop/fs/FileUtil.java | 12 +- .../site/markdown/filesystem/bulkdelete.md | 284 ++++++++++++++++++ .../org/apache/hadoop/fs/s3a/S3AStore.java | 1 + .../fs/s3a/impl/BulkDeleteOperation.java | 50 +-- .../BulkDeleteOperationCallbacksImpl.java | 51 +++- 7 files changed, 360 insertions(+), 106 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java index af454bc49a8a8..9e8b052a40a41 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -81,65 +82,7 @@ public interface BulkDelete extends IOStatisticsSource, Closeable { * @throws IOException IO problems including networking, authentication and more. * @throws IllegalArgumentException if a path argument is invalid. */ - BulkDeleteOutcome bulkDelete(List paths) + List> bulkDelete(List paths) throws IOException, IllegalArgumentException; - /** - * The outcome: a list of paths which failed to delete. - * An empty list means all files were successfully deleted. - * There are no guarantees about the ordering of the list. - * Reasons for failures are not provided. - * File Not Found is not a failure. - */ - class BulkDeleteOutcome { - - /** - * List of paths which failed to delete. - */ - private final List failures; - - /** - * Constructor. - * @param failures list of failures. This must be non-null. - */ - public BulkDeleteOutcome(final List failures) { - this.failures = requireNonNull(failures); - } - - /** - * Get the list of failures. - * @return a possibly empty list of failures. - */ - public List getFailures() { - return failures; - } - } - - class BulkDeleteOutcomeElement { - private final Path path; - private final String error; - private final Exception exception; - - public BulkDeleteOutcomeElement( - final Path path, - final String error, - final Exception exception) { - this.path = path; - this.error = error; - this.exception = exception; - } - - public Path getPath() { - return path; - } - - public String getError() { - return error; - } - - public Exception getException() { - return exception; - } - } - } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java index d4cffb39bdf7c..1cc5de653ded8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java @@ -36,11 +36,6 @@ @InterfaceStability.Unstable public interface BulkDeleteSource { - /** - * Exception message for the case where bulk delete is not supported. - */ - String BULK_DELETE_NOT_SUPPORTED = "Bulk delete not supported"; - /** * Create a bulk delete operation. 
* There is no network IO at this point, simply the creation of @@ -54,7 +49,7 @@ public interface BulkDeleteSource { */ default BulkDelete createBulkDelete(Path path) throws UnsupportedOperationException, IllegalArgumentException, IOException { - throw new UnsupportedOperationException(BULK_DELETE_NOT_SUPPORTED); + throw new UnsupportedOperationException("Bulk delete not supported"); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index 049a76da2da83..fabe589e24cf5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -55,7 +55,6 @@ import java.util.jar.Attributes; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; -import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import org.apache.commons.collections.map.CaseInsensitiveMap; @@ -2133,7 +2132,7 @@ public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOExceptio */ private static BulkDeleteSource toBulkDeleteSource(final FileSystem fs) { if (!(fs instanceof BulkDeleteSource)) { - throw new UnsupportedOperationException(BulkDeleteSource.BULK_DELETE_NOT_SUPPORTED); + throw new UnsupportedOperationException("Bulk delete not supported"); } return (BulkDeleteSource) fs; } @@ -2155,18 +2154,15 @@ private static BulkDeleteSource toBulkDeleteSource(final FileSystem fs) { * @param fs filesystem * @param base path to delete under. * @param paths list of paths which must be absolute and under the base path. - * @return a list of all the paths which couldn't be deleted for a reason other than "not found". + * @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message. * @throws UnsupportedOperationException bulk delete under that path is not supported. * @throws IOException IO problems including networking, authentication and more. * @throws IllegalArgumentException if a path argument is invalid. */ - public static List bulkDelete(FileSystem fs, Path base, List paths) + public static List> bulkDelete(FileSystem fs, Path base, List paths) throws IOException { try (BulkDelete bulk = toBulkDeleteSource(fs).createBulkDelete(base)) { - final BulkDelete.BulkDeleteOutcome outcome = bulk.bulkDelete(paths); - return outcome.getFailures().stream() - .map(BulkDelete.BulkDeleteOutcomeElement::getPath) - .collect(Collectors.toList()); + return bulk.bulkDelete(paths); } } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md new file mode 100644 index 0000000000000..2f98417c061d0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md @@ -0,0 +1,284 @@ + + +# interface `BulkDelete` + + + +The `BulkDelete` interface provides an API to perform bulk delete of files/objects +in an object store or filesystem. + +## Key Features + +* An API for submitting a list of paths to delete. +* This list must be no larger than the "page size" supported by the client; This size is also exposed as a method. +* Triggers a request to delete files at the specific paths. +* Returns a list of which paths were reported as delete failures by the store. +* Does not consider a nonexistent file to be a failure. 
+* Does not offer any atomicity guarantees. +* Idempotency guarantees are weak: retries may delete files newly created by other clients. +* Provides no guarantees as to the outcome if a path references a directory. +* Provides no guarantees that parent directories will exist after the call. + + +The API is designed to match the semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) REST API call, but it is not +exclusively restricted to this store. This is why the "provides no guarantees" +restrictions do not state what the outcome will be when executed on other stores. + +### Interface `org.apache.hadoop.fs.BulkDeleteSource` + +The interface `BulkDeleteSource` is offered by a FileSystem/FileContext class if +it supports the API. + +```java +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDeleteSource { + + /** + * Create a bulk delete operation. + * There is no network IO at this point, simply the creation of + * a bulk delete object. + * A path must be supplied to assist in link resolution. + * @param path path to delete under. + * @return the bulk delete. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IllegalArgumentException path not valid. + * @throws IOException problems resolving paths + */ + default BulkDelete createBulkDelete(Path path) + throws UnsupportedOperationException, IllegalArgumentException, IOException; + +} + +``` + +### Interface `org.apache.hadoop.fs.BulkDelete` + +This is the bulk delete implementation returned by the `createBulkDelete()` call. + +```java +/** + * API for bulk deletion of objects/files, + * but not directories. + * After use, call {@code close()} to release any resources and + * to guarantee store IOStatistics are updated. + *
+ * Callers MUST have no expectation that parent directories will exist after the + * operation completes; if an object store needs to explicitly look for and create + * directory markers, that step will be omitted. + *
+ * Be aware that on some stores (AWS S3) each object listed in a bulk delete counts
+ * against the write IOPS limit; large page sizes are counterproductive here, as
+ * are attempts at parallel submissions across multiple threads.
+ * @see <a href="https://issues.apache.org/jira/browse/HADOOP-16823">HADOOP-16823.
+ * Large DeleteObject requests are their own Thundering Herd</a>
+ *
+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDelete extends IOStatisticsSource, Closeable { + + /** + * The maximum number of objects/files to delete in a single request. + * @return a number greater than or equal to zero. + */ + int pageSize(); + + /** + * Base path of a bulk delete operation. + * All paths submitted in {@link #bulkDelete(List)} must be under this path. + */ + Path basePath(); + + /** + * Delete a list of files/objects. + *
    + *
+   * <ul>
+   *   <li>Files must be under the path provided in {@link #basePath()}.</li>
+   *   <li>The size of the list must be equal to or less than the page size
+   *   declared in {@link #pageSize()}.</li>
+   *   <li>Directories are not supported; the outcome of attempting to delete
+   *   directories is undefined (ignored; undetected, listed as failures...).</li>
+   *   <li>The operation is not atomic.</li>
+   *   <li>The operation is treated as idempotent: network failures may
+   *   trigger resubmission of the request -any new objects created under a
+   *   path in the list may then be deleted.</li>
+   *   <li>There is no guarantee that any parent directories exist after this call.</li>
+   * </ul>
+   * @param paths list of paths which must be absolute and under the base path
+   * provided in {@link #basePath()}.
+   * @throws IOException IO problems including networking, authentication and more.
+   * @throws IllegalArgumentException if a path argument is invalid.
+   */
+  List<Map.Entry<Path, String>> bulkDelete(List<Path> paths)
+      throws IOException, IllegalArgumentException;
+
+}
+
+```
+
+### `bulkDelete(paths)`
+
+#### Preconditions
+
+```python
+if length(paths) > pageSize: throw IllegalArgumentException
+```
+
+#### Postconditions
+
+All paths which refer to files are removed from the set of files.
+```python
+FS'.Files = FS.Files - [paths]
+```
+
+No other restrictions are placed upon the outcome.
+
+
+### Availability
+
+The `BulkDeleteSource` interface is exported by `FileSystem` and `FileContext` storage clients
+which MAY support the API; it may still be unsupported by the
+specific instance.
+
+Use the `PathCapabilities` probe `fs.capability.bulk.delete`.
+
+```java
+store.hasPathCapability(path, "fs.capability.bulk.delete")
+```
+
+### Invocation through Reflection
+
+The need for many libraries to compile against very old versions of Hadoop
+means that most of the cloud-first filesystem API calls cannot be used except
+through reflection -and the more complicated the API and its data types are,
+the harder that reflection is to implement.
+
+To assist this, the class `org.apache.hadoop.fs.FileUtil` has two methods
+which are intended to provide simple access to the API, especially
+through reflection.
+
+```java
+  /**
+   * Get the maximum number of objects/files to delete in a single request.
+   * @param fs filesystem
+   * @param path path to delete under.
+   * @return a number greater than or equal to zero.
+   * @throws UnsupportedOperationException bulk delete under that path is not supported.
+   * @throws IllegalArgumentException path not valid.
+   * @throws IOException problems resolving paths
+   */
+  public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOException;
+
+  /**
+   * Delete a list of files/objects.
+   *
    + *
+   * <ul>
+   *   <li>Files must be under the path provided in {@code base}.</li>
+   *   <li>The size of the list must be equal to or less than the page size.</li>
+   *   <li>Directories are not supported; the outcome of attempting to delete
+   *   directories is undefined (ignored; undetected, listed as failures...).</li>
+   *   <li>The operation is not atomic.</li>
+   *   <li>The operation is treated as idempotent: network failures may
+   *   trigger resubmission of the request -any new objects created under a
+   *   path in the list may then be deleted.</li>
+   *   <li>There is no guarantee that any parent directories exist after this call.</li>
+   * </ul>
+   * @param fs filesystem
+   * @param base path to delete under.
+   * @param paths list of paths which must be absolute and under the base path.
+   * @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
+   * @throws UnsupportedOperationException bulk delete under that path is not supported.
+   * @throws IOException IO problems including networking, authentication and more.
+   * @throws IllegalArgumentException if a path argument is invalid.
+   */
+  public static List<Map.Entry<Path, String>> bulkDelete(FileSystem fs, Path base, List<Path> paths)
+```
+
+## S3A Implementation
+
+The S3A client exports this API.
+
+If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), as
+it is by default, then the page size is limited to that defined in
+`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000.
+* The entire list of paths to delete is aggregated into a single bulk delete request,
+issued to the store.
+* Provided the caller has the correct permissions, every entry in the list
+  will, if the path references an object, cause that object to be deleted.
+* If the path does not reference an object, the path will not be deleted:
+  "This is for deleting objects, not directories".
+* No probes for the existence of parent directories will take place; no
+  parent directory markers will be created.
+  "If you need parent directories, call mkdir() yourself".
+* The list of failed keys in the `DeleteObjectsResponse` is converted
+  into paths and returned along with their error messages.
+* Network and other IO errors are raised as exceptions.
+
+If multi-object delete is disabled (or the list is of size 1):
+* A single `DELETE` call is issued.
+* Any `AccessDeniedException` raised is converted to a result in the error list.
+* Any 404 response from a (non-AWS) store will be ignored.
+* Network and other IO errors are raised as exceptions.
+
+Because there are no probes to ensure the call does not overwrite a directory,
+or to see if a parent directory marker needs to be created,
+this API is still faster than issuing a normal `FileSystem.delete(path)` call.
+
+That is: all the overhead normally undertaken to preserve the POSIX system model is omitted.
+
+
+### S3 Scalability and Performance
+
+Every entry in a bulk delete request counts as one write operation
+against AWS S3 storage.
+With the default write rate under a prefix on AWS S3 Standard storage
+restricted to 3,500 writes/second, it is very easy to overload
+the store by issuing a few bulk delete requests simultaneously.
+
+* If throttling is triggered then all clients interacting with
+  the store may observe performance issues.
+* The write quota applies even for paths which do not exist.
+* The S3A client *may* perform rate throttling as well as page size limiting.
+
+What does that mean? It means that attempting to issue multiple
+bulk delete calls in parallel can be counterproductive.
+
+When overloaded, the S3 store returns a 503 "slow down" throttle response.
+This triggers the client to back off and retry posting the request.
+However, the repeated request will still include the same number of objects and
+*so generate the same load*.
+
+This can lead to a pathological situation where the repeated requests will
+never be satisfied because the request itself is sufficient to overload the store.
See [HADOOP-16823. Large DeleteObject requests are their own Thundering Herd](https://issues.apache.org/jira/browse/HADOOP-16823)
+for an example of where this did actually surface in production.
+
+This is why the default page size of S3A clients is 250 paths, not the store limit of 1000 entries.
+It is also why the S3A delete/rename operations do not attempt to do massive parallel deletions;
+instead, bulk delete requests are queued for a single blocking thread to issue.
+Consider a similar design.
+
+
+When working with versioned S3 buckets, every path deleted will add a tombstone marker
+to the store at that location, even if there was no object at that path.
+While this has no negative performance impact on the bulk delete call,
+it will slow down list requests subsequently made against that path.
+That is: bulk delete requests of paths which do not exist will hurt future queries.
+
+Avoid this. Note also that the TPC-DS benchmarks do not create the right load to make the
+performance problems observable -but they can surface in production.
+* Configure buckets to have a limited number of days for tombstones to be preserved.
+* Do not delete paths which you know do not contain objects.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
index c029364743cce..da4f52a8f11a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -104,6 +104,7 @@ Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(DeleteObjectsRequest de
    * Retry policy: retry untranslated; delete considered idempotent.
    * 404 errors other than bucket not found are swallowed;
    * this can be raised by third party stores (GCS).
+   * <p>
* If an exception is caught and swallowed, the response will be empty; * otherwise it will be the response from the delete operation. * @param request request to make diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java index 61e666cacbfef..7cae4c81f198a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java @@ -19,19 +19,20 @@ package org.apache.hadoop.fs.s3a.impl; import java.io.IOException; -import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; -import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import org.apache.hadoop.fs.BulkDelete; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.util.functional.Tuples; +import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; import static org.apache.hadoop.util.Preconditions.checkArgument; /** @@ -45,7 +46,8 @@ public class BulkDeleteOperation extends AbstractStoreOperation implements BulkD private final int pageSize; - public BulkDeleteOperation(final StoreContext storeContext, + public BulkDeleteOperation( + final StoreContext storeContext, final BulkDeleteOperationCallbacks callbacks, final Path basePath, final int pageSize, @@ -68,30 +70,32 @@ public Path basePath() { } @Override - public BulkDeleteOutcome bulkDelete(final List paths) + public List> bulkDelete(final List paths) throws IOException, IllegalArgumentException { requireNonNull(paths); checkArgument(paths.size() <= pageSize, "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize); + final StoreContext context = getStoreContext(); final List objects = paths.stream().map(p -> { checkArgument(p.isAbsolute(), "Path %s is not absolute", p); - final String k = getStoreContext().pathToKey(p); + final String k = context.pathToKey(p); return ObjectIdentifier.builder().key(k).build(); - }).collect(Collectors.toList()); + }).collect(toList()); - final List errors = callbacks.bulkDelete(objects); + final List> errors = callbacks.bulkDelete(objects); if (!errors.isEmpty()) { - final List outcomeElements = errors + final List> outcomeElements = errors .stream() - .map(error -> new BulkDeleteOutcomeElement( - getStoreContext().keyToPath(error), error, - null)) - .collect(Collectors.toList()); - return new BulkDeleteOutcome(outcomeElements); + .map(error -> Tuples.pair( + context.keyToPath(error.getKey()), + error.getValue() + )) + .collect(toList()); + return outcomeElements; } - return new BulkDeleteOutcome(Collections.emptyList()); + return emptyList(); } @Override @@ -99,20 +103,20 @@ public void close() throws IOException { } + /** + * Callbacks for the bulk delete operation. + */ public interface BulkDeleteOperationCallbacks { /** - * Attempt a bulk delete operation. + * Perform a bulk delete operation. * @param keys key list - * @return - * @throws MultiObjectDeleteException one or more of the keys could not - * be deleted in a multiple object delete operation. - * @throws AwsServiceException amazon-layer failure. 
- * @throws IOException other IO Exception. + * @return paths which failed to delete (if any). + * @throws IOException IO Exception. * @throws IllegalArgumentException illegal arguments */ @Retries.RetryTranslated - List bulkDelete(final List keys) - throws MultiObjectDeleteException, IOException, IllegalArgumentException; + List> bulkDelete(final List keys) + throws IOException, IllegalArgumentException; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java index 4c4c1dfe44aff..8cc02dca19848 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java @@ -18,9 +18,12 @@ package org.apache.hadoop.fs.s3a.impl; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.AccessDeniedException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; @@ -30,9 +33,13 @@ import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AStore; import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.util.functional.Tuples; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.apache.hadoop.fs.s3a.Invoker.once; import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.functional.Tuples.pair; /** * Callbacks for the bulk delete operation. @@ -40,13 +47,20 @@ public class BulkDeleteOperationCallbacksImpl implements BulkDeleteOperation.BulkDeleteOperationCallbacks { + /** + * Path for logging. + */ private final String path; + /** Page size for bulk delete. */ private final int pageSize; /** span for operations. */ private final AuditSpan span; + /** + * Store. + */ private final S3AStore store; @@ -60,21 +74,18 @@ public BulkDeleteOperationCallbacksImpl(final S3AStore store, @Override @Retries.RetryTranslated - public List bulkDelete(final List keysToDelete) - throws MultiObjectDeleteException, IOException, IllegalArgumentException { + public List> bulkDelete(final List keysToDelete) + throws IOException, IllegalArgumentException { span.activate(); final int size = keysToDelete.size(); checkArgument(size <= pageSize, "Too many paths to delete in one operation: %s", size); if (size == 0) { - return Collections.emptyList(); + return emptyList(); } + if (size == 1) { - store.deleteObject(store.getRequestFactory() - .newDeleteObjectRequestBuilder(keysToDelete.get(0).key()) - .build()); - // no failures, so return an empty list - return Collections.emptyList(); + return deleteSingleObject(keysToDelete.get(0).key()); } final DeleteObjectsResponse response = once("bulkDelete", path, () -> @@ -84,11 +95,31 @@ public List bulkDelete(final List keysToDelete) final List errors = response.errors(); if (errors.isEmpty()) { // all good. - return Collections.emptyList(); + return emptyList(); } else { return errors.stream() - .map(S3Error::key) + .map(e -> pair(e.key(), e.message())) .collect(Collectors.toList()); } } + + /** + * Delete a single object. + * @param key key to delete + * @return list of keys which failed to delete: length 0 or 1. 
+   * @throws IOException IO problem other than AccessDeniedException.
+   */
+  @Retries.RetryTranslated
+  private List<Map.Entry<String, String>> deleteSingleObject(final String key) throws IOException {
+    try {
+      once("bulkDelete", path, () ->
+          store.deleteObject(store.getRequestFactory()
+              .newDeleteObjectRequestBuilder(key)
+              .build()));
+    } catch (AccessDeniedException e) {
+      return singletonList(pair(key, e.toString()));
+    }
+    return emptyList();
+
+  }
+}
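
For reference, a minimal caller of the final API might look like the sketch
below. This block is not part of the patch: the class name, the error handling
via System.err and the "paths" argument are illustrative only; the interfaces
and the "fs.capability.bulk.delete" probe are those added by the patch.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.BulkDeleteSource;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Sketch: page a large list of paths through the bulk delete API. */
public final class BulkDeleteExample {

  public static void deleteAll(FileSystem fs, Path base, List<Path> paths)
      throws IOException {
    // probe first: the API may be unsupported by this store instance
    if (!fs.hasPathCapability(base, "fs.capability.bulk.delete")) {
      throw new UnsupportedOperationException("bulk delete not available");
    }
    try (BulkDelete bulk = ((BulkDeleteSource) fs).createBulkDelete(base)) {
      final int pageSize = bulk.pageSize();
      // submit pages no larger than the store's declared page size
      for (int start = 0; start < paths.size(); start += pageSize) {
        List<Path> page = paths.subList(start,
            Math.min(start + pageSize, paths.size()));
        // each returned entry is (path, error message) for a failed deletion
        for (Map.Entry<Path, String> failure : bulk.bulkDelete(page)) {
          System.err.println("Could not delete " + failure.getKey()
              + ": " + failure.getValue());
        }
      }
    }
  }
}
```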