6 changes: 6 additions & 0 deletions hadoop-common-project/hadoop-common/pom.xml
@@ -429,6 +429,12 @@
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -52,7 +52,7 @@ public interface BulkDelete extends IOStatisticsSource, Closeable {

/**
* The maximum number of objects/files to delete in a single request.
- * @return a number greater than or equal to zero.
+ * @return a number greater than zero.
*/
int pageSize();

@@ -197,6 +197,9 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RETRY
= "store_io_retry";

/**
* Duration of rate limited IO operations: {@value}.
*/
public static final String STORE_IO_RATE_LIMITED_DURATION
= "store_io_rate_limited_duration";

/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.contract;

import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public abstract class AbstractContractBulkDeleteTest extends AbstractFSContractTestBase {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);

protected int pageSize;

protected Path basePath;

protected FileSystem fs;

@Before
public void setUp() throws Exception {
fs = getFileSystem();
basePath = path(getClass().getName());

Contributor:
Use methodPath() if it is there. I'm going to have to look at this, because it's creating race condition issues with abfs.

Contributor Author:
This is under setup, and the path will be created under the contract test directory, so cleanup should work.

Contributor:
The problem is that abfs runs test methods in parallel, and unless path() invokes methodPath(), the methods all work with the same basePath, so one method can clean up while other methods are still active.

pageSize = FileUtil.bulkDeletePageSize(getFileSystem(), basePath);
fs.mkdirs(basePath);
}
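One possible shape for the reviewer's suggestion, as a sketch only — it is not part of this PR, and it assumes no methodPath() helper exists on the base class, so it derives a per-method path from JUnit 4's TestName rule instead:

import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

  // Hypothetical variant of setUp(): one base directory per test method,
  // so parallel runs (e.g. against abfs) cannot clean up each other's data.
  @Rule
  public final TestName methodName = new TestName();

  @Before
  public void setUp() throws Exception {
    fs = getFileSystem();
    basePath = path(getClass().getSimpleName() + "-" + methodName.getMethodName());
    pageSize = FileUtil.bulkDeletePageSize(getFileSystem(), basePath);
    fs.mkdirs(basePath);
  }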

public Path getBasePath() {
return basePath;
}

/**
* Validate the page size for the bulk delete operation. Different stores
* can have different implementations of bulk delete, and thus different
* page sizes.
*/
@Test
public void validatePageSize() throws Exception {
Assertions.assertThat(pageSize)
.describedAs("Page size should be 1 by default for all stores")
.isEqualTo(1);
}

@Test
public void testPathsSizeEqualsPageSizePrecondition() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
// Bulk delete call should pass with no exception.
FileUtil.bulkDelete(getFileSystem(), basePath, listOfPaths);
}

@Test
public void testPathsSizeGreaterThanPageSizePrecondition() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize + 1, basePath);
intercept(IllegalArgumentException.class,
() -> FileUtil.bulkDelete(getFileSystem(), basePath, listOfPaths));
}

@Test
public void testPathsSizeLessThanPageSizePrecondition() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize - 1, basePath);
// Bulk delete call should pass with no exception.
FileUtil.bulkDelete(getFileSystem(), basePath, listOfPaths);
}

@Test
public void testBulkDeleteSuccessful() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
for (Path path : listOfPaths) {
touch(fs, path);
}
FileStatus[] fileStatuses = fs.listStatus(basePath);
Assertions.assertThat(fileStatuses)
.describedAs("File count after create")
.hasSize(pageSize);
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, listOfPaths));
FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath);
Assertions.assertThat(fileStatusesAfterDelete)
.describedAs("File statuses should be empty after delete")
.isEmpty();
}

@Test
public void validatePathCapabilityDeclared() throws Exception {
Assertions.assertThat(fs.hasPathCapability(basePath, CommonPathCapabilities.BULK_DELETE))
.describedAs("Path capability BULK_DELETE should be declared")
.isTrue();
}

@Test
public void testDeletePathsNotUnderBase() throws Exception {
List<Path> paths = new ArrayList<>();
Path pathNotUnderBase = path("not-under-base");
paths.add(pathNotUnderBase);
// Should fail as path is not under the base path.
intercept(IllegalArgumentException.class,
() -> FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}

@Test
public void testDeletePathsNotExists() throws Exception {
List<Path> paths = new ArrayList<>();
Path pathNotExists = new Path(basePath, "not-exists");
paths.add(pathNotExists);
// The bulk delete call doesn't verify whether a path exists before deleting it.
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}

@Test
public void testDeletePathsDirectory() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
paths.add(dirPath);
Path filePath = new Path(dirPath, "file");
touch(fs, filePath);
paths.add(filePath);
// The outcome is undefined, but the call shouldn't fail. In the case of S3, the directories will still be present.
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}

@Test
public void testDeleteEmptyDirectory() throws Exception {
List<Path> paths = new ArrayList<>();
Path emptyDirPath = new Path(basePath, "empty-dir");
fs.mkdirs(emptyDirPath);
paths.add(emptyDirPath);
// Should pass as empty directory.
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}

@Test
public void testDeleteEmptyList() throws Exception {
List<Path> paths = new ArrayList<>();
// Empty list should pass.
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}

@Test
public void testDeleteSamePathsMoreThanOnce() throws Exception {
List<Path> paths = new ArrayList<>();
Path path = new Path(basePath, "file");
touch(fs, path);
paths.add(path);
paths.add(path);
Path another = new Path(basePath, "another-file");
touch(fs, another);
paths.add(another);
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}

/**
* This test validates that files to be deleted don't have
* to be direct children of the base path.
*/
@Test
public void testDeepDirectoryFilesDelete() throws Exception {
List<Path> paths = new ArrayList<>();
Path dir1 = new Path(basePath, "dir1");
Path dir2 = new Path(dir1, "dir2");
Path dir3 = new Path(dir2, "dir3");
fs.mkdirs(dir3);
Path file1 = new Path(dir3, "file1");
touch(fs, file1);
paths.add(file1);
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}


@Test
public void testChildPaths() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
paths.add(dirPath);
Path filePath = new Path(dirPath, "file");
touch(fs, filePath);
paths.add(filePath);
// Should pass as both paths are under the base path.
assertSuccessfulBulkDelete(FileUtil.bulkDelete(getFileSystem(), basePath, paths));
}


/**
* Assert that a bulk delete call returned no failures.
* @param entries the (path, error) entries returned by the call.
*/
public static void assertSuccessfulBulkDelete(List<Map.Entry<Path, String>> entries) {
Assertions.assertThat(entries)
.describedAs("returned entries should be empty after successful delete")
.isEmpty();
}

/**
* Create a list of {@code count} paths under the given base path.
*/
private List<Path> createListOfPaths(int count, Path basePath) {
List<Path> paths = new ArrayList<>();
for (int i = 0; i < count; i++) {
Path path = new Path(basePath, "file-" + i);
paths.add(path);
}
return paths;
}
}
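Together, the page-size precondition tests above pin down the intended calling pattern: each batch handed to the API must stay within the store's page size. A minimal paging loop over the same FileUtil entry points the tests exercise might look like the following sketch — not part of this PR, and assuming the calls raise IOException if the request as a whole fails:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public final class BulkDeletePagingExample {

  /**
   * Delete all the given paths under base, one page at a time.
   * @return accumulated (path, error) entries for any failed deletions.
   */
  public static List<Map.Entry<Path, String>> deleteAll(
      FileSystem fs, Path base, List<Path> paths) throws IOException {
    final int pageSize = FileUtil.bulkDeletePageSize(fs, base);
    final List<Map.Entry<Path, String>> failures = new ArrayList<>();
    for (int i = 0; i < paths.size(); i += pageSize) {
      // each sublist satisfies the size() <= pageSize precondition tested above
      List<Path> page = paths.subList(i, Math.min(i + pageSize, paths.size()));
      failures.addAll(FileUtil.bulkDelete(fs, base, page));
    }
    return failures;
  }
}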
@@ -1641,4 +1641,7 @@ private Constants() {
*/
public static final String AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED =
"fs.s3a.access.grants.fallback.to.iam";
/**
* Default IO rate limit: {@value}. Zero means no rate limiting.
*/
public static final int DEFAULT_S3A_IO_RATE_LIMIT = 0;

/**
* Option to set a rate limit on IO requests against the store: {@value}.
* A value of 0 (the default) means no rate limiting.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
}
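These constants are wired into S3AFileSystem below, where the configured capacity feeds the write rate limiter. A hypothetical way to enable the limit from client code — assuming, as with org.apache.hadoop.util.RateLimitingFactory, that the capacity is interpreted as requests per second:

import org.apache.hadoop.conf.Configuration;

public final class RateLimitConfigExample {
  public static Configuration withWriteRateLimit() {
    Configuration conf = new Configuration();
    // Cap rate-limited S3 write/delete requests at roughly 1000 per second;
    // leaving it at the default of 0 keeps the limiter unlimited.
    conf.setInt("fs.s3a.io.rate.limit", 1000);
    return conf;
  }
}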
@@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import org.apache.hadoop.util.*;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -166,10 +167,6 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -207,12 +204,8 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

@@ -760,6 +753,7 @@ public void initialize(URI name, Configuration originalConf)
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);

int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create the store
store = new S3AStoreBuilder()
.withS3Client(s3Client)
@@ -770,7 +764,7 @@ public void initialize(URI name, Configuration originalConf)
.withStatisticsContext(statisticsContext)
.withStorageStatistics(getStorageStatistics())
.withReadRateLimiter(unlimitedRate())
- .withWriteRateLimiter(unlimitedRate())
+ .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();

// The filesystem is now ready to perform operations against
@@ -543,6 +543,10 @@ public enum Statistic {
"retried requests made of the remote store",
TYPE_COUNTER),

STORE_IO_RATE_LIMITED(StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION,
"Duration of rate limited operations",
TYPE_DURATION),

STORE_IO_THROTTLED(
StoreStatisticNames.STORE_IO_THROTTLED,
"Requests throttled and retried",
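The new duration statistic pairs naturally with the limiter wired up above: a rate limiter's acquire() reports how long the caller was blocked, and that duration is what a store can record under store_io_rate_limited_duration. A sketch of the pattern, assuming the org.apache.hadoop.util.RateLimiting/RateLimitingFactory API used in S3AFileSystem above:

import java.time.Duration;

import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.RateLimitingFactory;

public final class RateLimitedWriteExample {
  public static void main(String[] args) {
    // A capacity of 0 would return an unlimited, never-blocking limiter.
    RateLimiting limiter = RateLimitingFactory.create(100);
    // acquire() blocks until capacity is available and returns the wait time,
    // which callers can feed into the STORE_IO_RATE_LIMITED duration statistic.
    Duration waited = limiter.acquire(1);
    System.out.println("rate limited for " + waited.toMillis() + " ms");
  }
}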
@@ -75,10 +75,11 @@ public List<Map.Entry<Path, String>> bulkDelete(final List<Path> paths)
requireNonNull(paths);
checkArgument(paths.size() <= pageSize,
"Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize);

final StoreContext context = getStoreContext();
final List<ObjectIdentifier> objects = paths.stream().map(p -> {
checkArgument(p.isAbsolute(), "Path %s is not absolute", p);
checkArgument(validatePathIsUnderParent(p),
"Path %s is not under the base path %s", p, basePath);
final String k = context.pathToKey(p);
return ObjectIdentifier.builder().key(k).build();
}).collect(toList());
@@ -98,6 +99,16 @@ public List<Map.Entry<Path, String>> bulkDelete(final List<Path> paths)
return emptyList();
}

/**
* Check whether the base path is an ancestor of the given path,
* walking up its parent chain.
*/
private boolean validatePathIsUnderParent(Path p) {
while (p.getParent() != null) {
if (p.getParent().equals(basePath)) {
return true;
}
p = p.getParent();
}
return false;
}
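For illustration, a standalone re-implementation of that ancestor walk with invented paths, showing which inputs pass — deep descendants qualify, but the base path itself does not:

import org.apache.hadoop.fs.Path;

public final class UnderParentCheckExample {
  // Same walk as validatePathIsUnderParent above, parameterized on the base.
  static boolean isUnder(Path base, Path p) {
    while (p.getParent() != null) {
      if (p.getParent().equals(base)) {
        return true;
      }
      p = p.getParent();
    }
    return false;
  }

  public static void main(String[] args) {
    Path base = new Path("/base");
    System.out.println(isUnder(base, new Path("/base/a")));     // true: direct child
    System.out.println(isUnder(base, new Path("/base/a/b/c"))); // true: deep descendant
    System.out.println(isUnder(base, new Path("/other/a")));    // false: never reaches /base
    System.out.println(isUnder(base, new Path("/base")));       // false: not under itself
  }
}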

@Override
public void close() throws IOException {

@@ -98,7 +98,7 @@ public List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> k
return emptyList();
} else {
return errors.stream()
- .map(e -> pair(e.key(), e.message()))
+ .map(e -> pair(e.key(), e.toString()))
.collect(Collectors.toList());
}
}
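Switching from message() to toString() widens what is reported per failed key: with the AWS SDK v2 S3Error type, which this code appears to be iterating, toString() carries the error code and key as well as the message. A sketch under that assumption:

import software.amazon.awssdk.services.s3.model.S3Error;

public final class S3ErrorReportExample {
  public static void main(String[] args) {
    S3Error error = S3Error.builder()
        .key("data/part-0000")
        .code("AccessDenied")
        .message("Access Denied")
        .build();
    // message() alone yields just "Access Denied"; toString() also includes
    // the code and key, e.g.
    // S3Error(Key=data/part-0000, Code=AccessDenied, Message=Access Denied)
    System.out.println(error);
  }
}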