Skip to content

Commit b612c31

Browse files
authored
HADOOP-17404. ABFS: Small write - Merge append and flush
- Contributed by Sneha Vijayarajan
1 parent d21c1c6 commit b612c31

17 files changed

Lines changed: 1030 additions & 349 deletions

hadoop-tools/hadoop-azure/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@
555555
<exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
556556
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
557557
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
558+
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
558559
</excludes>
559560

560561
</configuration>
@@ -594,6 +595,7 @@
594595
<include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
595596
<include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
596597
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
598+
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
597599
</includes>
598600
</configuration>
599601
</execution>

hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,6 @@
4646
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
4747
<suppress checks="ParameterNumber|MagicNumber"
4848
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
49+
<suppress checks="ParameterNumber|VisibilityModifier"
50+
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
4951
</suppressions>

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ public class AbfsConfiguration{
100100
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
101101
private int writeBufferSize;
102102

103+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION,
104+
DefaultValue = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION)
105+
private boolean enableSmallWriteOptimization;
106+
103107
@BooleanConfigurationValidatorAnnotation(
104108
ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
105109
DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
@@ -537,6 +541,10 @@ public int getWriteBufferSize() {
537541
return this.writeBufferSize;
538542
}
539543

544+
public boolean isSmallWriteOptimizationEnabled() {
545+
return this.enableSmallWriteOptimization;
546+
}
547+
540548
public boolean readSmallFilesCompletely() {
541549
return this.readSmallFilesCompletely;
542550
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
578578
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
579579
.withWriteBufferSize(bufferSize)
580580
.enableFlush(abfsConfiguration.isFlushEnabled())
581+
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
581582
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
582583
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
583584
.withAppendBlob(isAppendBlob)

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public final class AbfsHttpConstants {
7676
public static final String AT = "@";
7777
public static final String HTTP_HEADER_PREFIX = "x-ms-";
7878
public static final String HASH = "#";
79+
public static final String TRUE = "true";
7980

8081
public static final String PLUS_ENCODE = "%20";
8182
public static final String FORWARD_SLASH_ENCODE = "%2F";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ public final class ConfigurationKeys {
5555
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
5656
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
5757
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
58+
/** If the data size written by a Hadoop app is small, i.e. the data size
59+
* (a) before any of HFlush/HSync call is made or
60+
* (b) between 2 HFlush/HSync API calls
61+
* is less than write buffer size, 2 separate calls, one for append and
62+
* another for flush are made.
63+
* By enabling the small write optimization, a single call will be made to
64+
* perform both append and flush operations and hence reduce request count.
65+
*/
66+
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
5867
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
5968
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
6069
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
5656
// Default upload and download buffer size
5757
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
5858
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
59+
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
5960
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
6061
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
6162
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public final class HttpQueryParams {
3636
public static final String QUERY_PARAM_POSITION = "position";
3737
public static final String QUERY_PARAM_TIMEOUT = "timeout";
3838
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
39+
public static final String QUERY_PARAM_FLUSH = "flush";
3940
public static final String QUERY_PARAM_CLOSE = "close";
4041
public static final String QUERY_PARAM_UPN = "upn";
4142
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.contracts.services;
20+
21+
/**
22+
* Saves the different request parameters for append
23+
*/
24+
public class AppendRequestParameters {
25+
public enum Mode {
26+
APPEND_MODE,
27+
FLUSH_MODE,
28+
FLUSH_CLOSE_MODE
29+
}
30+
31+
private final long position;
32+
private final int offset;
33+
private final int length;
34+
private final Mode mode;
35+
private final boolean isAppendBlob;
36+
37+
public AppendRequestParameters(final long position,
38+
final int offset,
39+
final int length,
40+
final Mode mode,
41+
final boolean isAppendBlob) {
42+
this.position = position;
43+
this.offset = offset;
44+
this.length = length;
45+
this.mode = mode;
46+
this.isAppendBlob = isAppendBlob;
47+
}
48+
49+
public long getPosition() {
50+
return this.position;
51+
}
52+
53+
public int getoffset() {
54+
return this.offset;
55+
}
56+
57+
public int getLength() {
58+
return this.length;
59+
}
60+
61+
public Mode getMode() {
62+
return this.mode;
63+
}
64+
65+
public boolean isAppendBlob() {
66+
return this.isAppendBlob;
67+
}
68+
69+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
4646
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
4747
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
48+
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
4849
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
4950
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
5051
import org.apache.hadoop.io.IOUtils;
@@ -396,38 +397,58 @@ public AbfsRestOperation renameIdempotencyCheckOp(
396397
return op;
397398
}
398399

399-
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
400-
final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
400+
public AbfsRestOperation append(final String path, final byte[] buffer,
401+
AppendRequestParameters reqParams, final String cachedSasToken)
402+
throws AzureBlobFileSystemException {
401403
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
402404
// JDK7 does not support PATCH, so to work around the issue we will use
403405
// PUT and specify the real method in the X-Http-Method-Override header.
404406
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
405-
HTTP_METHOD_PATCH));
407+
HTTP_METHOD_PATCH));
406408

407409
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
408410
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
409-
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
411+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition()));
412+
413+
if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || (
414+
reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) {
415+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE);
416+
if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
417+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE);
418+
}
419+
}
420+
410421
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
411422
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
412423
abfsUriQueryBuilder, cachedSasToken);
413424

414425
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
415426
final AbfsRestOperation op = new AbfsRestOperation(
416427
AbfsRestOperationType.Append,
417-
this,
418-
HTTP_METHOD_PUT,
419-
url,
420-
requestHeaders, buffer, offset, length, sasTokenForReuse);
428+
this,
429+
HTTP_METHOD_PUT,
430+
url,
431+
requestHeaders,
432+
buffer,
433+
reqParams.getoffset(),
434+
reqParams.getLength(),
435+
sasTokenForReuse);
421436
try {
422437
op.execute();
423438
} catch (AzureBlobFileSystemException e) {
424-
if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
439+
if (reqParams.isAppendBlob()
440+
&& appendSuccessCheckOp(op, path,
441+
(reqParams.getPosition() + reqParams.getLength()))) {
425442
final AbfsRestOperation successOp = new AbfsRestOperation(
426443
AbfsRestOperationType.Append,
427-
this,
428-
HTTP_METHOD_PUT,
429-
url,
430-
requestHeaders, buffer, offset, length, sasTokenForReuse);
444+
this,
445+
HTTP_METHOD_PUT,
446+
url,
447+
requestHeaders,
448+
buffer,
449+
reqParams.getoffset(),
450+
reqParams.getLength(),
451+
sasTokenForReuse);
431452
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
432453
return successOp;
433454
}

0 commit comments

Comments
 (0)