@@ -1087,6 +1087,7 @@ protected RequestFactory createRequestFactory() {
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.withStorageClass(storageClass)
.withMultipartEnabled(isMultipartEnabled)
.build();
}

@@ -1839,7 +1840,7 @@ private FSDataOutputStream innerCreateFile(

if (!checkDiskBuffer(getConf())) {
Review comment (Contributor):
just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

Review comment (@mukund-thakur, Contributor, Apr 6, 2023), quoting the above:
> just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

This is still pending. I don't really mind leaving it as it is, but I think my suggestion is consistent with other parts of the code and is more readable.
CC @steveloughran
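A minimal sketch of the suggested refactor (illustrative only: the method name comes from the comment, while the parameters and the PathIOException choice are assumptions):

    /**
     * Sketch: validate that an output stream can be created with the
     * current multipart-upload and buffer configuration, throwing on
     * an unusable combination.
     */
    private void validateOutputStreamConfiguration(final Path path,
        final Configuration conf) throws PathIOException {
      if (!checkDiskBuffer(conf)) {
        throw new PathIOException(path.toString(),
            "Unable to create OutputStream with the given"
            + " multipart upload and buffer configuration.");
      }
    }

innerCreateFile() would then reduce to a single validateOutputStreamConfiguration(path, getConf()) call, keeping the validation logic in one place.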

throw new IOException("Unable to create OutputStream with the given"
+ "multipart upload and buffer configuration.");
+ " multipart upload and buffer configuration.");
}

final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
@@ -5424,4 +5425,8 @@ public RequestFactory getRequestFactory() {
public boolean isCSEEnabled() {
return isCSEEnabled;
}

public boolean isMultipartEnabled() {
return isMultipartEnabled;
}
}
@@ -20,6 +20,7 @@

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;
@@ -199,7 +200,7 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest(
*/
InitiateMultipartUploadRequest newMultipartUploadRequest(
String destKey,
-      @Nullable PutObjectOptions options);
+      @Nullable PutObjectOptions options) throws IOException;

/**
* Complete a multipart upload.
@@ -217,6 +217,10 @@ protected AbstractS3ACommitter(
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS();
if (!fs.isMultipartEnabled()) {
throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+ " the committer can't proceed.");
}
// set this thread's context with the job ID.
// audit spans created in this thread will pick
// up this value., including the commit operations instance
@@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final StorageClass storageClass;

/**
* Is multipart upload enabled?
*/
private final boolean isMultipartEnabled;

/**
* Constructor.
* @param builder builder with all the configuration.
@@ -137,6 +142,7 @@ protected RequestFactoryImpl(
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
this.storageClass = builder.storageClass;
this.isMultipartEnabled = builder.isMultipartEnabled;
}

/**
@@ -460,7 +466,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest(
@Override
public InitiateMultipartUploadRequest newMultipartUploadRequest(
final String destKey,
-      @Nullable final PutObjectOptions options) {
+      @Nullable final PutObjectOptions options) throws IOException {
if (!isMultipartEnabled) {
throw new IOException("Multipart uploads are disabled on the given filesystem.");
Review comment (Contributor):
make a PathIOException and include destKey. This gives a bit more detail.

    throw new PathIOException(destKey, "Multipart uploads are disabled");
}
final ObjectMetadata objectMetadata = newObjectMetadata(-1);
maybeSetMetadata(options, objectMetadata);
final InitiateMultipartUploadRequest initiateMPURequest =
@@ -682,6 +691,11 @@ public static final class RequestFactoryBuilder {
*/
private PrepareRequest requestPreparer;

/**
* Is multipart upload enabled on the path.
*/
private boolean isMultipartEnabled = true;

private RequestFactoryBuilder() {
}

@@ -767,6 +781,18 @@ public RequestFactoryBuilder withRequestPreparer(
this.requestPreparer = value;
return this;
}

/**
* Set whether multipart uploads are enabled.
*
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withMultipartEnabled(
final boolean value) {
this.isMultipartEnabled = value;
return this;
}
}

/**
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.magic;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;

public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC);
return conf;
}

@Test
public void testCreateCommitter() {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = getFileSystem().makeQualified(
new Path(getContract().getTestPath(), "/testpath"));
LOG.debug("{}", commitPath);
assertThrows(PathCommitException.class,
() -> new MagicS3GuardCommitter(commitPath, tContext));
}
}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.staging.integration;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;

public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING);
return conf;
}

@Test
public void testCreateCommitter() {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = getFileSystem().makeQualified(
new Path(getContract().getTestPath(), "/testpath"));
LOG.debug("{}", commitPath);
assertThrows(PathCommitException.class,
Review comment (Contributor):
same intercept.
() -> new StagingCommitter(commitPath, tContext));
}
}
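A sketch of what the "same intercept" note likely asks for: replacing assertThrows with the intercept() helper from org.apache.hadoop.test.LambdaTestUtils, the idiom used across the S3A test suite (the rewrite below is an assumption, not the merged code):

    import static org.apache.hadoop.test.LambdaTestUtils.intercept;

    @Test
    public void testCreateCommitter() throws Exception {
      TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
          new TaskAttemptID());
      Path commitPath = getFileSystem().makeQualified(
          new Path(getContract().getTestPath(), "/testpath"));
      // intercept() evaluates the closure and raises an assertion error,
      // including any unexpected return value, if PathCommitException
      // is not thrown.
      intercept(PathCommitException.class,
          () -> new StagingCommitter(commitPath, tContext));
    }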
@@ -20,6 +20,7 @@

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;

@@ -173,7 +174,11 @@ private void createFactoryObjects(RequestFactory factory) {
a(factory.newListObjectsV1Request(path, "/", 1));
a(factory.newListNextBatchOfObjectsRequest(new ObjectListing()));
a(factory.newListObjectsV2Request(path, "/", 1));
-    a(factory.newMultipartUploadRequest(path, null));
+    try {
+      a(factory.newMultipartUploadRequest(path, null));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
File srcfile = new File("/tmp/a");
a(factory.newPutObjectRequest(path,
factory.newObjectMetadata(-1), null, srcfile));