Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void setup() throws Exception {

final FileSystem fs = getFileSystem();
Path testPath = getContract().getTestPath();
Assume.assumeTrue("Multipart uploader is not supported",
fs.hasPathCapability(testPath,
CommonPathCapabilities.FS_MULTIPART_UPLOADER));
uploader0 = fs.createMultipartUploader(testPath).build();
uploader1 = fs.createMultipartUploader(testPath).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,4 +1093,19 @@ private Constants() {
*/
public static final String AWS_S3_CENTRAL_REGION = "us-east-1";

/**
* S3 client-side encryption(CSE) method.
* <p>
* Value: {@value}
*/
public static final String CLIENT_SIDE_ENCRYPTION_METHOD =
"fs.s3a.cse.method";

/**
* Key ID for KMS S3 CSE method.
* <p>
* Value: {@value}
*/
public static final String CLIENT_SIDE_ENCRYPTION_KMS_KEY_ID =
"fs.s3a.cse.kms.key-id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
import com.amazonaws.SdkClientException;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder;
import com.amazonaws.services.s3.AmazonS3EncryptionV2;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.model.CryptoConfigurationV2;
import com.amazonaws.services.s3.model.CryptoMode;
import com.amazonaws.services.s3.model.CryptoRangeGetMode;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
import com.amazonaws.util.AwsHostNameUtils;
import com.amazonaws.util.RuntimeHttpUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
Expand All @@ -43,9 +52,12 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move up to L44. Yes, it its a PITA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, missed it :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

losing battle here. I've got it wrong in some of the audit work and then it complicates backporting


import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CLIENT_SIDE_ENCRYPTION_KMS_KEY_ID;
import static org.apache.hadoop.fs.s3a.Constants.CLIENT_SIDE_ENCRYPTION_METHOD;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
Expand Down Expand Up @@ -112,15 +124,73 @@ public AmazonS3 createS3Client(
}

try {
return buildAmazonS3Client(
awsConf,
parameters);
if (conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) == null) {
return buildAmazonS3Client(
awsConf,
parameters);
} else {
return buildAmazonS3EncryptionClient(
awsConf,
parameters);
}
} catch (SdkClientException e) {
// SDK refused to build.
throw translateException("creating AWS S3 client", uri.toString(), e);
}
}

/**
* Create an {@link AmazonS3} client of type
* {@link AmazonS3EncryptionV2} if CSE is enabled.
*
* @param awsConf AWS configuration.
* @param parameters parameters
*
* @return new AmazonS3 client.
*/
protected AmazonS3 buildAmazonS3EncryptionClient(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InconsistentS3Client doesn't do this; I can see tests skip it. Should we have that client throw some Unsupported Exception here

final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters){

AmazonS3 client;
AmazonS3EncryptionClientV2Builder builder =
new AmazonS3EncryptionClientV2Builder();
Configuration conf = getConf();

//CSE-KMS Method
String kmsKeyId = conf.get(CLIENT_SIDE_ENCRYPTION_KMS_KEY_ID);
// Check if kmsKeyID is not null
Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
+ "requires KMS key ID. Use " + CLIENT_SIDE_ENCRYPTION_KMS_KEY_ID
+ " property to set it. ");

EncryptionMaterialsProvider materialsProvider =
new KMSEncryptionMaterialsProvider(kmsKeyId);
builder.withEncryptionMaterialsProvider(materialsProvider);
//Configure basic params of a S3 builder.
configureBasicParams(builder, awsConf, parameters);

// Configuring endpoint.
AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
configureEndpoint(builder, epr);

// Create cryptoConfig.
CryptoConfigurationV2 cryptoConfigurationV2 =
new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
.withRangeGetMode(CryptoRangeGetMode.ALL);
if (epr != null) {
cryptoConfigurationV2
.withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion());
}
builder.withCryptoConfiguration(cryptoConfigurationV2);
client = builder.build();

return client;
}

/**
* Use the Builder API to create an AWS S3 client.
* <p>
Expand All @@ -137,41 +207,68 @@ protected AmazonS3 buildAmazonS3Client(
final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters) {
AmazonS3ClientBuilder b = AmazonS3Client.builder();
b.withCredentials(parameters.getCredentialSet());
b.withClientConfiguration(awsConf);
b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
configureBasicParams(b, awsConf, parameters);

// endpoint set up is a PITA
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
configureEndpoint(b, epr);
final AmazonS3 client = b.build();
return client;
}

/**
* A method to configure basic AmazonS3Builder parameters.
*
* @param builder Instance of AmazonS3Builder used.
* @param awsConf ClientConfiguration used.
* @param parameters Parameters used to set in the builder.
*/
private void configureBasicParams(AmazonS3Builder builder,
ClientConfiguration awsConf, S3ClientCreationParameters parameters) {
builder.withCredentials(parameters.getCredentialSet());
builder.withClientConfiguration(awsConf);
builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());

if (parameters.getMetrics() != null) {
b.withMetricsCollector(
builder.withMetricsCollector(
new AwsStatisticsCollector(parameters.getMetrics()));
}
if (parameters.getRequestHandlers() != null) {
b.withRequestHandlers(
builder.withRequestHandlers(
parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
}
if (parameters.getMonitoringListener() != null) {
b.withMonitoringListener(parameters.getMonitoringListener());
builder.withMonitoringListener(parameters.getMonitoringListener());
}

// endpoint set up is a PITA
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
}

/**
* A method to configure endpoint and Region for an AmazonS3Builder.
*
* @param builder Instance of AmazonS3Builder used.
* @param epr EndpointConfiguration used to set in builder.
*/
private void configureEndpoint(
AmazonS3Builder builder,
AmazonS3Builder.EndpointConfiguration epr) {
if (epr != null) {
// an endpoint binding was constructed: use it.
b.withEndpointConfiguration(epr);
builder.withEndpointConfiguration(epr);
} else {
// no idea what the endpoint is, so tell the SDK
// to work it out at the cost of an extra HEAD request
b.withForceGlobalBucketAccessEnabled(true);
builder.withForceGlobalBucketAccessEnabled(true);
// HADOOP-17771 force set the region so the build process doesn't halt.
String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION);
LOG.debug("fs.s3a.endpoint.region=\"{}\"", region);
if (!region.isEmpty()) {
// there's either an explicit region or we have fallen back
// to the central one.
LOG.debug("Using default endpoint; setting region to {}", region);
b.setRegion(region);
builder.setRegion(region);
} else {
// no region.
// allow this if people really want it; it is OK to rely on this
Expand All @@ -180,8 +277,6 @@ protected AmazonS3 buildAmazonS3Client(
LOG.debug(SDK_REGION_CHAIN_IN_USE);
}
}
final AmazonS3 client = b.build();
return client;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@
public class Listing extends AbstractStoreOperation {

private static final Logger LOG = S3AFileSystem.LOG;
private final boolean isCSEEnabled;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();

private final ListingOperationCallbacks listingOperationCallbacks;

public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
StoreContext storeContext, boolean isCSEEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move isCSEEnabled to storeContext?

super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
this.isCSEEnabled = isCSEEnabled;
}

/**
Expand Down Expand Up @@ -687,7 +689,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
S3AFileStatus status = createFileStatus(keyPath, summary,
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
summary.getETag(), null, isCSEEnabled);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
Expand Down Expand Up @@ -961,7 +963,7 @@ public AcceptFilesOnly(Path qualifiedPath) {
public boolean accept(Path keyPath, S3ObjectSummary summary) {
return !keyPath.equals(qualifiedPath)
&& !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
&& !objectRepresentsDirectory(summary.getKey(), summary.getSize());
&& !objectRepresentsDirectory(summary.getKey());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements
private static final LogExactlyOnce WARN_ON_SYNCABLE =
new LogExactlyOnce(LOG);

/** is client side encryption enabled? */
private final boolean isCSEEnabled;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand Down Expand Up @@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
}

/**
Expand Down Expand Up @@ -307,29 +311,33 @@ public synchronized void write(byte[] source, int offset, int len)
// of capacity
// Trigger an upload then process the remainder.
LOG.debug("writing more data than block has capacity -triggering upload");
uploadCurrentBlock();
uploadCurrentBlock(false);
// tail recursion is mildly expensive, but given buffer sizes must be MB.
// it's unlikely to recurse very deeply.
this.write(source, offset + written, len - written);
} else {
if (remainingCapacity == 0) {
if (remainingCapacity == 0 && !isCSEEnabled) {
// the whole buffer is done, trigger an upload
uploadCurrentBlock();
uploadCurrentBlock(false);
}
}
}

/**
* Start an asynchronous upload of the current block.
*
* @param isLast true, if part being uploaded is last and client side
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and or 'or'??

* encryption is enabled.
* @throws IOException Problems opening the destination for upload,
* initializing the upload, or if a previous operation has failed.
* initializing the upload, or if a previous operation has failed.
*/
private synchronized void uploadCurrentBlock() throws IOException {
private synchronized void uploadCurrentBlock(boolean isLast)
throws IOException {
Preconditions.checkState(hasActiveBlock(), "No active block");
LOG.debug("Writing block # {}", blockCount);
initMultipartUpload();
try {
multiPartUpload.uploadBlockAsync(getActiveBlock());
multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
bytesSubmitted += getActiveBlock().dataSize();
} finally {
// set the block to null, so the next write will create a new block.
Expand Down Expand Up @@ -389,8 +397,10 @@ public void close() throws IOException {
// PUT the final block
if (hasBlock &&
(block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
//send last part
uploadCurrentBlock();
// send last part and set the value of isLastPart to true in case of
// CSE being enabled, since we are sure it is last part as parts
// are being uploaded serially in CSE.
uploadCurrentBlock(isCSEEnabled);
}
// wait for the partial uploads to finish
final List<PartETag> partETags =
Expand Down Expand Up @@ -760,7 +770,8 @@ public void maybeRethrowUploadFailure() throws IOException {
* @throws IOException upload failure
* @throws PathIOException if too many blocks were written
*/
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
Boolean isLast)
throws IOException {
LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
Preconditions.checkNotNull(uploadId, "Null uploadId");
Expand All @@ -781,6 +792,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
uploadData.getUploadStream(),
uploadData.getFile(),
0L);
request.setLastPart(isLast);
} catch (SdkBaseException aws) {
// catch and translate
IOException e = translateException("upload", key, aws);
Expand Down Expand Up @@ -1042,6 +1054,9 @@ public static final class BlockOutputStreamBuilder {
/** Should Syncable calls be downgraded? */
private boolean downgradeSyncableExceptions;

/** is Client side Encryption enabled? */
private boolean isCSEEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1157,5 +1172,15 @@ public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
downgradeSyncableExceptions = value;
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
isCSEEnabled = value;
return this;
}
}
}
Loading