Skip to content

Commit dbee3c1

Browse files
Integrate analytics accelerator into iceberg-aws
1 parent 8844508 commit dbee3c1

22 files changed

Lines changed: 520 additions & 25 deletions

aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import software.amazon.awssdk.services.s3control.S3ControlClient;
6969
import software.amazon.awssdk.utils.ImmutableMap;
7070
import software.amazon.awssdk.utils.IoUtils;
71+
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
7172

7273
public class TestS3FileIOIntegration {
7374

@@ -255,6 +256,33 @@ public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception {
255256
validateRead(s3FileIO);
256257
}
257258

259+
@Test
260+
public void testNewInputStreamWithAnalyticsAccelerator() throws Exception {
261+
s3.putObject(
262+
PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
263+
RequestBody.fromBytes(contentBytes));
264+
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
265+
s3FileIO.initialize(
266+
ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true)));
267+
validateRead(s3FileIO);
268+
}
269+
270+
@Test
271+
public void testNewInputStreamWithAnalyticsAcceleratorCustomConfigured() throws Exception {
272+
final String prefetchingMode = "logicalio.prefetching.mode";
273+
final String s3Uri = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet");
274+
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
275+
s3FileIO.initialize(
276+
ImmutableMap.of(
277+
S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED,
278+
String.valueOf(true),
279+
S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_PREFIX + prefetchingMode,
280+
PrefetchMode.ALL.name()));
281+
write(s3FileIO, s3Uri);
282+
validateRead(s3FileIO, s3Uri);
283+
s3FileIO.deleteFile(s3Uri);
284+
}
285+
258286
@Test
259287
public void testNewOutputStream() throws Exception {
260288
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
@@ -324,6 +352,19 @@ public void testNewOutputStreamWithMultiRegionAccessPoint() throws Exception {
324352
}
325353
}
326354

355+
@Test
356+
public void testNewOutputStreamWithAnalyticsAccelerator() throws Exception {
357+
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
358+
s3FileIO.initialize(
359+
ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true)));
360+
write(s3FileIO);
361+
try (InputStream stream =
362+
s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(objectKey).build())) {
363+
String result = IoUtils.toUtf8String(stream);
364+
assertThat(result).isEqualTo(content);
365+
}
366+
}
367+
327368
@Test
328369
public void testServerSideS3Encryption() throws Exception {
329370
S3FileIOProperties properties = new S3FileIOProperties();

aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2929
import software.amazon.awssdk.services.glue.GlueClient;
3030
import software.amazon.awssdk.services.kms.KmsClient;
31+
import software.amazon.awssdk.services.s3.S3AsyncClient;
3132
import software.amazon.awssdk.services.s3.S3Client;
33+
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
3234
import software.amazon.awssdk.services.sts.StsClient;
3335
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
3436
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -52,6 +54,14 @@ public S3Client s3() {
5254
.build();
5355
}
5456

57+
@Override
58+
public S3AsyncClient s3Async() {
59+
return S3AsyncClient.crtBuilder()
60+
.applyMutation(this::applyAssumeRoleConfigurations)
61+
.applyMutation(awsClientProperties::applyAsyncConfigurations)
62+
.build();
63+
}
64+
5565
@Override
5666
public GlueClient glue() {
5767
return GlueClient.builder()
@@ -95,24 +105,19 @@ public void initialize(Map<String, String> properties) {
95105

96106
protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
97107
T clientBuilder) {
98-
AssumeRoleRequest assumeRoleRequest =
99-
AssumeRoleRequest.builder()
100-
.roleArn(awsProperties.clientAssumeRoleArn())
101-
.roleSessionName(roleSessionName)
102-
.durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
103-
.externalId(awsProperties.clientAssumeRoleExternalId())
104-
.tags(awsProperties.stsClientAssumeRoleTags())
105-
.build();
106108
clientBuilder
107-
.credentialsProvider(
108-
StsAssumeRoleCredentialsProvider.builder()
109-
.stsClient(sts())
110-
.refreshRequest(assumeRoleRequest)
111-
.build())
109+
.credentialsProvider(createCredentialsProvider())
112110
.region(Region.of(awsProperties.clientAssumeRoleRegion()));
113111
return clientBuilder;
114112
}
115113

114+
protected S3CrtAsyncClientBuilder applyAssumeRoleConfigurations(
115+
S3CrtAsyncClientBuilder clientBuilder) {
116+
return clientBuilder
117+
.credentialsProvider(createCredentialsProvider())
118+
.region(Region.of(awsProperties.clientAssumeRoleRegion()));
119+
}
120+
116121
protected String region() {
117122
return awsProperties.clientAssumeRoleRegion();
118123
}
@@ -145,4 +150,21 @@ private String genSessionName() {
145150
}
146151
return String.format("iceberg-aws-%s", UUID.randomUUID());
147152
}
153+
154+
private StsAssumeRoleCredentialsProvider createCredentialsProvider() {
155+
return StsAssumeRoleCredentialsProvider.builder()
156+
.stsClient(sts())
157+
.refreshRequest(createAssumeRoleRequest())
158+
.build();
159+
}
160+
161+
private AssumeRoleRequest createAssumeRoleRequest() {
162+
return AssumeRoleRequest.builder()
163+
.roleArn(awsProperties.clientAssumeRoleArn())
164+
.roleSessionName(roleSessionName)
165+
.durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
166+
.externalId(awsProperties.clientAssumeRoleExternalId())
167+
.tags(awsProperties.stsClientAssumeRoleTags())
168+
.build();
169+
}
148170
}

aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import software.amazon.awssdk.services.glue.GlueClient;
4040
import software.amazon.awssdk.services.glue.GlueClientBuilder;
4141
import software.amazon.awssdk.services.kms.KmsClient;
42+
import software.amazon.awssdk.services.s3.S3AsyncClient;
4243
import software.amazon.awssdk.services.s3.S3Client;
4344
import software.amazon.awssdk.services.s3.S3ClientBuilder;
4445
import software.amazon.awssdk.services.s3.S3Configuration;
@@ -118,6 +119,13 @@ public S3Client s3() {
118119
.build();
119120
}
120121

122+
@Override
123+
public S3AsyncClient s3Async() {
124+
return S3AsyncClient.crtBuilder()
125+
.applyMutation(awsClientProperties::applyAsyncConfigurations)
126+
.build();
127+
}
128+
121129
@Override
122130
public GlueClient glue() {
123131
return GlueClient.builder()

aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2424
import software.amazon.awssdk.services.glue.GlueClient;
2525
import software.amazon.awssdk.services.kms.KmsClient;
26+
import software.amazon.awssdk.services.s3.S3AsyncClient;
2627
import software.amazon.awssdk.services.s3.S3Client;
2728

2829
/**
@@ -38,6 +39,13 @@ public interface AwsClientFactory extends Serializable {
3839
*/
3940
S3Client s3();
4041

42+
/**
43+
* create a Amazon S3 async client
44+
*
45+
* @return s3 async client
46+
*/
47+
S3AsyncClient s3Async();
48+
4149
/**
4250
* create an AWS Glue client
4351
*

aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
4141
import software.amazon.awssdk.core.retry.RetryMode;
4242
import software.amazon.awssdk.regions.Region;
43+
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
4344

4445
public class AwsClientProperties implements Serializable {
4546
/**
@@ -74,6 +75,14 @@ public class AwsClientProperties implements Serializable {
7475
*/
7576
public static final String CLIENT_REGION = "client.region";
7677

78+
/**
79+
* Used to set the maxConcurrency field in the {@link S3CrtAsyncClientBuilder} when building async
80+
* s3 clients.
81+
*/
82+
public static final String CLIENT_MAX_CONCURRENCY = "client.max-concurrency";
83+
84+
public static final int CLIENT_MAX_CONCURRENCY_DEFAULT = 500;
85+
7786
/**
7887
* When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended
7988
* credentials from this endpoint.
@@ -84,6 +93,7 @@ public class AwsClientProperties implements Serializable {
8493
public static final String REFRESH_CREDENTIALS_ENABLED = "client.refresh-credentials-enabled";
8594

8695
private String clientRegion;
96+
private final Integer clientMaxConcurrency;
8797
private final String clientCredentialsProvider;
8898
private final Map<String, String> clientCredentialsProviderProperties;
8999
private final String refreshCredentialsEndpoint;
@@ -92,6 +102,7 @@ public class AwsClientProperties implements Serializable {
92102

93103
public AwsClientProperties() {
94104
this.clientRegion = null;
105+
this.clientMaxConcurrency = CLIENT_MAX_CONCURRENCY_DEFAULT;
95106
this.clientCredentialsProvider = null;
96107
this.clientCredentialsProviderProperties = null;
97108
this.refreshCredentialsEndpoint = null;
@@ -102,6 +113,9 @@ public AwsClientProperties() {
102113
public AwsClientProperties(Map<String, String> properties) {
103114
this.allProperties = SerializableMap.copyOf(properties);
104115
this.clientRegion = properties.get(CLIENT_REGION);
116+
this.clientMaxConcurrency =
117+
PropertyUtil.propertyAsInt(
118+
properties, CLIENT_MAX_CONCURRENCY, CLIENT_MAX_CONCURRENCY_DEFAULT);
105119
this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER);
106120
this.clientCredentialsProviderProperties =
107121
PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
@@ -135,6 +149,19 @@ public <T extends AwsClientBuilder> void applyClientRegionConfiguration(T builde
135149
}
136150
}
137151

152+
/**
153+
* Configure async client settings.
154+
*
155+
* <p>Sample usage:
156+
*
157+
* <pre>
158+
* S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyAsyncConfigurations)
159+
* </pre>
160+
*/
161+
public <T extends S3CrtAsyncClientBuilder> void applyAsyncConfigurations(T builder) {
162+
builder.maxConcurrency(clientMaxConcurrency);
163+
}
164+
138165
/**
139166
* Configure the credential provider for AWS clients.
140167
*

aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,20 @@
2727

2828
abstract class BaseS3File {
2929
private final S3Client client;
30+
private final S3InputStreamFactory inputStreamFactory;
3031
private final S3URI uri;
3132
private final S3FileIOProperties s3FileIOProperties;
3233
private HeadObjectResponse metadata;
3334
private final MetricsContext metrics;
3435

3536
BaseS3File(
36-
S3Client client, S3URI uri, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) {
37+
S3Client client,
38+
S3InputStreamFactory inputStreamFactory,
39+
S3URI uri,
40+
S3FileIOProperties s3FileIOProperties,
41+
MetricsContext metrics) {
3742
this.client = client;
43+
this.inputStreamFactory = inputStreamFactory;
3844
this.uri = uri;
3945
this.s3FileIOProperties = s3FileIOProperties;
4046
this.metrics = metrics;
@@ -48,6 +54,10 @@ S3Client client() {
4854
return client;
4955
}
5056

57+
S3InputStreamFactory inputStreamFactory() {
58+
return inputStreamFactory;
59+
}
60+
5161
S3URI uri() {
5262
return uri;
5363
}

aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import java.util.Map;
2222
import org.apache.iceberg.aws.AwsClientProperties;
2323
import org.apache.iceberg.aws.HttpClientProperties;
24+
import software.amazon.awssdk.services.s3.S3AsyncClient;
2425
import software.amazon.awssdk.services.s3.S3Client;
2526

26-
class DefaultS3FileIOAwsClientFactory implements S3FileIOAwsClientFactory {
27+
public class DefaultS3FileIOAwsClientFactory implements S3FileIOAwsClientFactory {
2728
private S3FileIOProperties s3FileIOProperties;
2829
private HttpClientProperties httpClientProperties;
2930
private AwsClientProperties awsClientProperties;
3031

31-
DefaultS3FileIOAwsClientFactory() {
32+
public DefaultS3FileIOAwsClientFactory() {
3233
this.s3FileIOProperties = new S3FileIOProperties();
3334
this.httpClientProperties = new HttpClientProperties();
3435
this.awsClientProperties = new AwsClientProperties();
@@ -58,4 +59,11 @@ public S3Client s3() {
5859
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
5960
.build();
6061
}
62+
63+
@Override
64+
public S3AsyncClient s3Async() {
65+
return S3AsyncClient.crtBuilder()
66+
.applyMutation(awsClientProperties::applyAsyncConfigurations)
67+
.build();
68+
}
6169
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws.s3;
20+
21+
import org.apache.iceberg.io.SeekableInputStream;
22+
import org.apache.iceberg.metrics.MetricsContext;
23+
import software.amazon.awssdk.services.s3.S3Client;
24+
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
25+
26+
public class DefaultS3InputStreamFactory implements S3InputStreamFactory {
27+
@Override
28+
public SeekableInputStream createStream(
29+
S3Client client,
30+
S3URI uri,
31+
S3FileIOProperties properties,
32+
MetricsContext metrics,
33+
ObjectMetadata metadata) {
34+
return new S3InputStream(client, uri, properties, metrics);
35+
}
36+
}

0 commit comments

Comments
 (0)