Skip to content

Commit 881353e

Browse files
committed
HADOOP-19478. S3A: pull out new configuration load/probes under S3AStore
New service under S3AStoreImpl: StoreConfigurationService This is just the draft design; it is intended to be a place to move most of our configuration options: flags, durations etc, ideally lifting some of the work from org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations, * putting it into common * adding duration and size attributes * move reflection code from AbfsConfiguration#AbfsConfiguration into there too, and apply Change-Id: I99c8305574492c05170274dcc363bfba4857981b
1 parent 5054b16 commit 881353e

File tree

10 files changed

+466
-36
lines changed

10 files changed

+466
-36
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,6 +1522,31 @@ private Constants() {
15221522
*/
15231523
public static final String FS_S3A_PERFORMANCE_FLAGS =
15241524
"fs.s3a.performance.flags";
1525+
1526+
1527+
/**
1528+
* Is the create overwrite feature enabled or not?
1529+
* A configuration option and a path status probe.
1530+
* Value {@value}.
1531+
*/
1532+
public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED = "fs.s3a.conditional.create.enabled";
1533+
1534+
/**
1535+
* If conditional create is available, should it be used in
1536+
* createFile() operations to check for file existence?
1537+
* If set, this disables probes for directories.
1538+
* Value {@value}.
1539+
*/
1540+
public static final String FS_S3A_CONDITIONAL_CREATE_FILES = "fs.s3a.conditional.create.files";
1541+
1542+
/**
1543+
* createFile() boolean option toreate a multipart file, always: {@value}.
1544+
* <p>
1545+
* This is inefficient and will not work on a store which doesn't support that feature,
1546+
* so is primarily for testing.
1547+
*/
1548+
public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";
1549+
15251550
/**
15261551
* Prefix for adding a header to the object when created.
15271552
* The actual value must have a "." suffix and then the actual header.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@
147147
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
148148
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
149149
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
150+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
151+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
150152
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
151153
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
152154
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
@@ -258,6 +260,9 @@
258260
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
259261
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
260262
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
263+
import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateAvailable;
264+
import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateForFiles;
265+
import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.DowngradeSyncableExceptions;
261266
import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests;
262267
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
263268
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
@@ -611,6 +616,9 @@ public void initialize(URI name, Configuration originalConf)
611616
setUri(name, delegationTokensEnabled);
612617
super.initialize(uri, conf);
613618
setConf(conf);
619+
// init store configuration service.
620+
StoreConfigurationService storeConfiguration = new StoreConfigurationService();
621+
storeConfiguration.init(conf);
614622

615623
// initialize statistics, after which statistics
616624
// can be collected.
@@ -794,7 +802,9 @@ public void initialize(URI name, Configuration originalConf)
794802
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
795803

796804
// now create and initialize the store
797-
store = createS3AStore(clientManager, rateLimitCapacity);
805+
store = createS3AStore(clientManager,
806+
rateLimitCapacity,
807+
storeConfiguration);
798808
// the s3 client is created through the store, rather than
799809
// directly through the client manager.
800810
// this is to aid mocking.
@@ -864,24 +874,28 @@ private S3AFileSystemOperations createFileSystemHandler() {
864874
* This is protected so that tests can override it.
865875
* @param clientManager client manager
866876
* @param rateLimitCapacity rate limit
877+
* @param storeConfiguration the store configuration.
867878
* @return a new store instance
868879
*/
869880
@VisibleForTesting
870881
protected S3AStore createS3AStore(final ClientManager clientManager,
871-
final int rateLimitCapacity) {
882+
final int rateLimitCapacity,
883+
final StoreConfigurationService storeConfiguration) {
884+
872885
final S3AStore st = new S3AStoreBuilder()
873886
.withAuditSpanSource(getAuditManager())
874887
.withClientManager(clientManager)
875888
.withDurationTrackerFactory(getDurationTrackerFactory())
876889
.withFsStatistics(getFsStatistics())
877890
.withInstrumentation(getInstrumentation())
878891
.withStatisticsContext(statisticsContext)
892+
.withStoreConfigurationService(storeConfiguration)
879893
.withStoreContextFactory(this)
880894
.withStorageStatistics(getStorageStatistics())
881895
.withReadRateLimiter(unlimitedRate())
882896
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
883897
.build();
884-
st.init(getConf());
898+
st.init(storeConfiguration.getConfig());
885899
st.start();
886900
return st;
887901
}
@@ -2123,28 +2137,28 @@ private FSDataOutputStream innerCreateFile(
21232137

21242138
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
21252139
S3ABlockOutputStream.builder()
2126-
.withKey(destKey)
2127-
.withBlockFactory(blockFactory)
2128-
.withBlockSize(partSize)
2129-
.withStatistics(outputStreamStatistics)
2130-
.withProgress(progress)
2131-
.withPutTracker(putTracker)
2132-
.withWriteOperations(
2133-
createWriteOperationHelper(auditSpan))
2134-
.withExecutorService(
2135-
new SemaphoredDelegatingExecutor(
2136-
boundedThreadPool,
2137-
blockOutputActiveBlocks,
2138-
true,
2139-
outputStreamStatistics))
2140-
.withDowngradeSyncableExceptions(
2140+
.withKey(destKey)
2141+
.withBlockFactory(blockFactory)
2142+
.withBlockSize(partSize)
2143+
.withStatistics(outputStreamStatistics)
2144+
.withProgress(progress)
2145+
.withPutTracker(putTracker)
2146+
.withWriteOperations(
2147+
createWriteOperationHelper(auditSpan))
2148+
.withExecutorService(
2149+
new SemaphoredDelegatingExecutor(
2150+
boundedThreadPool,
2151+
blockOutputActiveBlocks,
2152+
true,
2153+
outputStreamStatistics))
2154+
.withDowngradeSyncableExceptions(
21412155
getConf().getBoolean(
21422156
DOWNGRADE_SYNCABLE_EXCEPTIONS,
21432157
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
2144-
.withCSEEnabled(isCSEEnabled)
2145-
.withPutOptions(putOptions)
2146-
.withIOStatisticsAggregator(
2147-
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
2158+
.withCSEEnabled(isCSEEnabled)
2159+
.withPutOptions(putOptions)
2160+
.withIOStatisticsAggregator(
2161+
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
21482162
.withMultipartEnabled(isMultipartUploadEnabled);
21492163
return new FSDataOutputStream(
21502164
new S3ABlockOutputStream(builder),
@@ -5299,6 +5313,7 @@ public CommitterStatistics newCommitterStatistics() {
52995313
public boolean hasPathCapability(final Path path, final String capability)
53005314
throws IOException {
53015315
final Path p = makeQualified(path);
5316+
final S3AStore store = getStore();
53025317
String cap = validatePathCapabilityArgs(p, capability);
53035318
switch (cap) {
53045319

@@ -5365,6 +5380,11 @@ public boolean hasPathCapability(final Path path, final String capability)
53655380
case FS_S3A_CREATE_HEADER:
53665381
return true;
53675382

5383+
case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE:
5384+
case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG:
5385+
// conditional create requires it to be enabled in the FS.
5386+
return store.getStoreConfiguration().isFlagSet(ConditionalCreateAvailable);
5387+
53685388
// is the FS configured for create file performance
53695389
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
53705390
return performanceFlags.enabled(PerformanceFlagEnum.Create);
@@ -5388,8 +5408,8 @@ public boolean hasPathCapability(final Path path, final String capability)
53885408
}
53895409

53905410
// ask the store for what capabilities it offers
5391-
// this may include input and output capabilites -and more
5392-
if (getStore() != null && getStore().hasPathCapability(path, capability)) {
5411+
// this includes, store configuration flags, IO capabilites...etc.
5412+
if (store.hasPathCapability(path, capability)) {
53935413
return true;
53945414
}
53955415

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
5555
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
5656
import org.apache.hadoop.fs.s3a.impl.StoreContext;
57+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
5758
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
5859
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
5960
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -366,4 +367,12 @@ default boolean hasCapability(String capability) {
366367
/*
367368
=============== END ObjectInputStreamFactory ===============
368369
*/
370+
371+
372+
/**
373+
* Get the store configuration.
374+
* @return the store configuration.
375+
*/
376+
StoreConfiguration getStoreConfiguration();
377+
369378
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
2424
import org.apache.hadoop.fs.s3a.S3AStore;
2525
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
26+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
2627
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
2728
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
2829
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
@@ -51,6 +52,8 @@ public class S3AStoreBuilder {
5152

5253
private AuditSpanSource<AuditSpanS3A> auditSpanSource;
5354

55+
private StoreConfigurationService storeConfigurationService;
56+
5457
/**
5558
* The original file system statistics: fairly minimal but broadly
5659
* collected so it is important to pick up.
@@ -117,6 +120,17 @@ public S3AStoreBuilder withFsStatistics(final FileSystem.Statistics value) {
117120
return this;
118121
}
119122

123+
/**
124+
* Set the store configuration service.
125+
* @param value new value
126+
* @return the builder
127+
*/
128+
public S3AStoreBuilder withStoreConfigurationService(
129+
final StoreConfigurationService value) {
130+
storeConfigurationService = value;
131+
return this;
132+
}
133+
120134
public S3AStore build() {
121135
return new S3AStoreImpl(storeContextFactory,
122136
clientManager,
@@ -127,6 +141,7 @@ public S3AStore build() {
127141
readRateLimiter,
128142
writeRateLimiter,
129143
auditSpanSource,
130-
fsStatistics);
144+
fsStatistics,
145+
storeConfigurationService);
131146
}
132147
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,14 @@
7373
import org.apache.hadoop.fs.s3a.UploadInfo;
7474
import org.apache.hadoop.fs.s3a.api.RequestFactory;
7575
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
76+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
7677
import org.apache.hadoop.fs.s3a.impl.streams.FactoryBindingParameters;
7778
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
7879
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
7980
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
8081
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
8182
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
83+
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
8284
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
8385
import org.apache.hadoop.fs.statistics.DurationTracker;
8486
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -193,10 +195,15 @@ public class S3AStoreImpl
193195
*/
194196
private ObjectInputStreamFactory objectInputStreamFactory;
195197

198+
/**
199+
* Store Configuration.
200+
*/
201+
private final StoreConfigurationService storeConfiguration;
202+
196203
/**
197204
* Constructor to create S3A store.
198205
* Package private, as {@link S3AStoreBuilder} creates them.
199-
* */
206+
*/
200207
S3AStoreImpl(StoreContextFactory storeContextFactory,
201208
ClientManager clientManager,
202209
DurationTrackerFactory durationTrackerFactory,
@@ -206,7 +213,8 @@ public class S3AStoreImpl
206213
RateLimiting readRateLimiter,
207214
RateLimiting writeRateLimiter,
208215
AuditSpanSource<AuditSpanS3A> auditSpanSource,
209-
@Nullable FileSystem.Statistics fsStatistics) {
216+
@Nullable FileSystem.Statistics fsStatistics,
217+
StoreConfigurationService storeConfiguration) {
210218
super("S3AStore");
211219
this.auditSpanSource = requireNonNull(auditSpanSource);
212220
this.clientManager = requireNonNull(clientManager);
@@ -223,7 +231,9 @@ public class S3AStoreImpl
223231
this.invoker = requireNonNull(storeContext.getInvoker());
224232
this.bucket = requireNonNull(storeContext.getBucket());
225233
this.requestFactory = requireNonNull(storeContext.getRequestFactory());
234+
this.storeConfiguration = requireNonNull(storeConfiguration);
226235
addService(clientManager);
236+
addService(storeConfiguration);
227237
}
228238

229239
/**
@@ -253,20 +263,26 @@ protected void serviceStart() throws Exception {
253263

254264
/**
255265
* Return the store path capabilities.
256-
* If the object stream factory is non-null, hands off the
257-
* query to that factory if not handled here.
266+
* This may hand off the probe to assistant classes/services.
258267
* @param path path to query the capability of.
259268
* @param capability non-null, non-empty string to query the path for support.
260-
* @return known capabilities
269+
* @return true if the capability is known and enabled.
261270
*/
262271
@Override
263272
public boolean hasPathCapability(final Path path, final String capability) {
264-
switch (toLowerCase(capability)) {
265-
case StreamCapabilities.IOSTATISTICS:
273+
274+
// only support this once started; avoids worrying about
275+
// state of services which assist in this calculation.
276+
if (!isInState(STATE.STARTED)) {
277+
return false;
278+
}
279+
final String cap = toLowerCase(capability);
280+
if (cap.equals(StreamCapabilities.IOSTATISTICS)) {
266281
return true;
267-
default:
268-
return inputStreamHasCapability(capability);
269282
}
283+
// probe store configuration and the input stream for
284+
// the capability.
285+
return storeConfiguration.hasPathCapability(path, cap)|| inputStreamHasCapability(cap);
270286
}
271287

272288
/**
@@ -1001,4 +1017,19 @@ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOE
10011017
/*
10021018
=============== END ObjectInputStreamFactory ===============
10031019
*/
1020+
1021+
1022+
/*
1023+
=============== BEGIN StoreConfigurationService ===============
1024+
*/
1025+
1026+
@Override
1027+
public StoreConfiguration getStoreConfiguration() {
1028+
return storeConfiguration;
1029+
}
1030+
1031+
/*
1032+
=============== END StoreConfigurationService ===============
1033+
*/
1034+
10041035
}

0 commit comments

Comments
 (0)