Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE)
private int footerReadBufferSize;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_BUFFERED_PREAD_DISABLE,
DefaultValue = DEFAULT_BUFFERED_PREAD_DISABLE)
private boolean isBufferedPReadDisabled;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
Expand Down Expand Up @@ -953,6 +958,14 @@ public int getFooterReadBufferSize() {
return this.footerReadBufferSize;
}

/**
 * Indicates whether buffered positional reads (pread) have been disabled
 * via configuration.
 *
 * @return {@code true} when buffered pread is turned off; {@code false} otherwise.
 */
public boolean isBufferedPReadDisabled() {
  return isBufferedPReadDisabled;
}

/**
 * Returns the configured read buffer size in bytes.
 *
 * @return the read buffer size.
 */
public int getReadBufferSize() {
return this.readBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,8 +945,9 @@ contentLength, populateAbfsInputStreamContext(
private AbfsInputStreamContext populateAbfsInputStreamContext(
Optional<Configuration> options, ContextEncryptionAdapter contextEncryptionAdapter) {
boolean bufferedPreadDisabled = options
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE,
getAbfsConfiguration().isBufferedPReadDisabled()))
.orElse(getAbfsConfiguration().isBufferedPReadDisabled());
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public final class AbfsHttpConstants {
public static final String STAR = "*";
public static final String COMMA = ",";
public static final String COLON = ":";
public static final String HYPHEN = "-";
public static final String EQUAL = "=";
public static final String QUESTION_MARK = "?";
public static final String AND_MARK = "&";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true;
public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB;
public static final boolean DEFAULT_BUFFERED_PREAD_DISABLE = false;
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public enum ReadType {
this.readType = readType;
}

/**
* Get the read type as a string.
*
* @return the read type string
*/
@Override
public String toString() {
return readType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ private int optimisedRead(final byte[] b, final int off, final int len,
}
} catch (IOException e) {
LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e);
tracingContext.setReadType(ReadType.NORMAL_READ);
restorePointerState();
return readOneBlock(b, off, len);
} finally {
Expand All @@ -451,7 +450,6 @@ private int optimisedRead(final byte[] b, final int off, final int len,
// bCursor that means the user requested data has not been read.
if (fCursor < contentLength && bCursor > limit) {
restorePointerState();
tracingContext.setReadType(ReadType.NORMAL_READ);
return readOneBlock(b, off, len);
}
return copyToUserBuffer(b, off, len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class TracingContext {
//final concatenated ID list set into x-ms-client-request-id header
private String header = EMPTY_STRING;
private String ingressHandler = EMPTY_STRING;
private String position = EMPTY_STRING;
private String position = String.valueOf(0); // position of read/write in remote file
private String metricResults = EMPTY_STRING;
private String metricHeader = EMPTY_STRING;
private ReadType readType = ReadType.UNKNOWN_READ;
Expand All @@ -80,7 +80,7 @@ public class TracingContext {
* will not change this field. In case {@link #primaryRequestId} is non-null,
* this field shall not be set.
*/
private String primaryRequestIdForRetry;
private String primaryRequestIdForRetry = EMPTY_STRING;
private Integer operatedBlobCount = 1; // Only relevant for rename-delete over blob endpoint where it will be explicitly set.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it changed from null to 1 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it was coming out as null in ClientReqId. Having a null value does not look good and can be prone to NPE if someone uses this value anywhere.
Since this is set only in rename/delete, other ops are prone to NPE.

As to why set to 1, I thought for every operation this has to be 1. I am open to suggestions for a better default value but strongly feel null should be avoided.

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there was a null check before it was added to the header which would avoid the NPE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we decided to keep the header schema fixed, and publishing this value as null does not look good in the Client Request Id, as it can be exposed to the user.


private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
Expand Down Expand Up @@ -200,8 +200,8 @@ public void setListener(Listener listener) {
* <li>ingressHandler</li>
* <li>position of read/write in the remote file</li>
* <li>operatedBlobCount - number of blobs operated on by this request</li>
* <li>httpOperationHeader - suffix for network library used</li>
* <li>operationSpecificHeader - different operation types can publish info relevant to that operation</li>
* <li>httpOperationHeader - suffix for network library used</li>
* </ul>
* @param httpOperation AbfsHttpOperation instance to set header into
* connection
Expand All @@ -214,7 +214,7 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
clientRequestId = UUID.randomUUID().toString();
switch (format) {
case ALL_ID_FORMAT:
header = TracingHeaderVersion.V1.getVersion() + COLON
header = TracingHeaderVersion.getCurrentVersion() + COLON
+ clientCorrelationID + COLON
+ clientRequestId + COLON
+ fileSystemID + COLON
Expand All @@ -225,19 +225,19 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
+ ingressHandler + COLON
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these empty string checks are needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With empty checks we cannot have a fixed schema. We need a properly defined schema where each position after the split is fixed for all headers, so analysis can be done easily without worrying about the position of the info we need to analyse.

+ position + COLON
+ operatedBlobCount + COLON
+ httpOperation.getTracingContextSuffix() + COLON
+ getOperationSpecificHeader(opType);
+ getOperationSpecificHeader(opType) + COLON
+ httpOperation.getTracingContextSuffix();

metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING;
break;
case TWO_ID_FORMAT:
header = TracingHeaderVersion.V1.getVersion() + COLON
header = TracingHeaderVersion.getCurrentVersion() + COLON
+ clientCorrelationID + COLON + clientRequestId;
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING;
break;
default:
//case SINGLE_ID_FORMAT
header = TracingHeaderVersion.V1.getVersion() + COLON
header = TracingHeaderVersion.getCurrentVersion() + COLON
+ clientRequestId;
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING;
}
Expand Down Expand Up @@ -275,17 +275,15 @@ private String getPrimaryRequestIdForHeader(final Boolean isRetry) {
return primaryRequestIdForRetry;
}

/**
 * Appends failure information to the given header.
 *
 * <p>For a connection-timeout failure with a known retry policy, the result is
 * {@code header_failure_policyAbbreviation}; for any other failure it is
 * {@code header_failure}; when there was no previous failure the header is
 * returned unchanged.
 *
 * @param header the header to decorate
 * @param previousFailure the previous failure reason, or null if none
 * @param retryPolicyAbbreviation abbreviation of the retry policy in effect
 * @return the header with failure details appended
 */
private String addFailureReasons(final String header,
    final String previousFailure, String retryPolicyAbbreviation) {
  if (previousFailure == null) {
    return header;
  }
  final boolean appendPolicy =
      CONNECTION_TIMEOUT_ABBREVIATION.equals(previousFailure)
          && retryPolicyAbbreviation != null;
  return appendPolicy
      ? String.format("%s_%s_%s", header, previousFailure, retryPolicyAbbreviation)
      : String.format("%s_%s", header, previousFailure);
}

/**
* Get the retry header string in format retryCount_failureReason_retryPolicyAbbreviation
* retryCount is always there and 0 for first request.
* failureReason is null for first request
* retryPolicyAbbreviation is only present when request fails with ConnectionTimeout
* @param previousFailure Previous failure reason, null if not a retried request
* @param retryPolicyAbbreviation Abbreviation of retry policy used to get retry interval
* @return String representing the retry header
*/
private String getRetryHeader(final String previousFailure, String retryPolicyAbbreviation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc to all newly added methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove the addFailureReasons method- it has no usage now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

String retryHeader = String.format("%d", retryCount);
if (previousFailure == null) {
Expand All @@ -297,6 +295,11 @@ private String getRetryHeader(final String previousFailure, String retryPolicyAb
return String.format("%s_%s", retryHeader, previousFailure);
}

/**
* Get the operation specific header for the current operation type.
* @param opType The operation type for which the header is needed
* @return String representing the operation specific header
*/
private String getOperationSpecificHeader(FSOperationType opType) {
// Similar header can be added for other operations in the future.
switch (opType) {
Expand All @@ -307,6 +310,10 @@ private String getOperationSpecificHeader(FSOperationType opType) {
}
}

/**
* Get the operation specific header for read operations.
* @return String representing the read specific header
*/
private String getReadSpecificHeader() {
// More information on read can be added to this header in the future.
// As underscore separated values.
Expand Down Expand Up @@ -372,14 +379,14 @@ public void setPosition(final String position) {
}
}

/**
 * Records the read type for the current operation and, when a listener is
 * registered, notifies it of the change.
 *
 * @param readType the read type to set, must not be null.
 */
public void setReadType(ReadType readType) {
  this.readType = readType;
  if (listener == null) {
    return;
  }
  listener.updateReadType(readType);
}

/**
 * Returns the read type recorded for the current operation.
 *
 * @return the current read type.
 */
public ReadType getReadType() {
return readType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,55 @@

package org.apache.hadoop.fs.azurebfs.utils;

/**
* Enum representing the version of the tracing header used in Azure Blob File System (ABFS).
* It defines two versions: V0 and V1, with their respective field counts.
* Any changes to the tracing header should introduce a new version so that every
* version has a fixed predefined schema of fields.
*/
public enum TracingHeaderVersion {

/**
* Version 0 of the tracing header, which has no version prefix and contains 8 permanent and a few optional fields.
* This is the initial version of the tracing header.
*/
V0("", 8),
/**
* Version 1 of the tracing header, which includes a version prefix and has 13 permanent fields.
* This version is used for the current tracing header schema.
* Schema: version:clientCorrelationId:clientRequestId:fileSystemId
* :primaryRequestId:streamId:opType:retryHeader:ingressHandler
* :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
*/
V1("v1", 13);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the next versions would be V1.1/V1.2, should we consider starting with V1.0/V1.1?
And with the version updates- would we update the version field in V1 only or new V1.1 enum?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So every time we add a new header, we need to add a new version ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have simple version strings like v0, v1, v2 and so on. This will help reduce char count in clientReqId.

With any new changes in the schema of Tracing Header (add/delete/rearrange) we need to bump up version and update the schema and getCurrentVersion method to return the latest version.


private final String version;
private final String versionString;
private final int fieldCount;

TracingHeaderVersion(String version, int fieldCount) {
this.version = version;
TracingHeaderVersion(String versionString, int fieldCount) {
this.versionString = versionString;
this.fieldCount = fieldCount;
}

@Override
public String toString() {
return version;
return versionString;
}

/**
* Returns the latest version of the tracing header. Any changes done to the
* schema of tracing context header should be accompanied by a version bump.
* @return the latest version of the tracing header.
*/
public static TracingHeaderVersion getCurrentVersion() {
return V1;
Copy link
Contributor

@anmolanmol1234 anmolanmol1234 Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to be updated everytime a new version is introduced, can it be dynamically fetched ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to update it to the latest version every time we do a version upgrade.

}

public int getFieldCount() {
return V1.fieldCount;
return fieldCount;
}

public String getVersion() {
return V1.version;
public String getVersionString() {
return versionString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString());
tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
String header = tracingContext.getHeader();
String assertionPrimaryId = header.split(":")[3];
String assertionPrimaryId = header.split(COLON)[3];

tracingContext.setRetryCount(1);
tracingContext.setListener(new TracingHeaderValidator(
Expand All @@ -277,7 +277,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin

tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
header = tracingContext.getHeader();
String primaryRequestId = header.split(":")[3];
String primaryRequestId = header.split(COLON)[3];

Assertions.assertThat(primaryRequestId)
.describedAs("PrimaryRequestId in a retried request's tracingContext "
Expand Down Expand Up @@ -329,7 +329,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
}

private void checkHeaderForRetryPolicyAbbreviation(String header, String expectedFailureReason, String expectedRetryPolicyAbbreviation) {
String[] headerContents = header.split(":", SPLIT_NO_LIMIT);
String[] headerContents = header.split(COLON, SPLIT_NO_LIMIT);
String previousReqContext = headerContents[7];

if (expectedFailureReason != null) {
Expand Down
Loading