Skip to content

Commit c1fde4f

Browse files
HADOOP-16948. Support infinite lease dirs. (#1925)
* HADOOP-16948. Support single writer dirs. * HADOOP-16948. Fix findbugs and checkstyle problems. * HADOOP-16948. Fix remaining checkstyle problems. * HADOOP-16948. Add DurationInfo, retry policy for acquiring lease, and javadocs * HADOOP-16948. Convert ABFS client to use an executor for lease ops * HADOOP-16948. Fix ABFS lease test for non-HNS * HADOOP-16948. Fix checkstyle and javadoc * HADOOP-16948. Address review comments * HADOOP-16948. Use daemon threads for ABFS lease ops * HADOOP-16948. Make lease duration configurable * HADOOP-16948. Add error messages to test assertions * HADOOP-16948. Remove extra isSingleWriterKey call * HADOOP-16948. Use only infinite lease duration due to cost of renewal ops * HADOOP-16948. Remove acquire/renew/release lease methods * HADOOP-16948. Rename single writer dirs to infinite lease dirs * HADOOP-16948. Fix checkstyle * HADOOP-16948. Wait for acquire lease future * HADOOP-16948. Add unit test for acquire lease failure
1 parent cb3ed32 commit c1fde4f

22 files changed

Lines changed: 1032 additions & 42 deletions

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
3232
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
3333
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
34+
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
3435
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
3536
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
3637
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
@@ -208,6 +209,15 @@ public class AbfsConfiguration{
208209
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
209210
private String azureAppendBlobDirs;
210211

212+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
213+
DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
214+
private String azureInfiniteLeaseDirs;
215+
216+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS,
217+
MinValue = MIN_LEASE_THREADS,
218+
DefaultValue = DEFAULT_LEASE_THREADS)
219+
private int numLeaseThreads;
220+
211221
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
212222
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
213223
private boolean createRemoteFileSystemDuringInitialization;
@@ -296,6 +306,8 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
296306
field.setAccessible(true);
297307
if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
298308
field.set(this, validateInt(field));
309+
} else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) {
310+
field.set(this, validateIntWithOutlier(field));
299311
} else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
300312
field.set(this, validateLong(field));
301313
} else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
@@ -634,6 +646,14 @@ public String getAppendBlobDirs() {
634646
return this.azureAppendBlobDirs;
635647
}
636648

649+
public String getAzureInfiniteLeaseDirs() {
650+
return this.azureInfiniteLeaseDirs;
651+
}
652+
653+
public int getNumLeaseThreads() {
654+
return this.numLeaseThreads;
655+
}
656+
637657
public boolean getCreateRemoteFileSystemDuringInitialization() {
638658
// we do not support creating the filesystem when AuthType is SAS
639659
return this.createRemoteFileSystemDuringInitialization
@@ -843,6 +863,21 @@ int validateInt(Field field) throws IllegalAccessException, InvalidConfiguration
843863
validator.ThrowIfInvalid()).validate(value);
844864
}
845865

866+
int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
867+
IntegerWithOutlierConfigurationValidatorAnnotation validator =
868+
field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class);
869+
String value = get(validator.ConfigurationKey());
870+
871+
// validate
872+
return new IntegerConfigurationBasicValidator(
873+
validator.OutlierValue(),
874+
validator.MinValue(),
875+
validator.MaxValue(),
876+
validator.DefaultValue(),
877+
validator.ConfigurationKey(),
878+
validator.ThrowIfInvalid()).validate(value);
879+
}
880+
846881
long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
847882
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
848883
String value = rawConfig.get(validator.ConfigurationKey());

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.hadoop.security.token.Token;
8888
import org.apache.hadoop.security.UserGroupInformation;
8989
import org.apache.hadoop.util.functional.RemoteIterators;
90+
import org.apache.hadoop.util.DurationInfo;
9091
import org.apache.hadoop.util.LambdaUtils;
9192
import org.apache.hadoop.util.Progressable;
9293

@@ -505,6 +506,26 @@ public FileStatus getFileStatus(final Path f) throws IOException {
505506
}
506507
}
507508

509+
/**
510+
* Break the current lease on an ABFS file if it exists. A lease that is broken cannot be
511+
* renewed. A new lease may be obtained on the file immediately.
512+
*
513+
* @param f file name
514+
* @throws IOException on any exception while breaking the lease
515+
*/
516+
public void breakLease(final Path f) throws IOException {
517+
LOG.debug("AzureBlobFileSystem.breakLease path: {}", f);
518+
519+
Path qualifiedPath = makeQualified(f);
520+
521+
try (DurationInfo ignored = new DurationInfo(LOG, false, "Break lease for %s",
522+
qualifiedPath)) {
523+
abfsStore.breakLease(qualifiedPath);
524+
} catch(AzureBlobFileSystemException ex) {
525+
checkException(f, ex);
526+
}
527+
}
528+
508529
/**
509530
* Qualify a path to one which uses this FileSystem and, if relative,
510531
* made absolute.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.time.Instant;
4040
import java.util.ArrayList;
4141
import java.util.Arrays;
42+
import java.util.Collections;
4243
import java.util.Date;
4344
import java.util.HashMap;
4445
import java.util.HashSet;
@@ -48,10 +49,14 @@
4849
import java.util.Map;
4950
import java.util.Optional;
5051
import java.util.Set;
52+
import java.util.WeakHashMap;
53+
import java.util.concurrent.ExecutionException;
5154

5255
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
5356
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
5457
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
58+
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
59+
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
5560
import org.slf4j.Logger;
5661
import org.slf4j.LoggerFactory;
5762

@@ -100,6 +105,7 @@
100105
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
101106
import org.apache.hadoop.fs.azurebfs.services.AuthType;
102107
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
108+
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
103109
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
104110
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
105111
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
@@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
145151
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
146152
private static final int GET_SET_AGGREGATE_COUNT = 2;
147153

154+
private final Map<AbfsLease, Object> leaseRefs;
155+
148156
private final AbfsConfiguration abfsConfiguration;
149157
private final Set<String> azureAtomicRenameDirSet;
158+
private Set<String> azureInfiniteLeaseDirSet;
150159
private Trilean isNamespaceEnabled;
151160
private final AuthType authType;
152161
private final UserGroupInformation userGroupInformation;
@@ -167,6 +176,8 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
167176
final String fileSystemName = authorityParts[0];
168177
final String accountName = authorityParts[1];
169178

179+
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
180+
170181
try {
171182
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
172183
} catch (IllegalAccessException exception) {
@@ -195,6 +206,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
195206

196207
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
197208
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
209+
updateInfiniteLeaseDirs();
198210
this.authType = abfsConfiguration.getAuthType(accountName);
199211
boolean usingOauth = (authType == AuthType.OAuth);
200212
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
@@ -246,7 +258,24 @@ public String getPrimaryGroup() {
246258

247259
@Override
248260
public void close() throws IOException {
249-
IOUtils.cleanupWithLogger(LOG, client);
261+
List<ListenableFuture<?>> futures = new ArrayList<>();
262+
for (AbfsLease lease : leaseRefs.keySet()) {
263+
if (lease == null) {
264+
continue;
265+
}
266+
ListenableFuture<?> future = client.submit(() -> lease.free());
267+
futures.add(future);
268+
}
269+
try {
270+
Futures.allAsList(futures).get();
271+
} catch (InterruptedException e) {
272+
LOG.error("Interrupted freeing leases", e);
273+
Thread.currentThread().interrupt();
274+
} catch (ExecutionException e) {
275+
LOG.error("Error freeing leases", e);
276+
} finally {
277+
IOUtils.cleanupWithLogger(LOG, client);
278+
}
250279
}
251280

252281
byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
@@ -496,12 +525,14 @@ public OutputStream createFile(final Path path,
496525
}
497526
perfInfo.registerResult(op.getResult()).registerSuccess(true);
498527

528+
AbfsLease lease = maybeCreateLease(relativePath);
529+
499530
return new AbfsOutputStream(
500531
client,
501532
statistics,
502533
relativePath,
503534
0,
504-
populateAbfsOutputStreamContext(isAppendBlob));
535+
populateAbfsOutputStreamContext(isAppendBlob, lease));
505536
}
506537
}
507538

@@ -573,7 +604,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
573604
return op;
574605
}
575606

576-
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
607+
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
608+
AbfsLease lease) {
577609
int bufferSize = abfsConfiguration.getWriteBufferSize();
578610
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
579611
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@@ -587,6 +619,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
587619
.withAppendBlob(isAppendBlob)
588620
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
589621
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
622+
.withLease(lease)
590623
.build();
591624
}
592625

@@ -705,15 +738,29 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
705738
isAppendBlob = true;
706739
}
707740

741+
AbfsLease lease = maybeCreateLease(relativePath);
742+
708743
return new AbfsOutputStream(
709744
client,
710745
statistics,
711746
relativePath,
712747
offset,
713-
populateAbfsOutputStreamContext(isAppendBlob));
748+
populateAbfsOutputStreamContext(isAppendBlob, lease));
714749
}
715750
}
716751

752+
/**
753+
* Break any current lease on an ABFS file.
754+
*
755+
* @param path file name
756+
* @throws AzureBlobFileSystemException on any exception while breaking the lease
757+
*/
758+
public void breakLease(final Path path) throws AzureBlobFileSystemException {
759+
LOG.debug("lease path: {}", path);
760+
761+
client.breakLease(getRelativePath(path));
762+
}
763+
717764
public void rename(final Path source, final Path destination) throws
718765
AzureBlobFileSystemException {
719766
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
@@ -1347,6 +1394,13 @@ public boolean isAtomicRenameKey(String key) {
13471394
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
13481395
}
13491396

1397+
public boolean isInfiniteLeaseKey(String key) {
1398+
if (azureInfiniteLeaseDirSet.isEmpty()) {
1399+
return false;
1400+
}
1401+
return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
1402+
}
1403+
13501404
/**
13511405
* A on-off operation to initialize AbfsClient for AzureBlobFileSystem
13521406
* Operations.
@@ -1636,4 +1690,32 @@ void setNamespaceEnabled(Trilean isNamespaceEnabled){
16361690
this.isNamespaceEnabled = isNamespaceEnabled;
16371691
}
16381692

1693+
private void updateInfiniteLeaseDirs() {
1694+
this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
1695+
abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
1696+
// remove the empty string, since isKeyForDirectory returns true for empty strings
1697+
// and we don't want to default to enabling infinite lease dirs
1698+
this.azureInfiniteLeaseDirSet.remove("");
1699+
}
1700+
1701+
private AbfsLease maybeCreateLease(String relativePath)
1702+
throws AzureBlobFileSystemException {
1703+
boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
1704+
if (!enableInfiniteLease) {
1705+
return null;
1706+
}
1707+
AbfsLease lease = new AbfsLease(client, relativePath);
1708+
leaseRefs.put(lease, null);
1709+
return lease;
1710+
}
1711+
1712+
@VisibleForTesting
1713+
boolean areLeasesFreed() {
1714+
for (AbfsLease lease : leaseRefs.keySet()) {
1715+
if (lease != null && !lease.isFreed()) {
1716+
return false;
1717+
}
1718+
}
1719+
return true;
1720+
}
16391721
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public final class AbfsHttpConstants {
3939
public static final String GET_ACCESS_CONTROL = "getAccessControl";
4040
public static final String CHECK_ACCESS = "checkAccess";
4141
public static final String GET_STATUS = "getStatus";
42+
public static final String ACQUIRE_LEASE_ACTION = "acquire";
43+
public static final String BREAK_LEASE_ACTION = "break";
44+
public static final String RELEASE_LEASE_ACTION = "release";
45+
public static final String RENEW_LEASE_ACTION = "renew";
46+
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
4247
public static final String DEFAULT_TIMEOUT = "90";
4348
public static final String APPEND_BLOB_TYPE = "appendblob";
4449
public static final String TOKEN_VERSION = "2";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ public final class ConfigurationKeys {
8787
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
8888
* Default is empty. **/
8989
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
90+
/** Provides a config to provide comma separated path prefixes which support infinite leases.
91+
* Files under these paths will be leased when created or opened for writing and the lease will
92+
* be released when the file is closed. The lease may be broken with the breakLease method on
93+
* AzureBlobFileSystem. Default is empty.
94+
* **/
95+
public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories";
96+
/** Provides a number of threads to use for lease operations for infinite lease directories.
97+
* Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/
98+
public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";
9099
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
91100
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
92101
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ public final class FileSystemConfigurations {
7878
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
7979
public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true;
8080
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
81+
public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
82+
public static final int DEFAULT_LEASE_THREADS = 0;
83+
public static final int MIN_LEASE_THREADS = 0;
84+
public static final int DEFAULT_LEASE_DURATION = -1;
85+
public static final int INFINITE_LEASE_DURATION = -1;
86+
public static final int MIN_LEASE_DURATION = 15;
87+
public static final int MAX_LEASE_DURATION = 60;
8188

8289
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
8390

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
6060
public static final String X_MS_UMASK = "x-ms-umask";
6161
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
6262
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
63+
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
64+
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
65+
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
66+
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
67+
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
6368

6469
private HttpHeaderConfigurations() {}
6570
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations {
4646
boolean ThrowIfInvalid() default false;
4747
}
4848

49+
@Target({ ElementType.FIELD })
50+
@Retention(RetentionPolicy.RUNTIME)
51+
public @interface IntegerWithOutlierConfigurationValidatorAnnotation {
52+
String ConfigurationKey();
53+
54+
int MaxValue() default Integer.MAX_VALUE;
55+
56+
int MinValue() default Integer.MIN_VALUE;
57+
58+
int OutlierValue() default Integer.MIN_VALUE;
59+
60+
int DefaultValue();
61+
62+
boolean ThrowIfInvalid() default false;
63+
}
64+
4965
/**
5066
* Describes the requirements when validating the annotated long field.
5167
*/

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public AzureBlobFileSystemException(final String message, final Exception innerE
3737
super(message, innerException);
3838
}
3939

40+
public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) {
41+
super(message, innerThrowable);
42+
}
43+
4044
@Override
4145
public String toString() {
4246
if (this.getMessage() == null && this.getCause() == null) {

0 commit comments

Comments
 (0)