Skip to content

Commit 1774c5b

Browse files
committed
HADOOP-18679. Add API for bulk/paged object deletion
A more minimal design that is easier to use and implement. The caller creates a BulkDelete operation, asks it for its page size, and then submits batches of paths to delete, each batch no larger than that page size. The outcome of each call contains a list of failures. An S3A implementation is included to show how straightforward it is. Even with a page size of a single entry, it is still more efficient to use this API, as it doesn't try to recreate a parent dir or perform any probes to see if a path is a directory: it maps straight to a DELETE call. Change-Id: Ibe8737e7933fe03d39070e7138a710b50c3d60c2
1 parent af5c8e7 commit 1774c5b

6 files changed

Lines changed: 379 additions & 7 deletions

File tree

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs;
20+
21+
import java.io.Closeable;
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
26+
27+
/**
28+
* API for bulk deletion of objects/files,
29+
* <i>but not directories</i>.
30+
* After use, call {@code close()} to release any resources and
31+
* to guarantee store IOStatistics are updated.
32+
*/
33+
public interface BulkDelete extends IOStatisticsSource, Closeable {
34+
35+
/**
36+
* The maximum number of objects/files to delete in a single request.
37+
* @return a number greater than or equal to zero.
38+
*/
39+
int pageSize();
40+
41+
/**
42+
* Base path of a bulk delete operation.
43+
* All paths submitted in {@link #bulkDelete(List)} must be under this path.
44+
*/
45+
Path basePath();
46+
47+
/**
48+
* Delete a list of files/objects.
49+
* <ul>
50+
* <li>Files must be under the path provided in {@link #basePath()}.</li>
51+
* <li>The size of the list must be equal to or less than the page size
52+
* declared in {@link #pageSize()}.</li>
53+
* <li>Directories are not supported; the outcome of attempting to delete
54+
* directories is undefined (ignored; undetected, listed as failures...).</li>
55+
* <li>The operation is not atomic.</li>
56+
* <li>The operation is treated as idempotent: network failures may
57+
* trigger resubmission of the request -any new objects created under a
58+
* path in the list may then be deleted.</li>
59+
* <li>There is no guarantee that any parent directories exist after this call.
60+
* </li>
61+
* </ul>
62+
* @param paths list of paths which must be absolute and under the base path.
63+
* provided in {@link #basePath()}.
64+
* @throws IOException IO problems including networking, authentication and more.
65+
* @throws IllegalArgumentException if a path argument is invalid.
66+
*/
67+
BulkDeleteOutcome bulkDelete(List<Path> paths)
68+
throws IOException, IllegalArgumentException;
69+
70+
/**
71+
* The outcome: a list of paths which failed to delete.
72+
* An empty list means all files were successfully deleted.
73+
* There are no guarantees about the ordering of the list.
74+
* Reasons for failures are not provided.
75+
* File Not Found is not a failure.
76+
*/
77+
class BulkDeleteOutcome {
78+
79+
/**
80+
* List of paths which failed to delete.
81+
*/
82+
private final List<BulkDeleteOutcomeElement> failures;
83+
84+
/**
85+
* Constructor.
86+
* @param failures list of failures. This must be non-null.
87+
*/
88+
public BulkDeleteOutcome(final List<BulkDeleteOutcomeElement> failures) {
89+
this.failures = failures;
90+
}
91+
92+
/**
93+
* Get the list of failures.
94+
* @return a possibly empty list of failures.
95+
*/
96+
public List<BulkDeleteOutcomeElement> getFailures() {
97+
return failures;
98+
}
99+
}
100+
101+
class BulkDeleteOutcomeElement {
102+
private final Path path;
103+
private final String error;
104+
private final Exception exception;
105+
106+
public BulkDeleteOutcomeElement(
107+
final Path path,
108+
final String error,
109+
final Exception exception) {
110+
this.path = path;
111+
this.error = error;
112+
this.exception = exception;
113+
}
114+
115+
public Path getPath() {
116+
return path;
117+
}
118+
119+
public String getError() {
120+
return error;
121+
}
122+
123+
public Exception getException() {
124+
return exception;
125+
}
126+
}
127+
128+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs;
20+
21+
import java.io.IOException;
22+
23+
/**
24+
* Interface for bulk deletion.
25+
* Filesystems which support bulk deletion should implement this interface
26+
* and MUST also declare their support in the path capability
27+
* {@link CommonPathCapabilities#BULK_DELETE}.
28+
* Exporting the interface does not guarantee that the operation is supported;
29+
* returning a {@link BulkDelete} object from the call {@link #createBulkDelete(Path)}
30+
* is.
31+
*/
32+
public interface BulkDeleteSource {
33+
34+
/**
35+
* Create a bulk delete operation.
36+
* There is no network IO at this point, simply the creation of
37+
* a bulk delete object.
38+
* A path must be supplied to ensure that on viewfs and similar filesystems,
39+
* @param path path to delete under.
40+
* @return the bulk delete.
41+
* @throws UnsupportedOperationException the filesystem does not support delete under that path.
42+
* @throws IllegalArgumentException path not valid.
43+
* @throws IOException problems resolving paths
44+
*/
45+
default BulkDelete createBulkDelete(Path path)
46+
throws UnsupportedOperationException, IllegalArgumentException, IOException {
47+
throw new UnsupportedOperationException("Bulk delete not supported");
48+
}
49+
50+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,10 @@ private CommonPathCapabilities() {
181181
*/
182182
public static final String DIRECTORY_LISTING_INCONSISTENT =
183183
"fs.capability.directory.listing.inconsistent";
184+
185+
/**
186+
* Capability string to probe for bulk delete: {@value}.
* Filesystems which implement {@link BulkDeleteSource} are required
* to declare this capability.
187+
*/
188+
public static final String BULK_DELETE = "fs.capability.bulk.delete";
189+
184190
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@
105105
import org.apache.hadoop.classification.InterfaceStability;
106106
import org.apache.hadoop.classification.VisibleForTesting;
107107
import org.apache.hadoop.conf.Configuration;
108+
import org.apache.hadoop.fs.BulkDelete;
109+
import org.apache.hadoop.fs.BulkDeleteSource;
108110
import org.apache.hadoop.fs.CommonPathCapabilities;
109111
import org.apache.hadoop.fs.ContentSummary;
110112
import org.apache.hadoop.fs.CreateFlag;
@@ -121,6 +123,7 @@
121123
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
122124
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
123125
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
126+
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
124127
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
125128
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
126129
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
@@ -284,7 +287,8 @@
284287
@InterfaceStability.Evolving
285288
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
286289
AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource,
287-
AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A> {
290+
AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A>,
291+
BulkDeleteSource {
288292

289293
/**
290294
* Default blocksize as used in blocksize and FS status queries.
@@ -3406,6 +3410,13 @@ private void removeKeysS3(
34063410
// exit fast if there are no keys to delete
34073411
return;
34083412
}
3413+
if (keysToDelete.size() == 1) {
3414+
// single object is a single delete call.
3415+
// this is more informative in server logs and may be more efficient..
3416+
deleteObject(keysToDelete.get(0).key());
3417+
noteDeleted(1, deleteFakeDir);
3418+
return;
3419+
}
34093420
for (ObjectIdentifier objectIdentifier : keysToDelete) {
34103421
blockRootDelete(objectIdentifier.key());
34113422
}
@@ -5476,7 +5487,11 @@ public boolean hasPathCapability(final Path path, final String capability)
54765487
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
54775488
return true;
54785489

5479-
// multi object delete flag
5490+
// this is always true, even if multi object
5491+
// delete is disabled -the page size is simply reduced to 1.
5492+
case CommonPathCapabilities.BULK_DELETE:
5493+
return true;
5494+
54805495
case ENABLE_MULTI_DELETE:
54815496
return enableMultiObjectsDelete;
54825497

@@ -5851,4 +5866,50 @@ public boolean isMultipartUploadEnabled() {
58515866
return isMultipartUploadEnabled;
58525867
}
58535868

5869+
@Override
5870+
public BulkDelete createBulkDelete(final Path path)
5871+
throws IllegalArgumentException, IOException {
5872+
5873+
final Path p = makeQualified(path);
5874+
final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null);
5875+
return new BulkDeleteOperation(
5876+
createStoreContext(),
5877+
createBulkDeleteCallbacks(span),
5878+
p,
5879+
enableMultiObjectsDelete ? 1: pageSize,
5880+
span);
5881+
}
5882+
5883+
/**
5884+
* Override point for mocking.
5885+
* @param span span for operations.
5886+
* @return an instance of the Bulk Delette callbacks.
5887+
*/
5888+
protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
5889+
final AuditSpanS3A span) {
5890+
return new BulkDeleteOperationCallbacksImpl(span);
5891+
}
5892+
5893+
/**
5894+
* Callbacks for the bulk delete operation.
* An inner class: deletions are issued through the enclosing
* {@code S3AFileSystem} instance, under the span supplied at construction.
5895+
*/
5896+
protected class BulkDeleteOperationCallbacksImpl implements
5897+
BulkDeleteOperation.BulkDeleteOperationCallbacks {
5898+
5899+
/** span for operations. */
5900+
private final AuditSpan span;
5901+
5902+
/**
 * Constructor.
 * @param span span to activate before each bulk delete.
 */
protected BulkDeleteOperationCallbacksImpl(AuditSpan span) {
5903+
this.span = span;
5904+
}
5905+
5906+
/**
 * Delete a list of keys.
 * Activates the span, then invokes the enclosing filesystem's
 * {@code removeKeys(keys, false)} inside a {@code once()} call.
 * @param keys keys to delete.
 * @throws MultiObjectDeleteException declared failure of the delete call.
 * @throws IOException other IO failure.
 * @throws IllegalArgumentException declared for invalid arguments.
 */
@Override
5907+
@Retries.RetryTranslated
5908+
public void bulkDelete(final List<ObjectIdentifier> keys)
5909+
throws MultiObjectDeleteException, IOException, IllegalArgumentException {
5910+
span.activate();
5911+
// NOTE(review): the second argument of removeKeys is presumably the
// "delete fake directory" flag seen elsewhere in this class; confirm.
// The path argument of once() is deliberately left as an empty string here.
once("bulkDelete", "", () ->
5912+
S3AFileSystem.this.removeKeys(keys, false));
5913+
}
5914+
}
58545915
}

0 commit comments

Comments
 (0)