Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,11 @@ public final class HttpHeaderConfigurations {
*/
public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5";

/**
* Http Request Header for create rename idempotence.
* {@value}
*/
public static final String X_MS_CLIENT_TRANSACTION_ID = "x-ms-client-transaction-id";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,7 @@ AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType
* @param requestHeaders The list of HTTP headers for the request.
* @return An AbfsRestOperation instance.
*/
@VisibleForTesting
AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RENAME_SOURCE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_FS_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOBTYPE;
Expand Down Expand Up @@ -353,6 +354,9 @@ public AbfsRestOperation createPath(final String path,
permissions.getPermission()));
}

final String clientTransactionId = UUID.randomUUID().toString();
requestHeaders.add(new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID, clientTransactionId));

if (permissions.hasUmask()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK,
permissions.getUmask()));
Expand Down Expand Up @@ -391,6 +395,13 @@ public AbfsRestOperation createPath(final String path,
return op; //don't throw ex on mkdirs for existing directory
}
}
if (isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
final AbfsHttpOperation getPathStatusOp =
getPathStatus(path, false, tracingContext, contextEncryptionAdapter).getResult();
if (clientTransactionId.equals(getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))) {
return op;
}
}
throw ex;
}
return op;
Expand Down Expand Up @@ -576,6 +587,9 @@ public AbfsClientRenameResult renamePath(
requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));

final String clientTransactionId = UUID.randomUUID().toString();
requestHeaders.add(new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID, clientTransactionId));

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
appendSASTokenToQuery(destination,
Expand All @@ -600,6 +614,22 @@ public AbfsClientRenameResult renamePath(
throw e;
}

if (SOURCE_PATH_NOT_FOUND.getErrorCode()
.equalsIgnoreCase(op.getResult().getStorageErrorCode())) {
try {
final AbfsHttpOperation abfsHttpOperation = getPathStatus(destination,
false, tracingContext, null).getResult();
if (clientTransactionId.equals(
abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))) {
return new AbfsClientRenameResult(op, true,
isMetadataIncompleteState);
}
} catch (AbfsRestOperationException ignored) {
// In case of get path status failure, we will throw the original exception.
}
throw e;
}

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ public void testAbfsHttpSendStatistics() throws IOException {
* B. If config "fs.azure.enable.conditional.create.overwrite" is enabled,
* create overwrite=false (will fail in this case as file is indeed present)
* + getFileStatus to fetch the file ETag
* + getFileStatus to fetch transaction id
* + create overwrite=true
* = 3 connections and 2 send requests
* = 4 connections and 2 send requests
*/
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
expectedConnectionsMade += 3;
expectedConnectionsMade += 4;
expectedRequestsSent += 2;
} else {
expectedConnectionsMade += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
Expand All @@ -47,6 +52,8 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;

Expand All @@ -57,6 +64,7 @@
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
Expand Down Expand Up @@ -273,6 +281,55 @@ public void testDefaultCreateOverwriteFileTest() throws Throwable {
testCreateFileOverwrite(false);
}

@Test
public void createPathRetryIdempotency() throws Exception {
final AzureBlobFileSystem currentFs = getFileSystem();
Configuration config = new Configuration(this.getRawConfiguration());
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config);
AbfsClient abfsClient = Mockito.spy(fs.getAbfsClient());
fs.getAbfsStore().setClient(abfsClient);
final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_" + UUID.randomUUID());
final List<AbfsHttpHeader> headers = new ArrayList<>();
TestAbfsClient.mockAbfsOperationCreation(abfsClient, new MockIntercept<AbfsRestOperation>() {
private int count = 0;
@Override
public void answer(final AbfsRestOperation mockedObj,
final InvocationOnMock answer) throws AbfsRestOperationException {
if (count == 0) {
count = 1;
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doReturn("PUT").when(op).getMethod();
Mockito.doReturn("").when(op).getStorageErrorMessage();
Mockito.doReturn(true).when(mockedObj).hasResult();
Mockito.doReturn(op).when(mockedObj).getResult();
Mockito.doReturn(HTTP_CONFLICT).when(op).getStatusCode();
headers.addAll(mockedObj.getRequestHeaders());
throw new AbfsRestOperationException(HTTP_CONFLICT, AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
"", null, op);
}
}
});
AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doAnswer(answer -> {
String requiredHeader = null;
for (AbfsHttpHeader httpHeader : headers) {
if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(httpHeader.getName())) {
requiredHeader = httpHeader.getValue();
break;
}
}
return requiredHeader;
}).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
Mockito.doReturn(true).when(getPathRestOp).hasResult();
Mockito.doReturn(op).when(getPathRestOp).getResult();
Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
nullable(String.class), nullable(Boolean.class),
nullable(TracingContext.class), nullable(ContextEncryptionAdapter.class));

fs.create(nonOverwriteFile, false);
}

public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
throws Throwable {
final AzureBlobFileSystem currentFs = getFileSystem();
Expand Down Expand Up @@ -311,8 +368,10 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
() -> fs.create(nonOverwriteFile, false));
fs.registerListener(null);

// One request to server to create path should be issued
createRequestCount++;
// Two request to server to create path should be issued
// 1. create
// 2. GetFileStatus to get transaction id
createRequestCount += 2;

assertAbfsStatistics(
CONNECTIONS_MADE,
Expand Down Expand Up @@ -342,11 +401,12 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
fs.registerListener(null);

if (enableConditionalCreateOverwrite) {
// Three requests will be sent to server to create path,
// Four requests will be sent to server to create path,
// 1. create without overwrite
// 2. GetFileStatus to get eTag
// 3. create with overwrite
createRequestCount += 3;
// 4. GetFileStatus to get transaction id
createRequestCount += 4;
} else {
createRequestCount++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,38 @@

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.mockito.ArgumentMatchers.nullable;

/**
* Test rename operation.
Expand Down Expand Up @@ -198,4 +216,63 @@ public void testRenameWithNoDestinationParentDir() throws Exception {
+ "incomplete state failure is hit")
.isEqualTo(2);
}

@Test
public void renamePathRetryIdempotency() throws Exception {
final AzureBlobFileSystem currentFs = getFileSystem();
Configuration config = new Configuration(this.getRawConfiguration());
final AzureBlobFileSystem fs =
(AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
config);
AbfsClient abfsClient = Mockito.spy(fs.getAbfsClient());
fs.getAbfsStore().setClient(abfsClient);
Path sourceDir = path("/testSrc");
assertMkdirs(fs, sourceDir);
String filename = "file1";
Path sourceFilePath = new Path(sourceDir, filename);
touch(sourceFilePath);
Path destFilePath = new Path(sourceDir, "file2");
final List<AbfsHttpHeader> headers = new ArrayList<>();
TestAbfsClient.mockAbfsOperationCreation(abfsClient,
new MockIntercept<AbfsRestOperation>() {
private int count = 0;
@Override
public void answer(final AbfsRestOperation mockedObj,
final InvocationOnMock answer) throws AbfsRestOperationException {
if (count == 0) {
count = 1;
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doReturn("PUT").when(op).getMethod();
Mockito.doReturn("").when(op).getStorageErrorMessage();
Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op)
.getStorageErrorCode();
Mockito.doReturn(true).when(mockedObj).hasResult();
Mockito.doReturn(op).when(mockedObj).getResult();
Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode();
headers.addAll(mockedObj.getRequestHeaders());
throw new AbfsRestOperationException(HTTP_NOT_FOUND, SOURCE_PATH_NOT_FOUND.getErrorCode(),
"", null, op);
}
}
});
AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doAnswer(answer -> {
String requiredHeader = null;
for (AbfsHttpHeader httpHeader : headers) {
if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(httpHeader.getName())) {
requiredHeader = httpHeader.getValue();
break;
}
}
return requiredHeader;
}).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
Mockito.doReturn(true).when(getPathRestOp).hasResult();
Mockito.doReturn(op).when(getPathRestOp).getResult();
Mockito.doReturn(DIRECTORY).when(op).getResponseHeader(X_MS_RESOURCE_TYPE);
Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
nullable(String.class), nullable(Boolean.class),
nullable(TracingContext.class), nullable(ContextEncryptionAdapter.class));
fs.rename(sourceFilePath, destFilePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs;

import org.mockito.invocation.InvocationOnMock;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;

public interface MockIntercept<T> {
void answer(T mockedObj, InvocationOnMock answer) throws AbfsRestOperationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.URL;
import java.util.Map;

import org.apache.hadoop.fs.azurebfs.MockIntercept;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -37,6 +38,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.ABFS_CLIENT_TIMER_THREAD_NAME;
import static org.mockito.ArgumentMatchers.any;

/**
* Unit test cases for the AbfsClient class.
Expand Down Expand Up @@ -138,4 +140,26 @@ private boolean isThreadRunning(String threadName) {
}
return false;
}

public static void mockAbfsOperationCreation(final AbfsClient abfsClient,
final MockIntercept mockIntercept) throws Exception {
Mockito.doAnswer(answer -> {
AbfsRestOperation op = Mockito.spy(
new AbfsRestOperation(
answer.getArgument(0),
abfsClient,
answer.getArgument(1),
answer.getArgument(2),
answer.getArgument(3),
abfsClient.getAbfsConfiguration()
));
Mockito.doAnswer((answer1) -> {
mockIntercept.answer(op, answer1);
return null;
}).when(op)
.execute(any());
return op;
}).when(abfsClient)
.getAbfsRestOperation(any(), any(), any(), any());
}
}
Loading
Loading