Skip to content

Commit 8d65c2c

Browse files
AWS: Integrate S3 analytics accelerator library
1 parent 6e87181 commit 8d65c2c

18 files changed

Lines changed: 500 additions & 19 deletions

aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import software.amazon.awssdk.services.s3control.S3ControlClient;
6969
import software.amazon.awssdk.utils.ImmutableMap;
7070
import software.amazon.awssdk.utils.IoUtils;
71+
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
7172

7273
public class TestS3FileIOIntegration {
7374

@@ -255,6 +256,48 @@ public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception {
255256
validateRead(s3FileIO);
256257
}
257258

259+
@Test
260+
public void testNewInputStreamWithAnalyticsAccelerator() throws Exception {
261+
s3.putObject(
262+
PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
263+
RequestBody.fromBytes(contentBytes));
264+
S3FileIO s3FileIO = new S3FileIO();
265+
s3FileIO.initialize(
266+
ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true)));
267+
validateRead(s3FileIO);
268+
}
269+
270+
@Test
271+
public void testNewInputStreamWithAnalyticsAcceleratorAndCRT() throws Exception {
272+
s3.putObject(
273+
PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
274+
RequestBody.fromBytes(contentBytes));
275+
S3FileIO s3FileIO = new S3FileIO();
276+
s3FileIO.initialize(
277+
ImmutableMap.of(
278+
S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED,
279+
String.valueOf(true),
280+
S3FileIOProperties.S3_CRT_ENABLED,
281+
String.valueOf(true)));
282+
validateRead(s3FileIO);
283+
}
284+
285+
@Test
286+
public void testNewInputStreamWithAnalyticsAcceleratorCustomConfigured() throws Exception {
287+
final String prefetchingMode = "logicalio.prefetching.mode";
288+
final String s3Uri = String.format("s3://%s/%s/%s", bucketName, prefix, "testFile.parquet");
289+
S3FileIO s3FileIO = new S3FileIO();
290+
s3FileIO.initialize(
291+
ImmutableMap.of(
292+
S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED,
293+
String.valueOf(true),
294+
S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX + prefetchingMode,
295+
PrefetchMode.ALL.name()));
296+
write(s3FileIO, s3Uri);
297+
validateRead(s3FileIO, s3Uri);
298+
s3FileIO.deleteFile(s3Uri);
299+
}
300+
258301
@Test
259302
public void testNewOutputStream() throws Exception {
260303
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
@@ -324,6 +367,19 @@ public void testNewOutputStreamWithMultiRegionAccessPoint() throws Exception {
324367
}
325368
}
326369

370+
@Test
371+
public void testNewOutputStreamWithAnalyticsAccelerator() throws Exception {
372+
S3FileIO s3FileIO = new S3FileIO();
373+
s3FileIO.initialize(
374+
ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true)));
375+
write(s3FileIO);
376+
try (InputStream stream =
377+
s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(objectKey).build())) {
378+
String result = IoUtils.toUtf8String(stream);
379+
assertThat(result).isEqualTo(content);
380+
}
381+
}
382+
327383
@Test
328384
public void testServerSideS3Encryption() throws Exception {
329385
S3FileIOProperties properties = new S3FileIOProperties();

aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2929
import software.amazon.awssdk.services.glue.GlueClient;
3030
import software.amazon.awssdk.services.kms.KmsClient;
31+
import software.amazon.awssdk.services.s3.S3AsyncClient;
32+
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
3133
import software.amazon.awssdk.services.s3.S3Client;
34+
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
3235
import software.amazon.awssdk.services.sts.StsClient;
3336
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
3437
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -52,6 +55,14 @@ public S3Client s3() {
5255
.build();
5356
}
5457

58+
@Override
59+
public S3AsyncClient s3Async() {
60+
if (s3FileIOProperties.isS3CRTEnabled()) {
61+
return S3AsyncClient.crtBuilder().applyMutation(this::applyAssumeRoleConfigurations).build();
62+
}
63+
return S3AsyncClient.builder().applyMutation(this::applyAssumeRoleConfigurations).build();
64+
}
65+
5566
@Override
5667
public GlueClient glue() {
5768
return GlueClient.builder()
@@ -95,24 +106,25 @@ public void initialize(Map<String, String> properties) {
95106

96107
protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
97108
T clientBuilder) {
98-
AssumeRoleRequest assumeRoleRequest =
99-
AssumeRoleRequest.builder()
100-
.roleArn(awsProperties.clientAssumeRoleArn())
101-
.roleSessionName(roleSessionName)
102-
.durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
103-
.externalId(awsProperties.clientAssumeRoleExternalId())
104-
.tags(awsProperties.stsClientAssumeRoleTags())
105-
.build();
106109
clientBuilder
107-
.credentialsProvider(
108-
StsAssumeRoleCredentialsProvider.builder()
109-
.stsClient(sts())
110-
.refreshRequest(assumeRoleRequest)
111-
.build())
110+
.credentialsProvider(createCredentialsProvider())
112111
.region(Region.of(awsProperties.clientAssumeRoleRegion()));
113112
return clientBuilder;
114113
}
115114

115+
protected S3AsyncClientBuilder applyAssumeRoleConfigurations(S3AsyncClientBuilder clientBuilder) {
116+
return clientBuilder
117+
.credentialsProvider(createCredentialsProvider())
118+
.region(Region.of(awsProperties.clientAssumeRoleRegion()));
119+
}
120+
121+
protected S3CrtAsyncClientBuilder applyAssumeRoleConfigurations(
122+
S3CrtAsyncClientBuilder clientBuilder) {
123+
return clientBuilder
124+
.credentialsProvider(createCredentialsProvider())
125+
.region(Region.of(awsProperties.clientAssumeRoleRegion()));
126+
}
127+
116128
protected String region() {
117129
return awsProperties.clientAssumeRoleRegion();
118130
}
@@ -145,4 +157,21 @@ private String genSessionName() {
145157
}
146158
return String.format("iceberg-aws-%s", UUID.randomUUID());
147159
}
160+
161+
private StsAssumeRoleCredentialsProvider createCredentialsProvider() {
162+
return StsAssumeRoleCredentialsProvider.builder()
163+
.stsClient(sts())
164+
.refreshRequest(createAssumeRoleRequest())
165+
.build();
166+
}
167+
168+
private AssumeRoleRequest createAssumeRoleRequest() {
169+
return AssumeRoleRequest.builder()
170+
.roleArn(awsProperties.clientAssumeRoleArn())
171+
.roleSessionName(roleSessionName)
172+
.durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
173+
.externalId(awsProperties.clientAssumeRoleExternalId())
174+
.tags(awsProperties.stsClientAssumeRoleTags())
175+
.build();
176+
}
148177
}

aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import software.amazon.awssdk.services.glue.GlueClient;
4040
import software.amazon.awssdk.services.glue.GlueClientBuilder;
4141
import software.amazon.awssdk.services.kms.KmsClient;
42+
import software.amazon.awssdk.services.s3.S3AsyncClient;
4243
import software.amazon.awssdk.services.s3.S3Client;
4344
import software.amazon.awssdk.services.s3.S3ClientBuilder;
4445
import software.amazon.awssdk.services.s3.S3Configuration;
@@ -118,6 +119,14 @@ public S3Client s3() {
118119
.build();
119120
}
120121

122+
@Override
123+
public S3AsyncClient s3Async() {
124+
if (s3FileIOProperties.isS3CRTEnabled()) {
125+
return S3AsyncClient.crtBuilder().build();
126+
}
127+
return S3AsyncClient.builder().build();
128+
}
129+
121130
@Override
122131
public GlueClient glue() {
123132
return GlueClient.builder()

aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2424
import software.amazon.awssdk.services.glue.GlueClient;
2525
import software.amazon.awssdk.services.kms.KmsClient;
26+
import software.amazon.awssdk.services.s3.S3AsyncClient;
2627
import software.amazon.awssdk.services.s3.S3Client;
2728

2829
/**
@@ -38,6 +39,13 @@ public interface AwsClientFactory extends Serializable {
3839
*/
3940
S3Client s3();
4041

42+
/**
43+
* create a Amazon S3 async client
44+
*
45+
* @return s3 async client
46+
*/
47+
S3AsyncClient s3Async();
48+
4149
/**
4250
* create a AWS Glue client
4351
*
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws.s3;
20+
21+
import java.io.IOException;
22+
import org.apache.iceberg.io.SeekableInputStream;
23+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
24+
25+
/** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */
26+
class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {
27+
28+
private final S3SeekableInputStream delegate;
29+
30+
AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) {
31+
this.delegate = stream;
32+
}
33+
34+
@Override
35+
public int read() throws IOException {
36+
return this.delegate.read();
37+
}
38+
39+
@Override
40+
public int read(byte[] b) throws IOException {
41+
return this.delegate.read(b, 0, b.length);
42+
}
43+
44+
@Override
45+
public int read(byte[] b, int off, int len) throws IOException {
46+
return this.delegate.read(b, off, len);
47+
}
48+
49+
@Override
50+
public void seek(long l) throws IOException {
51+
this.delegate.seek(l);
52+
}
53+
54+
@Override
55+
public long getPos() {
56+
return this.delegate.getPos();
57+
}
58+
59+
@Override
60+
public void close() throws IOException {
61+
this.delegate.close();
62+
}
63+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws.s3;
20+
21+
import com.github.benmanes.caffeine.cache.Cache;
22+
import com.github.benmanes.caffeine.cache.Caffeine;
23+
import java.io.IOException;
24+
import org.apache.iceberg.exceptions.RuntimeIOException;
25+
import org.apache.iceberg.io.SeekableInputStream;
26+
import org.apache.iceberg.util.Pair;
27+
import software.amazon.awssdk.services.s3.S3AsyncClient;
28+
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
29+
import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration;
30+
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
31+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
32+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
33+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
34+
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
35+
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
36+
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
37+
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
38+
import software.amazon.s3.analyticsaccelerator.util.S3URI;
39+
40+
class AnalyticsAcceleratorUtil {
41+
42+
private static final Cache<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>
43+
STREAM_FACTORY_CACHE = Caffeine.newBuilder().maximumSize(100).build();
44+
45+
private AnalyticsAcceleratorUtil() {}
46+
47+
public static SeekableInputStream newStream(S3InputFile inputFile) {
48+
S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key());
49+
HeadObjectResponse metadata = inputFile.getObjectMetadata();
50+
OpenStreamInformation openStreamInfo =
51+
OpenStreamInformation.builder()
52+
.objectMetadata(
53+
ObjectMetadata.builder()
54+
.contentLength(metadata.contentLength())
55+
.etag(metadata.eTag())
56+
.build())
57+
.build();
58+
59+
S3SeekableInputStreamFactory factory =
60+
STREAM_FACTORY_CACHE.get(
61+
Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()),
62+
AnalyticsAcceleratorUtil::createNewFactory);
63+
64+
try {
65+
S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo);
66+
return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream);
67+
} catch (IOException e) {
68+
throw new RuntimeIOException(
69+
e, "Failed to create S3 analytics accelerator input stream for: %s", inputFile.uri());
70+
}
71+
}
72+
73+
private static S3SeekableInputStreamFactory createNewFactory(
74+
Pair<S3AsyncClient, S3FileIOProperties> cacheKey) {
75+
ConnectorConfiguration connectorConfiguration =
76+
new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties());
77+
S3SeekableInputStreamConfiguration streamConfiguration =
78+
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
79+
ObjectClientConfiguration objectClientConfiguration =
80+
ObjectClientConfiguration.fromConfiguration(connectorConfiguration);
81+
82+
ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration);
83+
return new S3SeekableInputStreamFactory(objectClient, streamConfiguration);
84+
}
85+
86+
public static void cleanupCache(
87+
S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) {
88+
STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties));
89+
}
90+
}

0 commit comments

Comments
 (0)