Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ private Constants() {
* This can be set in the {code createFile()} builder.
* Value {@value}.
*/
public static final String FS_S3A_CREATE_IF_NONE_MATCH = "fs.s3a.create.header.If-None-Match";
public static final String FS_S3A_CONDITIONAL_FILE_CREATE = "fs.s3a.conditional.file.create";

/**
* Default value for create performance in an S3A FS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
Expand Down Expand Up @@ -141,7 +142,11 @@ class S3ABlockOutputStream extends OutputStream implements
private static final String E_NOT_SYNCABLE =
"S3A streams are not Syncable. See HADOOP-17597.";

public static final String IF_NONE_MATCH_HEADER = "If-None-Match";
/**
* How long to wait for uploads to complete after being cancelled before
* the blocks themselves are closed: 15 seconds.
*/
private static final Duration TIME_TO_AWAIT_CANCEL_COMPLETION = Duration.ofSeconds(15);

/** Object being uploaded. */
private final String key;
Expand Down Expand Up @@ -686,7 +691,7 @@ private long putObject() throws IOException {
final S3ADataBlocks.DataBlock block = getActiveBlock();
final long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
PutObjectRequest putObjectRequest = uploadData.hasFile() ?
PutObjectRequest putObjectRequest =
writeOperationHelper.createPutObjectRequest(
key,
uploadData.getSize(),
Expand All @@ -696,32 +701,16 @@ private long putObject() throws IOException {
PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder();
Map<String, String> optionHeaders = builder.putOptions.getHeaders();

if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) {
if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) {
maybeModifiedPutIfAbsentRequest.overrideConfiguration(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned below as well, think we should upgrade SDK and then use the new .ifNoneMatch().

Also I would recommend you move all of this logic into a new private method in this class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I would move this logic to RequestFactoryImpl. But you will need to add in a flag to the method signature, "isConditonalPutEnabled", and only add the if-none-match header in when sConditonalPutEnabled is true.

Basically, you only want to add if-none-match when the PUT is coming from S3ABlockOutputStream.close() and not from anywhere else.

override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER)));
override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
}

final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build();

BlockUploadProgress progressCallback =
new BlockUploadProgress(block, progressListener, now());
statistics.blockUploadQueued(size);
ListenableFuture<PutObjectResponse> putObjectResult =
executorService.submit(() -> {
try {
// the putObject call automatically closes the input
// stream afterwards.
PutObjectResponse response =
writeOperationHelper.putObject(finalizedRequest, builder.putOptions, uploadData,
uploadData.hasFile(), statistics);
progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT);
return response;
} finally {
cleanupWithLogger(LOG, uploadData, block);
}
});
clearActiveBlock();
//wait for completion
try {
progressCallback.progressChanged(PUT_STARTED_EVENT);
// the putObject call automatically closes the upload data
Expand Down Expand Up @@ -1411,6 +1400,11 @@ public static final class BlockOutputStreamBuilder {
*/
private boolean isMultipartUploadEnabled;

/**
* Is conditional create enables.
*/
private boolean isConditionalEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1572,5 +1566,11 @@ public BlockOutputStreamBuilder withMultipartEnabled(
isMultipartUploadEnabled = value;
return this;
}

public BlockOutputStreamBuilder withConditionalEnabled(
final boolean value){
isConditionalEnabled = value;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,17 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(

/**
* Complete a multipart upload.
*
* @param destKey destination object key
* @param uploadId ID of initiated upload
* @param partETags ordered list of etags
* @param destKey destination object key
* @param uploadId ID of initiated upload
* @param partETags ordered list of etags
* @param putOptions options for the request
* @return the request builder.
*/
CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
List<CompletedPart> partETags, PutObjectOptions putOptions);
List<CompletedPart> partETags,
PutObjectOptions putOptions);

/**
* Create a HEAD object request builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public interface AWSHeaders {
String DATE = "Date";
String ETAG = "ETag";
String LAST_MODIFIED = "Last-Modified";
String IF_NONE_MATCH = "If-None-Match";

/*
* Amazon HTTP Headers used by S3A.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ public class CreateFileBuilder extends
* Classic create file option set: overwriting.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE =
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null);
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, false, null);

/**
* Classic create file option set: no overwrite.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE =
new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null);
new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, false, null);

/**
* Performance create options.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE =
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null);
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, false, null);

/**
* Callback interface.
Expand Down Expand Up @@ -144,10 +144,12 @@ public FSDataOutputStream build() throws IOException {

final boolean performance =
options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false);
final boolean conditionalCreate =
options.getBoolean(Constants.FS_S3A_CONDITIONAL_FILE_CREATE, false);
return callbacks.createFileFromBuilder(
path,
getProgress(),
new CreateFileOptions(flags, isRecursive(), performance, headers));
new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers));

}

Expand Down Expand Up @@ -218,26 +220,33 @@ public static final class CreateFileOptions {
*/
private final boolean performance;

/**
* conditional flag.
*/
private final boolean conditionalCreate;

/**
* Headers; may be null.
*/
private final Map<String, String> headers;

/**
* @param flags creation flags
* @param recursive create parent dirs?
* @param flags creation flags
* @param recursive create parent dirs?
* @param performance performance flag
* @param
* @param headers nullable header map.
* @param conditionalCreate conditional flag
* @param headers nullable header map.
*/
public CreateFileOptions(
final EnumSet<CreateFlag> flags,
final boolean recursive,
final boolean performance,
final boolean conditionalCreate,
final Map<String, String> headers) {
this.flags = flags;
this.recursive = recursive;
this.performance = performance;
this.conditionalCreate = conditionalCreate;
this.headers = headers;
}

Expand All @@ -247,6 +256,7 @@ public String toString() {
"flags=" + flags +
", recursive=" + recursive +
", performance=" + performance +
", conditionalCreate=" + conditionalCreate +
", headers=" + headers +
'}';
}
Expand All @@ -263,6 +273,10 @@ public boolean isPerformance() {
return performance;
}

public boolean isConditionalCreate() {
return conditionalCreate;
}

public Map<String, String> getHeaders() {
return headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
Expand Down Expand Up @@ -261,7 +261,7 @@ private InternalConstants() {
*/
public static final Set<String> CREATE_FILE_KEYS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CREATE_IF_NONE_MATCH)));
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CONDITIONAL_FILE_CREATE)));

/**
* Dynamic Path capabilities to be evaluated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -526,13 +526,11 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB
CompleteMultipartUploadRequest.Builder requestBuilder;
Map<String, String> optionHeaders = putOptions.getHeaders();

if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) {
requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
.overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match")))
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
} else {
requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) {
requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration(
override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
}

return prepareRequest(requestBuilder);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.io.IOUtils;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.awssdk.services.s3.model.S3Exception;
Expand All @@ -18,20 +37,23 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE;
import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;


public class ITestS3APutIfMatch extends AbstractS3ATestBase {
public class ITestS3APutIfMatch extends AbstractS3ACostTest {

private Configuration conf;

@Override
protected Configuration createConfiguration() {
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
removeBaseAndBucketOverrides(conf,
Expand All @@ -45,6 +67,14 @@ protected Configuration createConfiguration() {
return conf;
}

@Override
public void setup() throws Exception {
super.setup();
conf = createConfiguration();
skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE,
"Skipping IfNoneMatch tests");
}

protected String getBlockOutputBufferName() {
return FAST_UPLOAD_BUFFER_ARRAY;
}
Expand All @@ -61,7 +91,7 @@ private static void createFileWithIfNoneMatchFlag(FileSystem fs,
byte[] data,
String ifMatchTag) throws Exception {
FSDataOutputStreamBuilder builder = fs.createFile(path);
builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag);
builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
FSDataOutputStream stream = builder.create().build();
if (data != null && data.length > 0) {
stream.write(data);
Expand All @@ -85,7 +115,7 @@ public void testPutIfAbsentConflict() throws IOException {
Assert.assertEquals(RemoteFileChangedException.class, e.getClass());

S3Exception s3Exception = (S3Exception) e.getCause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was gong to point you at GenericTestUtils.assertExceptionContains() but it doesn't validate the cause type or return the value as a cast type the way intercept() does. Time for a new assert there.

Assert.assertEquals(s3Exception.statusCode(), 412);
Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412);
}
}

Expand All @@ -95,7 +125,6 @@ public void testPutIfAbsentLargeFileConflict() throws IOException {
FileSystem fs = getFileSystem();
Path testFile = methodPath();

fs.mkdirs(testFile.getParent());
// enough bytes for Multipart Upload
byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are ways to do this with small files, this matters because a 6MB file is large enough that it'd have to become a scale test. Yes, it is a scale test when people are testing across the planet.

Do not worry about this right now, but be aware before merging into trunk the large file test will have to be moved to a subset of S3AScaleTestBase, and this test replaced with something using a write to a magic path –or even better, we add another new create file option to force multipart uploads always.

Designing for that makes me think that a followup to this should move to an enumset of CreateFile flags


Expand All @@ -107,7 +136,7 @@ public void testPutIfAbsentLargeFileConflict() throws IOException {

// Error gets caught here:
S3Exception s3Exception = (S3Exception) e.getCause();
Assert.assertEquals(s3Exception.statusCode(), 412);
Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412);
}
}
}
Loading