Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Field;

import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;

Expand Down Expand Up @@ -83,6 +84,7 @@ public class AbfsConfiguration{
private final Configuration rawConfig;
private final String accountName;
private final boolean isSecure;
private PrefixMode prefixMode;
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
Expand Down Expand Up @@ -336,6 +338,14 @@ public Trilean getIsNamespaceEnabledAccount() {
return Trilean.getTrilean(isNamespaceEnabledAccount);
}

public PrefixMode getPrefixMode() {
return prefixMode;
}

public void setPrefixMode(final PrefixMode prefixMode) {
this.prefixMode = prefixMode;
}

/**
* Gets the Azure Storage account name corresponding to this instance of configuration.
* @return the Azure Storage account name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -142,6 +143,8 @@ public class AzureBlobFileSystem extends FileSystem
private DataBlocks.BlockFactory blockFactory;
/** Maximum Active blocks per OutputStream. */
private int blockOutputActiveBlocks;
private PrefixMode prefixMode = PrefixMode.DFS;
private boolean isNamespaceEnabled;

@Override
public void initialize(URI uri, Configuration configuration)
Expand Down Expand Up @@ -190,9 +193,25 @@ public void initialize(URI uri, Configuration configuration)
tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
this.setWorkingDirectory(this.getHomeDirectory());

TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
try {
isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
} catch (AbfsRestOperationException ex) {
/* since the filesystem has not been created. The API for HNS account would
* return 404 status.
*/
if(ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
isNamespaceEnabled = true;
} else {
throw ex;
}
}
if (!isNamespaceEnabled && uri.toString().contains(FileSystemUriSchemes.WASB_DNS_PREFIX)) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the value of isNamespaceEnabled getting initialized anywhere before the getIsNamespaceEnabled call happens?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first call to determine, it's not getting called before this.

this.prefixMode = PrefixMode.BLOB;
}
abfsConfiguration.setPrefixMode(this.prefixMode);
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
try {
this.createFileSystem(tracingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;

Expand Down Expand Up @@ -175,6 +177,15 @@ private byte[] getSHA256Hash(String key) throws IOException {
}
}

private URL changePrefixFromBlobtoDfs(URL url) throws InvalidUriException {
try {
url = new URL(url.toString().replace(WASB_DNS_PREFIX, ABFS_DNS_PREFIX));
} catch (MalformedURLException ex) {
throw new InvalidUriException(url.toString());
}
return url;
}

private String getBase64EncodedString(String key) {
return getBase64EncodedString(key.getBytes(StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -243,7 +254,10 @@ public AbfsRestOperation createFilesystem(TracingContext tracingContext) throws
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);

final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
URL url = createRequestUrl(abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreateFileSystem,
this,
Expand All @@ -267,7 +281,10 @@ public AbfsRestOperation setFilesystemProperties(final String properties, Tracin
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);

final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
URL url = createRequestUrl(abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetFileSystemProperties,
this,
Expand All @@ -292,7 +309,10 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder);

final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
URL url = createRequestUrl(abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.ListPaths,
this,
Expand All @@ -309,7 +329,10 @@ public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext)
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);

final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
URL url = createRequestUrl(abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
this,
Expand All @@ -326,7 +349,10 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) throws
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);

final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
URL url = createRequestUrl(abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.DeleteFileSystem,
this,
Expand Down Expand Up @@ -372,7 +398,10 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
: SASTokenProvider.CREATE_DIRECTORY_OPERATION;
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreatePath,
this,
Expand Down Expand Up @@ -407,7 +436,10 @@ public AbfsRestOperation acquireLease(final String path, int duration, TracingCo

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
Expand All @@ -427,7 +459,10 @@ public AbfsRestOperation renewLease(final String path, final String leaseId,

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
Expand All @@ -447,7 +482,10 @@ public AbfsRestOperation releaseLease(final String path,

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
Expand All @@ -467,7 +505,10 @@ public AbfsRestOperation breakLease(final String path,

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
Expand Down Expand Up @@ -498,7 +539,10 @@ public AbfsRestOperation renamePath(String source, final String destination,
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);

final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
Expand Down Expand Up @@ -539,7 +583,10 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
Expand Down Expand Up @@ -624,7 +671,10 @@ public AbfsRestOperation flush(final String path, final long position,
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Flush,
this,
Expand All @@ -651,7 +701,10 @@ public AbfsRestOperation setPathProperties(final String path, final String prope
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION);
appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetPathProperties,
this,
Expand Down Expand Up @@ -680,7 +733,10 @@ public AbfsRestOperation getPathStatus(final String path, final boolean includeP
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetPathStatus,
this,
Expand All @@ -705,7 +761,10 @@ public AbfsRestOperation read(final String path, final long position, final byte
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
abfsUriQueryBuilder, cachedSasToken);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
this,
Expand All @@ -731,7 +790,10 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.DeletePath,
this,
Expand Down Expand Up @@ -815,7 +877,10 @@ public AbfsRestOperation setOwner(final String path, final String owner, final S
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, abfsUriQueryBuilder);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetOwner,
this,
Expand All @@ -841,7 +906,10 @@ public AbfsRestOperation setPermission(final String path, final String permissio
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, abfsUriQueryBuilder);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetPermissions,
this,
Expand Down Expand Up @@ -928,7 +996,7 @@ public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tra
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, abfsUriQueryBuilder);
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets check if if blob endpoint API and dfs endpoint API gives same exception -> statusCode, exceptionMessage.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2023-04-02 22:49:55,061 DEBUG [JUnit-testFsActionEXECUTE]: services.AbfsIoUtils (AbfsIoUtils.java:dumpHeadersToDebugLog(64)) - HTTP Response=HTTP/1.1 403 This request is not authorized to perform this operation using this permission.

The status code for blob endpoint is 403

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The response code and message is same for both blob and dfs endpoint.

AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CheckAccess, this,
AbfsHttpConstants.HTTP_METHOD_HEAD, url, createDefaultHeaders());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

public enum PrefixMode {
DFS,
BLOB
}