Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,19 @@ public AbfsRestOperation renamePath(String source, final String destination, fin
url,
requestHeaders);
Instant renameRequestStartTime = Instant.now();
op.execute();

if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
return renameIdempotencyCheckOp(renameRequestStartTime, op, destination);
try {
op.execute();
} catch (AzureBlobFileSystemException e) {
final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp(
renameRequestStartTime, op, destination);
if (idempotencyOp.getResult().getStatusCode()
== op.getResult().getStatusCode()) {
// idempotency did not return different result
// throw back the exception
throw e;
} else {
return idempotencyOp;
}
}

return op;
Expand Down Expand Up @@ -369,14 +378,21 @@ public AbfsRestOperation renameIdempotencyCheckOp(
// exists. Check on destination status and if it has a recent LMT timestamp.
// If yes, return success, else fall back to original rename request failure response.

final AbfsRestOperation destStatusOp = getPathStatus(destination, false);
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
String lmt = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.LAST_MODIFIED);

if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
return destStatusOp;
try {
final AbfsRestOperation destStatusOp = getPathStatus(destination,
false);
if (destStatusOp.getResult().getStatusCode()
== HttpURLConnection.HTTP_OK) {
String lmt = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.LAST_MODIFIED);

if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
return destStatusOp;
}
}
} catch (AzureBlobFileSystemException e) {
// GetFileStatus on the destination failed, return original op
return op;
}
}

Expand Down Expand Up @@ -570,10 +586,18 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
HTTP_METHOD_DELETE,
url,
requestHeaders);
try {
op.execute();

if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
return deleteIdempotencyCheckOp(op);
} catch (AzureBlobFileSystemException e) {
final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
if (idempotencyOp.getResult().getStatusCode()
== op.getResult().getStatusCode()) {
// idempotency did not return different result
// throw back the exception
throw e;
} else {
return idempotencyOp;
}
}

return op;
Expand Down Expand Up @@ -822,7 +846,8 @@ private URL createRequestUrl(final String query) throws AzureBlobFileSystemExcep
return createRequestUrl(EMPTY_STRING, query);
}

private URL createRequestUrl(final String path, final String query)
@VisibleForTesting
protected URL createRequestUrl(final String path, final String query)
throws AzureBlobFileSystemException {
final String base = baseUrl.toString();
String encodedPath = path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.UnknownHostException;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -170,7 +171,8 @@ String getSasToken() {
* Executes the REST operation with retry, by issuing one or more
* HTTP operations.
*/
void execute() throws AzureBlobFileSystemException {
@VisibleForTesting
public void execute() throws AzureBlobFileSystemException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tag this as @VisibleForTesting too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// see if we have latency reports from the previous requests
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
if (latencyHeader != null && !latencyHeader.isEmpty()) {
Expand All @@ -181,8 +183,9 @@ void execute() throws AzureBlobFileSystemException {

retryCount = 0;
LOG.debug("First execution of REST operation - {}", operationType);
while (!executeHttpOperation(retryCount++)) {
while (!executeHttpOperation(retryCount)) {
try {
++retryCount;
LOG.debug("Retrying REST operation {}. RetryCount = {}",
operationType, retryCount);
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.junit.Assume;
import org.junit.Test;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
Expand All @@ -38,9 +39,12 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -168,7 +172,8 @@ public void testDeleteIdempotency() throws Exception {
// Set retryCount to non-zero
when(op.isARetriedRequest()).thenReturn(true);

// Mock instance of Http Operation response. This will return HTTP:Not Found
// Case 1: Mock instance of Http Operation response. This will return
// HTTP:Not Found
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);

Expand All @@ -181,6 +186,64 @@ public void testDeleteIdempotency() throws Exception {
.describedAs(
"Delete is considered idempotent by default and should return success.")
.isEqualTo(HTTP_OK);

// Case 2: Mock instance of Http Operation response. This will return
// HTTP:Bad Request
AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class);
when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST);

// Mock delete response to 400
when(op.getResult()).thenReturn(http400Op);

Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
.getResult()
.getStatusCode())
.describedAs(
"Idempotency check to happen only for HTTP 404 response.")
.isEqualTo(HTTP_BAD_REQUEST);

}

@Test
public void testDeleteIdempotencyTriggerHttp404() throws Exception {

final AzureBlobFileSystem fs = getFileSystem();
AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
fs.getAbfsStore().getClient(),
this.getConfiguration());

// Case 1: Not a retried case should throw error back
intercept(AbfsRestOperationException.class,
() -> client.deletePath(
"/NonExistingPath",
false,
null));

// mock idempotency check to mimic retried case
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
fs.getAbfsStore().getClient(),
this.getConfiguration());

// Case 2: Mimic retried case
// Idempotency check on Delete always returns success
AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
when(idempotencyRetOp.getResult()).thenReturn(http200Op);

doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
when(mockClient.deletePath("/NonExistingPath", false,
null)).thenCallRealMethod();

Assertions.assertThat(mockClient.deletePath(
"/NonExistingPath",
false,
null)
.getResult()
.getStatusCode())
.describedAs("Idempotency check reports successful "
+ "delete. 200OK should be returned")
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.junit.Test;
import org.junit.Assert;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
Expand All @@ -42,6 +43,8 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.UUID.randomUUID;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -51,6 +54,8 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Test rename operation.
*/
Expand All @@ -77,6 +82,16 @@ public void testEnsureFileIsRenamed() throws Exception {
assertPathDoesNotExist(fs, "expected renamed", src);
}

@Test
public void testRenameWithPreExistingDestination() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path src = path("renameSrc");
touch(src);
Path dest = path("renameDest");
touch(dest);
assertRenameOutcome(fs, src, dest, false);
}

@Test
public void testRenameFileUnderDir() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Expand Down Expand Up @@ -197,6 +212,59 @@ public void testRenameRetryFailureWithDestOldLMT() throws Exception {
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
}

@Test
public void testRenameIdempotencyTriggerHttpNotFound() throws Exception {
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);

AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);

// Check 1 where idempotency check fails to find dest path
// Rename should throw exception
testRenameIdempotencyTriggerChecks(http404Op);

// Check 2 where idempotency check finds the dest path
// Renam will be successful
testRenameIdempotencyTriggerChecks(http200Op);
}

private void testRenameIdempotencyTriggerChecks(
AbfsHttpOperation idempotencyRetHttpOp) throws Exception {

final AzureBlobFileSystem fs = getFileSystem();
AbfsClient client = TestAbfsClient.getMockAbfsClient(
fs.getAbfsStore().getClient(),
this.getConfiguration());

AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp);
doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(),
any(), any());
when(client.renamePath(any(), any(), any())).thenCallRealMethod();

// rename on non-existing source file will trigger idempotency check
if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) {
// idempotency check found that destination exists and is recently created
Assertions.assertThat(client.renamePath(
"/NonExistingsourcepath",
"/destpath",
null)
.getResult()
.getStatusCode())
.describedAs("Idempotency check reports recent successful "
+ "rename. 200OK should be returned")
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
} else {
// rename dest not found. Original exception should be returned.
intercept(AbfsRestOperationException.class,
() -> client.renamePath(
"/NonExistingsourcepath",
"/destpath",
""));
}
}

private void testRenameTimeout(
int renameRequestStatus,
int renameIdempotencyCheckStatus,
Expand Down
Loading