Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ public class AbfsConfiguration{
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
private boolean renameResilience;

/**
 * Checksum validation flag. The annotation binds it to the configuration key
 * FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION with
 * DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION as its declared default value.
 */
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
private boolean isChecksumValidationEnabled;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
Expand Down Expand Up @@ -1150,4 +1154,13 @@ public boolean getRenameResilience() {
/**
 * Overwrites the rename resilience flag held by this configuration.
 * @param actualResilience new value of the flag.
 */
void setRenameResilience(final boolean actualResilience) {
  this.renameResilience = actualResilience;
}

/**
 * Reads the checksum validation flag held by this configuration.
 * @return current value of the checksum validation flag.
 */
public boolean getIsChecksumValidationEnabled() {
  return this.isChecksumValidationEnabled;
}

/**
 * Overwrites the checksum validation flag held by this configuration.
 * Marked as visible for testing.
 * @param isChecksumValidationEnabled new value of the flag.
 */
@VisibleForTesting
public void setIsChecksumValidationEnabled(
    final boolean isChecksumValidationEnabled) {
  this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public final class AbfsHttpConstants {
public static final String FORWARD_SLASH_ENCODE = "%2F";
public static final String AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER = "@";
public static final String UTF_8 = "utf-8";
/** Digest algorithm name; passed to MessageDigest.getInstance by the checksum code. */
public static final String MD5 = "MD5";
public static final String GMT_TIMEZONE = "GMT";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ public final class ConfigurationKeys {
/** Add extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/** Add extra layer of verification of the integrity of the request content during transport. */
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";

/**
 * Builds an account-qualified property name of the form
 * {@code <property>.<account>}.
 * @param property base property name.
 * @param account account name used as the suffix.
 * @return the two values joined by a single dot.
 */
public static String accountProperty(String property, String account) {
  // String.join renders a null element as "null", same as concatenation.
  return String.join(".", property, account);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
/** Default for the checksum validation setting: disabled. */
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;

/**
* Limit of queued block upload operations before writes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
public static final String EXPECT = "Expect";
/** Name of the request header used on ranged reads to ask for the MD5 hash of the returned data. */
public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5";

// Private constructor: prevents instantiation from outside this class.
private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;

/**
 * Signals that client-side checksum verification could not be completed
 * successfully. The status code and error code are always those of
 * {@link AzureServiceErrorCode#UNKNOWN}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidChecksumException extends AbfsRestOperationException {

  private static final String ERROR_MESSAGE = "Checksum Validation For Read Operation Failed";

  /**
   * @param innerException underlying cause; may be null, in which case a
   * fixed default message is used.
   */
  public InvalidChecksumException(final Exception innerException) {
    super(
        AzureServiceErrorCode.UNKNOWN.getStatusCode(),
        AzureServiceErrorCode.UNKNOWN.getErrorCode(),
        messageFor(innerException),
        innerException);
  }

  /**
   * Picks the exception message: the cause's string form when a cause is
   * given, otherwise the fixed default message.
   */
  private static String messageFor(final Exception cause) {
    if (cause == null) {
      return ERROR_MESSAGE;
    }
    return cause.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidChecksumException;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
Expand Down Expand Up @@ -75,6 +77,7 @@
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
Expand Down Expand Up @@ -761,6 +764,11 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry));
}

// Add MD5 Hash of request content as request header if feature is enabled
if (isChecksumValidationEnabled()) {
addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
}

// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
Expand Down Expand Up @@ -978,10 +986,17 @@ public AbfsRestOperation read(final String path, final long position, final byte
TracingContext tracingContext) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
requestHeaders.add(new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1)));

AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));

// Add request header to fetch MD5 Hash of data returned by server.
if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
Expand All @@ -999,6 +1014,11 @@ public AbfsRestOperation read(final String path, final long position, final byte
bufferLength, sasTokenForReuse);
op.execute(tracingContext);

// Verify that the MD5 hash returned by the server matches the data received.
if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
verifyCheckSumForRead(buffer, path, op.getResult(), bufferOffset);
}

return op;
}

Expand Down Expand Up @@ -1412,6 +1432,91 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx,
}
}

/**
 * Adds the Base64-encoded MD5 hash of the data being appended to the request
 * headers, under the {@code CONTENT_MD5} header name.
 * @param requestHeaders header list of the append request; the hash header
 * is added to it.
 * @param reqParams append parameters supplying the offset and length of the
 * payload inside {@code buffer}.
 * @param buffer array holding the payload to be appended.
 * @throws AbfsRestOperationException if the MD5 algorithm is not available.
 */
private void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
    final AppendRequestParameters reqParams, final byte[] buffer)
    throws AbfsRestOperationException {
  try {
    MessageDigest md5Digest = MessageDigest.getInstance(MD5);
    // Hash the payload slice in place; copying it into a temporary array
    // first would double the memory touched on every append.
    md5Digest.update(buffer, reqParams.getoffset(), reqParams.getLength());
    String md5Hash = Base64.getEncoder().encodeToString(md5Digest.digest());
    requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash));
  } catch (NoSuchAlgorithmException e) {
    throw new InvalidChecksumException(e);
  }
}

/**
 * Verifies the MD5 hash received from the server against the data read.
 * The hash is computed over exactly the bytes the server returned and
 * compared with the {@code CONTENT_MD5} response header.
 * @param buffer stores the data received from server.
 * @param pathStr stores the path being read.
 * @param result HTTP Operation Result.
 * @param bufferOffset Position where data returned by server is saved in buffer.
 * @throws AbfsRestOperationException if the computed hash differs from the
 * one in the response, or if the MD5 algorithm is not available.
 */
private void verifyCheckSumForRead(final byte[] buffer, final String pathStr,
    final AbfsHttpOperation result, final int bufferOffset)
    throws AbfsRestOperationException {
  // The server may return fewer bytes than the caller requested, and its
  // MD5 hash covers only what it actually returned. Hash exactly that many
  // bytes, starting at the offset where they were stored in the buffer.
  int numberOfBytesRead = (int) result.getBytesReceived();
  if (numberOfBytesRead == 0) {
    return;
  }

  try {
    MessageDigest md5Digest = MessageDigest.getInstance(MD5);
    // Hash the received range in place instead of copying it out first.
    md5Digest.update(buffer, bufferOffset, numberOfBytesRead);
    String md5HashComputed = Base64.getEncoder().encodeToString(md5Digest.digest());
    String md5HashActual = result.getResponseHeader(CONTENT_MD5);
    // equals() is called on the computed value, so a missing response
    // header (null) is reported as a mismatch rather than an NPE.
    if (!md5HashComputed.equals(md5HashActual)) {
      throw new InvalidChecksumException(new PathIOException(pathStr));
    }
  } catch (NoSuchAlgorithmException e) {
    throw new InvalidChecksumException(e);
  }
}

/**
 * Decides whether checksum validation applies to a read request.
 * All of the following must hold:
 * <ol>
 * <li>checksum validation is enabled in the configuration,</li>
 * <li>the range header is present among the request headers,</li>
 * <li>the buffer length does not exceed 4MB.</li>
 * </ol>
 * See the Azure documentation for the read operation:
 * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read">
 * Path - Read</a>.
 * @param requestHeaders headers of the read request.
 * @param rangeHeader range header expected to be in {@code requestHeaders}.
 * @param bufferLength number of bytes requested.
 * @return true if all conditions are met.
 */
private boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
    final AbfsHttpHeader rangeHeader, final int bufferLength) {
  if (!getAbfsConfiguration().getIsChecksumValidationEnabled()) {
    return false;
  }
  if (!requestHeaders.contains(rangeHeader)) {
    return false;
  }
  return bufferLength <= 4 * ONE_MB;
}

/**
 * Decides whether checksum validation applies to a write operation.
 * The only condition is the configuration flag. The server validates the
 * data when the client sends the MD5 hash as a request header; see
 * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
 * Path - Update</a>.
 * @return true if checksum validation is enabled in the configuration.
 */
private boolean isChecksumValidationEnabled() {
  final boolean enabledInConfig =
      getAbfsConfiguration().getIsChecksumValidationEnabled();
  return enabledInConfig;
}

@VisibleForTesting
URL getBaseUrl() {
return baseUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ protected AbstractAbfsIntegrationTest() throws Exception {
} else {
this.isIPAddress = false;
}

// For tests, we want to enforce checksum validation so that any regressions can be caught.
abfsConfig.setIsChecksumValidationEnabled(true);
}

protected boolean getIsNamespaceEnabled(AzureBlobFileSystem fs)
Expand Down
Loading