Skip to content

Commit aee975a

Browse files
mehakmeet authored and steveloughran committed
HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (#2706)
This (big!) patch adds support for client side encryption in AWS S3, with keys managed by AWS-KMS. Read the documentation in encryption.md very, very carefully before use and consider it unstable. S3-CSE is enabled in the existing configuration option "fs.s3a.server-side-encryption-algorithm": fs.s3a.server-side-encryption-algorithm=CSE-KMS fs.s3a.server-side-encryption.key=<KMS_KEY_ID> You cannot enable CSE and SSE in the same client, although you can still enable a default SSE option in the S3 console. * Filesystem list/get status operations subtract 16 bytes from the length of all files >= 16 bytes long to compensate for the padding which CSE adds. * The SDK always warns about the specific algorithm chosen being deprecated. It is critical to use this algorithm for ranged GET requests to work (i.e. random IO). Ignore. * Unencrypted files CANNOT BE READ. The entire bucket SHOULD be encrypted with S3-CSE. * Uploading files may be a bit slower as blocks are now written sequentially. * The Multipart Upload API is disabled when S3-CSE is active. Contributed by Mehakmeet Singh Change-Id: Ie1a27a036a39db66a67e9c6d33bc78d54ea708a0
1 parent da011ba commit aee975a

35 files changed

Lines changed: 1370 additions & 115 deletions

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public void setup() throws Exception {
8989

9090
final FileSystem fs = getFileSystem();
9191
Path testPath = getContract().getTestPath();
92+
Assume.assumeTrue("Multipart uploader is not supported",
93+
fs.hasPathCapability(testPath,
94+
CommonPathCapabilities.FS_MULTIPART_UPLOADER));
9295
uploader0 = fs.createMultipartUploader(testPath).build();
9396
uploader1 = fs.createMultipartUploader(testPath).build();
9497
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,12 @@ private Constants() {
420420
"fs.s3a.multipart.purge.age";
421421
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
422422

423-
// s3 server-side encryption, see S3AEncryptionMethods for valid options
423+
/**
424+
* s3 server-side encryption or s3 client side encryption method, see
425+
* {@link S3AEncryptionMethods} for valid options.
426+
*
427+
* {@value}
428+
*/
424429
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
425430
"fs.s3a.server-side-encryption-algorithm";
426431

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 117 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,23 @@
2525
import com.amazonaws.SdkClientException;
2626
import com.amazonaws.client.builder.AwsClientBuilder;
2727
import com.amazonaws.handlers.RequestHandler2;
28+
import com.amazonaws.regions.RegionUtils;
2829
import com.amazonaws.services.s3.AmazonS3;
30+
import com.amazonaws.services.s3.AmazonS3Builder;
2931
import com.amazonaws.services.s3.AmazonS3Client;
3032
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
33+
import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder;
34+
import com.amazonaws.services.s3.AmazonS3EncryptionV2;
3135
import com.amazonaws.services.s3.S3ClientOptions;
3236
import com.amazonaws.services.s3.internal.ServiceUtils;
37+
import com.amazonaws.services.s3.model.CryptoConfigurationV2;
38+
import com.amazonaws.services.s3.model.CryptoMode;
39+
import com.amazonaws.services.s3.model.CryptoRangeGetMode;
40+
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
41+
import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
3342
import com.amazonaws.util.AwsHostNameUtils;
3443
import com.amazonaws.util.RuntimeHttpUtils;
44+
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
3545
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
3646
import org.slf4j.Logger;
3747
import org.slf4j.LoggerFactory;
@@ -48,6 +58,8 @@
4858
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
4959
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
5060
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
61+
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
62+
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
5163
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
5264

5365
/**
@@ -112,15 +124,77 @@ public AmazonS3 createS3Client(
112124
}
113125

114126
try {
115-
return buildAmazonS3Client(
116-
awsConf,
117-
parameters);
127+
if (S3AEncryptionMethods.getMethod(S3AUtils.
128+
lookupPassword(conf, SERVER_SIDE_ENCRYPTION_ALGORITHM, null))
129+
.equals(S3AEncryptionMethods.CSE_KMS)) {
130+
return buildAmazonS3EncryptionClient(
131+
awsConf,
132+
parameters);
133+
} else {
134+
return buildAmazonS3Client(
135+
awsConf,
136+
parameters);
137+
}
118138
} catch (SdkClientException e) {
119139
// SDK refused to build.
120140
throw translateException("creating AWS S3 client", uri.toString(), e);
121141
}
122142
}
123143

144+
/**
145+
* Create an {@link AmazonS3} client of type
146+
* {@link AmazonS3EncryptionV2} if CSE is enabled.
147+
*
148+
* @param awsConf AWS configuration.
149+
* @param parameters parameters.
150+
*
151+
* @return new AmazonS3 client.
152+
* @throws IOException if lookupPassword() has any problem.
153+
*/
154+
protected AmazonS3 buildAmazonS3EncryptionClient(
155+
final ClientConfiguration awsConf,
156+
final S3ClientCreationParameters parameters) throws IOException {
157+
158+
AmazonS3 client;
159+
AmazonS3EncryptionClientV2Builder builder =
160+
new AmazonS3EncryptionClientV2Builder();
161+
Configuration conf = getConf();
162+
163+
//CSE-KMS Method
164+
String kmsKeyId = S3AUtils.lookupPassword(conf,
165+
SERVER_SIDE_ENCRYPTION_KEY, null);
166+
// Check if kmsKeyID is not null
167+
Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
168+
+ "requires KMS key ID. Use " + SERVER_SIDE_ENCRYPTION_KEY
169+
+ " property to set it. ");
170+
171+
EncryptionMaterialsProvider materialsProvider =
172+
new KMSEncryptionMaterialsProvider(kmsKeyId);
173+
builder.withEncryptionMaterialsProvider(materialsProvider);
174+
//Configure basic params of a S3 builder.
175+
configureBasicParams(builder, awsConf, parameters);
176+
177+
// Configuring endpoint.
178+
AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
179+
= createEndpointConfiguration(parameters.getEndpoint(),
180+
awsConf, getConf().getTrimmed(AWS_REGION));
181+
configureEndpoint(builder, epr);
182+
183+
// Create cryptoConfig.
184+
CryptoConfigurationV2 cryptoConfigurationV2 =
185+
new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
186+
.withRangeGetMode(CryptoRangeGetMode.ALL);
187+
if (epr != null) {
188+
cryptoConfigurationV2
189+
.withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
190+
LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion());
191+
}
192+
builder.withCryptoConfiguration(cryptoConfigurationV2);
193+
client = builder.build();
194+
195+
return client;
196+
}
197+
124198
/**
125199
* Use the Builder API to create an AWS S3 client.
126200
* <p>
@@ -137,41 +211,68 @@ protected AmazonS3 buildAmazonS3Client(
137211
final ClientConfiguration awsConf,
138212
final S3ClientCreationParameters parameters) {
139213
AmazonS3ClientBuilder b = AmazonS3Client.builder();
140-
b.withCredentials(parameters.getCredentialSet());
141-
b.withClientConfiguration(awsConf);
142-
b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
214+
configureBasicParams(b, awsConf, parameters);
215+
216+
// endpoint set up is a PITA
217+
AwsClientBuilder.EndpointConfiguration epr
218+
= createEndpointConfiguration(parameters.getEndpoint(),
219+
awsConf, getConf().getTrimmed(AWS_REGION));
220+
configureEndpoint(b, epr);
221+
final AmazonS3 client = b.build();
222+
return client;
223+
}
224+
225+
/**
226+
* A method to configure basic AmazonS3Builder parameters.
227+
*
228+
* @param builder Instance of AmazonS3Builder used.
229+
* @param awsConf ClientConfiguration used.
230+
* @param parameters Parameters used to set in the builder.
231+
*/
232+
private void configureBasicParams(AmazonS3Builder builder,
233+
ClientConfiguration awsConf, S3ClientCreationParameters parameters) {
234+
builder.withCredentials(parameters.getCredentialSet());
235+
builder.withClientConfiguration(awsConf);
236+
builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
143237

144238
if (parameters.getMetrics() != null) {
145-
b.withMetricsCollector(
239+
builder.withMetricsCollector(
146240
new AwsStatisticsCollector(parameters.getMetrics()));
147241
}
148242
if (parameters.getRequestHandlers() != null) {
149-
b.withRequestHandlers(
243+
builder.withRequestHandlers(
150244
parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
151245
}
152246
if (parameters.getMonitoringListener() != null) {
153-
b.withMonitoringListener(parameters.getMonitoringListener());
247+
builder.withMonitoringListener(parameters.getMonitoringListener());
154248
}
155249

156-
// endpoint set up is a PITA
157-
AwsClientBuilder.EndpointConfiguration epr
158-
= createEndpointConfiguration(parameters.getEndpoint(),
159-
awsConf, getConf().getTrimmed(AWS_REGION));
250+
}
251+
252+
/**
253+
* A method to configure endpoint and Region for an AmazonS3Builder.
254+
*
255+
* @param builder Instance of AmazonS3Builder used.
256+
* @param epr EndpointConfiguration used to set in builder.
257+
*/
258+
private void configureEndpoint(
259+
AmazonS3Builder builder,
260+
AmazonS3Builder.EndpointConfiguration epr) {
160261
if (epr != null) {
161262
// an endpoint binding was constructed: use it.
162-
b.withEndpointConfiguration(epr);
263+
builder.withEndpointConfiguration(epr);
163264
} else {
164265
// no idea what the endpoint is, so tell the SDK
165266
// to work it out at the cost of an extra HEAD request
166-
b.withForceGlobalBucketAccessEnabled(true);
267+
builder.withForceGlobalBucketAccessEnabled(true);
167268
// HADOOP-17771 force set the region so the build process doesn't halt.
168269
String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION);
169270
LOG.debug("fs.s3a.endpoint.region=\"{}\"", region);
170271
if (!region.isEmpty()) {
171272
// there's either an explicit region or we have fallen back
172273
// to the central one.
173274
LOG.debug("Using default endpoint; setting region to {}", region);
174-
b.setRegion(region);
275+
builder.setRegion(region);
175276
} else {
176277
// no region.
177278
// allow this if people really want it; it is OK to rely on this
@@ -180,8 +281,6 @@ protected AmazonS3 buildAmazonS3Client(
180281
LOG.debug(SDK_REGION_CHAIN_IN_USE);
181282
}
182283
}
183-
final AmazonS3 client = b.build();
184-
return client;
185284
}
186285

187286
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,18 @@
9090
public class Listing extends AbstractStoreOperation {
9191

9292
private static final Logger LOG = S3AFileSystem.LOG;
93+
private final boolean isCSEEnabled;
9394

9495
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
9596
new AcceptAllButS3nDirs();
9697

9798
private final ListingOperationCallbacks listingOperationCallbacks;
9899

99100
public Listing(ListingOperationCallbacks listingOperationCallbacks,
100-
StoreContext storeContext) {
101+
StoreContext storeContext) {
101102
super(storeContext);
102103
this.listingOperationCallbacks = listingOperationCallbacks;
104+
this.isCSEEnabled = storeContext.isCSEEnabled();
103105
}
104106

105107
/**
@@ -687,7 +689,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
687689
S3AFileStatus status = createFileStatus(keyPath, summary,
688690
listingOperationCallbacks.getDefaultBlockSize(keyPath),
689691
getStoreContext().getUsername(),
690-
summary.getETag(), null);
692+
summary.getETag(), null, isCSEEnabled);
691693
LOG.debug("Adding: {}", status);
692694
stats.add(status);
693695
added++;
@@ -961,7 +963,7 @@ public AcceptFilesOnly(Path qualifiedPath) {
961963
public boolean accept(Path keyPath, S3ObjectSummary summary) {
962964
return !keyPath.equals(qualifiedPath)
963965
&& !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
964-
&& !objectRepresentsDirectory(summary.getKey(), summary.getSize());
966+
&& !objectRepresentsDirectory(summary.getKey());
965967
}
966968

967969
/**
@@ -1049,6 +1051,7 @@ public boolean accept(FileStatus status) {
10491051
}
10501052
}
10511053

1054+
@SuppressWarnings("unchecked")
10521055
public static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
10531056
RemoteIterator<? extends LocatedFileStatus> iterator) {
10541057
return (RemoteIterator < LocatedFileStatus >) iterator;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements
155155
private static final LogExactlyOnce WARN_ON_SYNCABLE =
156156
new LogExactlyOnce(LOG);
157157

158+
/** is client side encryption enabled? */
159+
private final boolean isCSEEnabled;
160+
158161
/**
159162
* An S3A output stream which uploads partitions in a separate pool of
160163
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements
189192
LOG.debug("Put tracker requests multipart upload");
190193
initMultipartUpload();
191194
}
195+
this.isCSEEnabled = builder.isCSEEnabled;
192196
}
193197

194198
/**
@@ -307,29 +311,34 @@ public synchronized void write(byte[] source, int offset, int len)
307311
// of capacity
308312
// Trigger an upload then process the remainder.
309313
LOG.debug("writing more data than block has capacity -triggering upload");
310-
uploadCurrentBlock();
314+
uploadCurrentBlock(false);
311315
// tail recursion is mildly expensive, but given buffer sizes must be MB.
312316
// it's unlikely to recurse very deeply.
313317
this.write(source, offset + written, len - written);
314318
} else {
315-
if (remainingCapacity == 0) {
319+
if (remainingCapacity == 0 && !isCSEEnabled) {
316320
// the whole buffer is done, trigger an upload
317-
uploadCurrentBlock();
321+
uploadCurrentBlock(false);
318322
}
319323
}
320324
}
321325

322326
/**
323327
* Start an asynchronous upload of the current block.
328+
*
329+
* @param isLast true, if part being uploaded is last and client side
330+
* encryption is enabled.
324331
* @throws IOException Problems opening the destination for upload,
325-
* initializing the upload, or if a previous operation has failed.
332+
* initializing the upload, or if a previous operation
333+
* has failed.
326334
*/
327-
private synchronized void uploadCurrentBlock() throws IOException {
335+
private synchronized void uploadCurrentBlock(boolean isLast)
336+
throws IOException {
328337
Preconditions.checkState(hasActiveBlock(), "No active block");
329338
LOG.debug("Writing block # {}", blockCount);
330339
initMultipartUpload();
331340
try {
332-
multiPartUpload.uploadBlockAsync(getActiveBlock());
341+
multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
333342
bytesSubmitted += getActiveBlock().dataSize();
334343
} finally {
335344
// set the block to null, so the next write will create a new block.
@@ -389,8 +398,9 @@ public void close() throws IOException {
389398
// PUT the final block
390399
if (hasBlock &&
391400
(block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
392-
//send last part
393-
uploadCurrentBlock();
401+
// send last part and set the value of isLastPart to true.
402+
// Necessary to set this "true" in case of client side encryption.
403+
uploadCurrentBlock(true);
394404
}
395405
// wait for the partial uploads to finish
396406
final List<PartETag> partETags =
@@ -760,7 +770,8 @@ public void maybeRethrowUploadFailure() throws IOException {
760770
* @throws IOException upload failure
761771
* @throws PathIOException if too many blocks were written
762772
*/
763-
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
773+
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
774+
Boolean isLast)
764775
throws IOException {
765776
LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
766777
Preconditions.checkNotNull(uploadId, "Null uploadId");
@@ -781,6 +792,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
781792
uploadData.getUploadStream(),
782793
uploadData.getFile(),
783794
0L);
795+
request.setLastPart(isLast);
784796
} catch (SdkBaseException aws) {
785797
// catch and translate
786798
IOException e = translateException("upload", key, aws);
@@ -1042,6 +1054,9 @@ public static final class BlockOutputStreamBuilder {
10421054
/** Should Syncable calls be downgraded? */
10431055
private boolean downgradeSyncableExceptions;
10441056

1057+
/** is Client side Encryption enabled? */
1058+
private boolean isCSEEnabled;
1059+
10451060
private BlockOutputStreamBuilder() {
10461061
}
10471062

@@ -1157,5 +1172,15 @@ public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
11571172
downgradeSyncableExceptions = value;
11581173
return this;
11591174
}
1175+
1176+
/**
1177+
* Set builder value.
1178+
* @param value new value
1179+
* @return the builder
1180+
*/
1181+
public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
1182+
isCSEEnabled = value;
1183+
return this;
1184+
}
11601185
}
11611186
}

0 commit comments

Comments (0)