@@ -440,6 +440,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME)
private long maxApacheHttpClientConnectionIdleTime;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
private boolean enableClientTransactionId;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

@@ -1077,6 +1081,10 @@ public long getMaxApacheHttpClientConnectionIdleTime() {
return maxApacheHttpClientConnectionIdleTime;
}

public boolean getIsClientTransactionIdEnabled() {
return enableClientTransactionId;
}

/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
@@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
@@ -147,7 +148,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
@@ -1876,7 +1876,7 @@ private long extractContentLength(AbfsHttpOperation op) {
long contentLength;
String contentLengthHeader = op.getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (!contentLengthHeader.equals(EMPTY_STRING)) {
if (!StringUtils.isEmpty(contentLengthHeader)) {
contentLength = Long.parseLong(contentLengthHeader);
} else {
contentLength = 0;
@@ -2161,6 +2161,11 @@ void setClient(AbfsClient client) {
this.client = client;
}

@VisibleForTesting
void setClientHandler(AbfsClientHandler clientHandler) {
this.clientHandler = clientHandler;
}

@VisibleForTesting
DataBlocks.BlockFactory getBlockFactory() {
return blockFactory;
@@ -184,7 +184,8 @@ public enum ApiVersion {

DEC_12_2019("2019-12-12"),
APR_10_2021("2021-04-10"),
AUG_03_2023("2023-08-03");
AUG_03_2023("2023-08-03"),
NOV_04_2024("2024-11-04");

private final String xMsApiVersion;

@@ -383,6 +383,8 @@ public static String accountProperty(String property, String account) {
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration: {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
/**Flag to enable/disable sending the client transaction ID during create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";

private ConfigurationKeys() {}
}
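
A minimal usage sketch (not part of this PR) of toggling the new key, assuming the standard Hadoop Configuration API; the same key can also be set in core-site.xml. The account URI below is a placeholder.

    // Opt out of the client transaction ID header (the default is enabled,
    // see DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID below).
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.setBoolean("fs.azure.enable.client.transaction.id", false);
    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(
        java.net.URI.create("abfs://container@account.dfs.core.windows.net/"), conf); // placeholder URI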
@@ -198,5 +198,7 @@ public final class FileSystemConfigurations {

public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;

public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;

private FileSystemConfigurations() {}
}
@@ -132,5 +132,11 @@ public final class HttpHeaderConfigurations {
*/
public static final String X_MS_COPY_STATUS = "x-ms-copy-status";

/**
* HTTP request header used for create/rename idempotency.
* {@value}
*/
public static final String X_MS_CLIENT_TRANSACTION_ID = "x-ms-client-transaction-id";

private HttpHeaderConfigurations() {}
}
@@ -216,6 +216,11 @@ private AbfsClient(final URL baseUrl,
encryptionType = EncryptionType.GLOBAL_KEY;
}

// Version update needed to support x-ms-client-transaction-id header
if (abfsConfiguration.getIsClientTransactionIdEnabled()) {
xMsVersion = ApiVersion.NOV_04_2024;
}

String sslProviderName = null;

if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
@@ -40,6 +40,7 @@
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -109,6 +110,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD;
@@ -383,6 +385,9 @@ public AbfsRestOperation createPath(final String path,
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
}

// Add the client transaction ID to the request headers.
String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (isAppendBlob) {
@@ -405,11 +410,32 @@
if (!op.hasResult()) {
throw ex;
}
if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
String existingResource =
op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE);
if (existingResource != null && existingResource.equals(DIRECTORY)) {
return op; //don't throw ex on mkdirs for existing directory
if (!isFile) {
if (op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
String existingResource =
op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE);
if (existingResource != null && existingResource.equals(DIRECTORY)) {
return op; //don't throw ex on mkdirs for existing directory
}
}
} else {
// Recovery using the client transaction ID is attempted only for retried requests.
if (op.isARetriedRequest() && clientTransactionId != null
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT
|| op.getResult().getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED)) {
try {
final AbfsHttpOperation getPathStatusOp =
getPathStatus(path, false,
tracingContext, null).getResult();
if (clientTransactionId.equals(
getPathStatusOp.getResponseHeader(
X_MS_CLIENT_TRANSACTION_ID))) {
return op;
}
} catch (AzureBlobFileSystemException ignored) {
// In case of get path status failure,
// we will throw the original exception.
}
}
}
throw ex;
@@ -681,6 +707,9 @@ public AbfsClientRenameResult renamePath(
requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));

// Add the client transaction ID to the request headers.
String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
appendSASTokenToQuery(destination,
@@ -705,6 +734,27 @@
throw e;
}

// Recovery using the client transaction ID is attempted only for retried requests.
if (op.isARetriedRequest() && clientTransactionId != null
&& SOURCE_PATH_NOT_FOUND.getErrorCode().equalsIgnoreCase(
op.getResult().getStorageErrorCode())) {
try {
final AbfsHttpOperation abfsHttpOperation =
getPathStatus(destination, false,
tracingContext, null).getResult();
if (clientTransactionId.equals(
abfsHttpOperation.getResponseHeader(
X_MS_CLIENT_TRANSACTION_ID))) {
return new AbfsClientRenameResult(op, true,
isMetadataIncompleteState);
}
} catch (AbfsRestOperationException ignored) {
// In case of get path status failure,
// we will throw the original exception.
}
throw e;
}

// ref: HADOOP-19393. Write permission checks can occur before validating
// rename operation's validity. If there is an existing destination path, it may be rejected
// with an authorization error. Catching and throwing FileAlreadyExistsException instead.
@@ -1589,4 +1639,23 @@ private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsPro

return properties;
}

/**
* Adds the client transaction ID to the request headers
* if {@link AbfsConfiguration#getIsClientTransactionIdEnabled()} is enabled.
*
* @param requestHeaders list of headers to be sent with the request
* @return the client transaction ID, or null if it was not added
*/
@VisibleForTesting
public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders) {
String clientTransactionId = null;
// Set client transaction ID if the namespace and client transaction ID config are enabled.
if (getIsNamespaceEnabled() && getAbfsConfiguration().getIsClientTransactionIdEnabled()) {
clientTransactionId = UUID.randomUUID().toString();
requestHeaders.add(
new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID, clientTransactionId));
}
return clientTransactionId;
}
}
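
The recovery pattern added above can be summarized as: send a fresh UUID with the mutating request and, if a retried attempt fails with a conflict (create) or source-not-found (rename), read back the destination's x-ms-client-transaction-id via GetPathStatus; a match means the earlier attempt already succeeded. A minimal, generic sketch of that comparison, using only names defined in this diff (the helper itself is illustrative, not part of the PR):

    // Illustrative helper mirroring the idempotency check in createPath/renamePath above.
    private boolean earlierAttemptSucceeded(String sentClientTransactionId,
        AbfsHttpOperation getPathStatusResult) {
      String echoedId = getPathStatusResult.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
      return sentClientTransactionId != null && sentClientTransactionId.equals(echoedId);
    }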
@@ -32,6 +32,7 @@
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,6 +47,8 @@
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
@@ -70,6 +73,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -733,4 +737,53 @@ protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionV
}
assertEquals(exceptionCaught, exceptionVal);
}

/**
* Assumes the preconditions for recovery through the client transaction ID:
* the client transaction ID config is enabled, the service type is DFS, and
* the namespace is enabled for the given AzureBlobFileSystem. For create
* operations, additionally assumes that the ingress service type is DFS and
* that append blob is disabled.
*
* @param fs the AzureBlobFileSystem instance to check
* @param isCreate whether the assumptions are being checked for a create operation
* @throws AzureBlobFileSystemException in case of an error
*/
protected void assumeRecoveryThroughClientTransactionID(
AzureBlobFileSystem fs, boolean isCreate)
throws AzureBlobFileSystemException {
// Assumes that recovery through client transaction ID is enabled.
Assume.assumeTrue(getConfiguration().getIsClientTransactionIdEnabled());
// Assumes that service type is DFS.
assumeDfsServiceType();
// Assumes that namespace is enabled for the given AzureBlobFileSystem.
Assume.assumeTrue(
fs.getIsNamespaceEnabled(getTestTracingContext(fs, true)));
if (isCreate) {
// Assume that create client is DFS client.
Assume.assumeTrue(
AbfsServiceType.DFS.equals(
fs.getAbfsStore().getAbfsConfiguration().getIngressServiceType()));
// Assume that append blob is not enabled in DFS client.
Assume.assumeFalse(isAppendBlobEnabled());
}
}

/**
* Mocks the behavior of adding a client transaction ID to the request headers
* on the given {@link AbfsDfsClient}. The stub generates a random transaction ID,
* adds it to the headers, and records it so the test can assert on it later.
*
* @param abfsDfsClient the mocked {@link AbfsDfsClient}.
* @param clientTransactionId a single-element array that receives the generated transaction ID.
*/
protected void mockAddClientTransactionIdToHeader(AbfsDfsClient abfsDfsClient,
String[] clientTransactionId) {
Mockito.doAnswer(addClientTransactionId -> {
clientTransactionId[0] = UUID.randomUUID().toString();
List<AbfsHttpHeader> headers = addClientTransactionId.getArgument(0);
headers.add(
new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID,
clientTransactionId[0]));
return clientTransactionId[0];
}).when(abfsDfsClient).addClientTransactionIdToHeader(Mockito.anyList());
}
}
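
A rough sketch (not from this PR) of how the helper above might be wired into a test: spy the filesystem's AbfsDfsClient, stub the header generation so the test can capture the transaction ID, and re-inject the spy before running the operation under test. The injection and assertion steps are left as comments because the exact accessors are assumptions about the surrounding test utilities.

    // Hypothetical usage; dfsClient stands for the filesystem's real AbfsDfsClient,
    // obtained by the test (accessor not shown in this diff).
    String[] clientTransactionId = new String[1];
    AbfsDfsClient spiedClient = Mockito.spy(dfsClient);
    mockAddClientTransactionIdToHeader(spiedClient, clientTransactionId);
    // Re-inject spiedClient (e.g. through the AzureBlobFileSystemStore#setClientHandler
    // hook added in this change), perform a create()/rename(), then assert that the
    // request carried X_MS_CLIENT_TRANSACTION_ID equal to clientTransactionId[0].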