Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
Expand Down Expand Up @@ -1875,7 +1876,7 @@ private long extractContentLength(AbfsHttpOperation op) {
long contentLength;
String contentLengthHeader = op.getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (!Strings.isNullOrEmpty(contentLengthHeader)) {
if (!StringUtils.isEmpty(contentLengthHeader)) {
contentLength = Long.parseLong(contentLengthHeader);
} else {
contentLength = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public final class FileSystemConfigurations {

public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;

public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = false;
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
Expand Down Expand Up @@ -754,6 +755,14 @@ public AbfsClientRenameResult renamePath(
throw e;
}

// ref: HADOOP-19393. Write permission checks can occur before validating
// rename operation's validity. If there is an existing destination path, it may be rejected
// with an authorization error. Catching and throwing FileAlreadyExistsException instead.
if (op.getResult().getStorageErrorCode()
.equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())){
throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS);
}

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
Expand Down Expand Up @@ -1638,7 +1647,8 @@ private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsPro
*
* @return client transaction id
*/
private String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders) {
@VisibleForTesting
public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders) {
String clientTransactionId = null;
// Set client transaction ID if the namespace and client transaction ID config are enabled.
if (getIsNamespaceEnabled() && getAbfsConfiguration().getIsClientTransactionIdEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
Expand All @@ -71,6 +73,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
Expand Down Expand Up @@ -757,12 +760,30 @@ protected void assumeRecoveryThroughClientTransactionID(
if (isCreate) {
// Assume that create client is DFS client.
Assume.assumeTrue(
fs.getAbfsStore().getClientHandler().getIngressClient()
instanceof AbfsDfsClient);
AbfsServiceType.DFS.equals(
fs.getAbfsStore().getAbfsConfiguration().getIngressServiceType()));
// Assume that append blob is not enabled in DFS client.
Assume.assumeTrue(
StringUtils.isEmpty(
fs.getAbfsStore().getAbfsConfiguration().getAppendBlobDirs()));
Assume.assumeFalse(isAppendBlobEnabled());
}
}

/**
* Mocks the behavior of adding a client transaction ID to the request headers
* for the given AzureBlobFileSystem. This method generates a random transaction ID
* and adds it to the headers of the {@link AbfsDfsClient}.
*
* @param abfsDfsClient The {@link AbfsDfsClient} mocked AbfsDfsClient.
* @param clientTransactionId An array to hold the generated transaction ID.
*/
protected void mockAddClientTransactionIdToHeader(AbfsDfsClient abfsDfsClient,
String[] clientTransactionId) {
Mockito.doAnswer( addClientTransactionId -> {
clientTransactionId[0] = UUID.randomUUID().toString();
List<AbfsHttpHeader> headers = addClientTransactionId.getArgument(0);
headers.add(
new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID,
clientTransactionId[0]));
return clientTransactionId[0];
}).when(abfsDfsClient).addClientTransactionIdToHeader(Mockito.anyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE;
Expand Down Expand Up @@ -201,13 +203,7 @@ public void createPathRetryIdempotency() throws Exception {
configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");
try (AzureBlobFileSystem fs = getFileSystem(configuration)) {
assumeRecoveryThroughClientTransactionID(fs, true);
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsDfsClient abfsClient = (AbfsDfsClient) Mockito.spy(
clientHandler.getClient());
fs.getAbfsStore().setClient(abfsClient);
fs.getAbfsStore().setClientHandler(clientHandler);
Mockito.doReturn(abfsClient).when(clientHandler).getIngressClient();
AbfsDfsClient abfsClient = mockIngressClientHandler(fs);
final Path nonOverwriteFile = new Path(
"/NonOverwriteTest_FileName_" + UUID.randomUUID());
final List<AbfsHttpHeader> headers = new ArrayList<>();
Expand All @@ -222,14 +218,14 @@ public void answer(final AbfsRestOperation mockedObj,
if (count == 0) {
count = 1;
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doReturn("PUT").when(op).getMethod();
Mockito.doReturn("").when(op).getStorageErrorMessage();
Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
Mockito.doReturn(true).when(mockedObj).hasResult();
Mockito.doReturn(op).when(mockedObj).getResult();
Mockito.doReturn(HTTP_CONFLICT).when(op).getStatusCode();
headers.addAll(mockedObj.getRequestHeaders());
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), "",
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), EMPTY_STRING,
null, op);
}
}
Expand Down Expand Up @@ -273,7 +269,9 @@ public void answer(final AbfsRestOperation mockedObj,
public void getClientTransactionIdAfterCreate() throws Exception {
try (AzureBlobFileSystem fs = getFileSystem()) {
assumeRecoveryThroughClientTransactionID(fs, true);
AbfsDfsClient abfsDfsClient = (AbfsDfsClient) fs.getAbfsClient();
final String[] clientTransactionId = new String[1];
AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
final Path nonOverwriteFile = new Path(
"/NonOverwriteTest_FileName_" + UUID.randomUUID());
fs.create(nonOverwriteFile, false);
Expand All @@ -285,6 +283,10 @@ public void getClientTransactionIdAfterCreate() throws Exception {
getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
.describedAs("Client transaction ID should be set during create")
.isNotNull();
Assertions.assertThat(
getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
.describedAs("Client transaction ID should be equal to the one set in the header")
.isEqualTo(clientTransactionId[0]);
}
}

Expand All @@ -304,12 +306,14 @@ public void getClientTransactionIdAfterCreate() throws Exception {
public void testClientTransactionIdAfterTwoCreateCalls() throws Exception {
try (AzureBlobFileSystem fs = getFileSystem()) {
assumeRecoveryThroughClientTransactionID(fs, true);
AbfsDfsClient abfsDfsClient = (AbfsDfsClient) fs.getAbfsClient();
final String[] clientTransactionId = new String[1];
AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
Path testPath = path("testfile");
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.create(testPath, false);
fs.create(testPath, false); //5ff449d1-b5d2-478c-9722-8e26ebb5501e
fs.create(testPath, true);
final AbfsHttpOperation getPathStatusOp =
abfsDfsClient.getPathStatus(testPath.toUri().getPath(), false,
Expand All @@ -318,6 +322,28 @@ public void testClientTransactionIdAfterTwoCreateCalls() throws Exception {
getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
.describedAs("Client transaction ID should be set during create")
.isNotNull();
Assertions.assertThat(
getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
.describedAs("Client transaction ID should be equal to the one set in the header")
.isEqualTo(clientTransactionId[0]);
}
}

/**
* Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem.
* This method sets up the necessary mock behavior for the client handler and ingress client.
*
* @param fs The {@link AzureBlobFileSystem} instance for which the client handler will be mocked.
* @return A mocked {@link AbfsDfsClient} instance associated with the provided file system.
*/
private AbfsDfsClient mockIngressClientHandler(AzureBlobFileSystem fs) {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(
clientHandler.getClient());
fs.getAbfsStore().setClient(abfsDfsClient);
fs.getAbfsStore().setClientHandler(clientHandler);
Mockito.doReturn(abfsDfsClient).when(clientHandler).getIngressClient();
return abfsDfsClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_PENDING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
Expand Down Expand Up @@ -1683,16 +1685,16 @@ public void answer(final AbfsRestOperation mockedObj,
if (count == 0) {
count = 1;
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doReturn("PUT").when(op).getMethod();
Mockito.doReturn("").when(op).getStorageErrorMessage();
Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op)
.getStorageErrorCode();
Mockito.doReturn(true).when(mockedObj).hasResult();
Mockito.doReturn(op).when(mockedObj).getResult();
Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode();
headers.addAll(mockedObj.getRequestHeaders());
throw new AbfsRestOperationException(HTTP_NOT_FOUND,
SOURCE_PATH_NOT_FOUND.getErrorCode(), "", null, op);
SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op);
}
}
});
Expand Down Expand Up @@ -1738,7 +1740,10 @@ public void answer(final AbfsRestOperation mockedObj,
public void getClientTransactionIdAfterRename() throws Exception {
try (AzureBlobFileSystem fs = getFileSystem()) {
assumeRecoveryThroughClientTransactionID(fs, false);
AbfsDfsClient abfsDfsClient = (AbfsDfsClient) fs.getAbfsClient();
AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
fs.getAbfsStore().setClient(abfsDfsClient);
final String[] clientTransactionId = new String[1];
mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
Path sourceDir = path("/testSrc");
assertMkdirs(fs, sourceDir);
String filename = "file1";
Expand All @@ -1754,6 +1759,10 @@ public void getClientTransactionIdAfterRename() throws Exception {
getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
.describedAs("Client transaction id should be present in dest file")
.isNotNull();
Assertions.assertThat(
getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
.describedAs("Client transaction ID should be equal to the one set in the header")
.isEqualTo(clientTransactionId[0]);
}
}
}