Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ public void initialize(URI name, Configuration originalConf)
LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
" queue limit={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks);
long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);

setMetadataStore(S3Guard.getMetadataStore(this));
allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
Expand All @@ -400,11 +403,6 @@ public void initialize(URI name, Configuration originalConf)
getMetadataStore(), allowAuthoritative);
}
initMultipartUploads(conf);
if (hasMetadataStore()) {
long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
}
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
Expand Down Expand Up @@ -3346,8 +3344,7 @@ void finishedWrite(String key, long length, String eTag, String versionId,
// See note about failure semantics in S3Guard documentation
try {
if (hasMetadataStore()) {
S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider,
operationState);
S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, operationState);
S3AFileStatus status = createUploadFileStatus(p,
S3AUtils.objectRepresentsDirectory(key, length), length,
getDefaultBlockSize(p), username, eTag, versionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,9 @@
* directory helps prevent unnecessary queries during traversal of an entire
* sub-tree.
*
<<<<<<< ours
* Some mutating operations, notably
* {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
* {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
=======
* Some mutating operations, notably {@link #deleteSubtree(Path)} and
* {@link MetadataStore#move(Collection, Collection, BulkOperationState)}
>>>>>>> theirs
* {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider, BulkOperationState)}
* are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
Expand Down Expand Up @@ -343,6 +338,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
*/
private ListeningExecutorService executor;

/**
* Time source. This is used during writes when parent
* entries need to be created.
*/
private ITtlTimeProvider timeProvider;

/**
* A utility function to create DynamoDB instance.
* @param conf the file system configuration
Expand Down Expand Up @@ -418,6 +419,7 @@ public void initialize(FileSystem fs) throws IOException {
this::retryEvent
);

timeProvider = new S3Guard.TtlTimeProvider(conf);
initTable();

instrumentation.initialized();
Expand All @@ -437,6 +439,9 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
instrumentation = context.getInstrumentation().getS3GuardInstrumentation();
username = context.getUsername();
executor = context.createThrottledExecutor();
timeProvider = Preconditions.checkNotNull(
context.getTimeProvider(),
"ttlTimeProvider must not be null");
}

/**
Expand Down Expand Up @@ -795,11 +800,13 @@ DirListingMetadata getDirListingMetadataFromDirMetaAndList(Path path,
* Callers are required to synchronize on ancestorState.
* @param pathsToCreate paths to create
* @param ancestorState ongoing ancestor state.
* @param ttlTimeProvider Must not be null
* @return the full ancestry paths
*/
private Collection<DDBPathMetadata> completeAncestry(
final Collection<DDBPathMetadata> pathsToCreate,
final AncestorState ancestorState) throws PathIOException {
final AncestorState ancestorState,
final ITtlTimeProvider ttlTimeProvider) throws PathIOException {
List<DDBPathMetadata> ancestorsToAdd = new ArrayList<>(0);
LOG.debug("Completing ancestry for {} paths", pathsToCreate.size());
// we sort the inputs to guarantee that the topmost entries come first.
Expand Down Expand Up @@ -845,7 +852,7 @@ private Collection<DDBPathMetadata> completeAncestry(
parent, path);
final S3AFileStatus status = makeDirStatus(parent, username);
DDBPathMetadata md = new DDBPathMetadata(status, Tristate.FALSE,
false);
false, false, ttlTimeProvider.getNow());
ancestorState.put(parent, md);
ancestorsToAdd.add(md);
parent = parent.getParent();
Expand All @@ -857,12 +864,20 @@ private Collection<DDBPathMetadata> completeAncestry(
/**
* {@inheritDoc}
* <p>
* The implementation scans all up the directory tree and does a get()
* for each entry; at each level one is found it is added to the ancestor
* state.
* <p>
* The original implementation would stop on finding the first non-empty
* parent. This (re) implementation issues a GET for every parent entry
* and so detects and recovers from a tombstone marker further up the tree
* (i.e. an inconsistent store is corrected for).
* <p>
* if {@code operationState} is not null, when this method returns the
* operation state will be updated with all new entries created.
* This ensures that subsequent operations with the same store will not
* trigger new updates.
* @param qualifiedPath path to update
* @param timeProvider
* @param operationState (nullable) operational state for a bulk update
* @throws IOException on failure.
*/
Expand All @@ -878,6 +893,7 @@ public void addAncestors(
final AncestorState ancestorState = extractOrCreate(operationState,
BulkOperationState.OperationType.Rename);
Path parent = qualifiedPath.getParent();
boolean entryFound = false;

// Iterate up the parents.
// note that only ancestorState get/set operations are synchronized;
Expand All @@ -899,6 +915,9 @@ public void addAncestors(
// a directory entry will go in.
PathMetadata directory = get(parent);
if (directory == null || directory.isDeleted()) {
if (entryFound) {
LOG.warn("Inconsistent S3Guard table: adding directory {}", parent);
}
Copy link
Contributor

@ajfabbri ajfabbri Jun 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting change to this function. slower but more robust (the removed break below, that is, not this log message)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, we might as well do the depth(path) get operations in parallel if they always happen, and the break behavior you remove is not configurable. In terms of write latency it would remove depth(path)-1 round trips (approx.). Proposing this as a followup JIRA, not doing it here.

S3AFileStatus status = makeDirStatus(username, parent);
LOG.debug("Adding new ancestor entry {}", status);
DDBPathMetadata meta = new DDBPathMetadata(status, Tristate.FALSE,
Expand All @@ -909,6 +928,8 @@ public void addAncestors(
// here that put operation would actually (mistakenly) skip
// creating the entry.
} else {
// an entry was found. Check its type
entryFound = true;
if (directory.getFileStatus().isFile()) {
throw new PathIOException(parent.toString(),
"Cannot overwrite parent file: metadatstore is"
Expand All @@ -918,7 +939,6 @@ public void addAncestors(
synchronized (ancestorState) {
ancestorState.put(parent, new DDBPathMetadata(directory));
}
break;
}
parent = parent.getParent();
}
Expand All @@ -927,7 +947,7 @@ public void addAncestors(
if (!newDirs.isEmpty()) {
// patch up the time.
patchLastUpdated(newDirs, timeProvider);
innerPut(newDirs, operationState);
innerPut(newDirs, operationState, timeProvider);
}
}

Expand Down Expand Up @@ -983,7 +1003,8 @@ public void move(
newItems.addAll(
completeAncestry(
pathMetaToDDBPathMeta(pathsToCreate),
ancestorState));
ancestorState,
extractTimeProvider(ttlTimeProvider)));
}
}
// sort all the new items topmost first.
Expand Down Expand Up @@ -1162,7 +1183,7 @@ public void put(
public void put(
final Collection<? extends PathMetadata> metas,
@Nullable final BulkOperationState operationState) throws IOException {
innerPut(pathMetaToDDBPathMeta(metas), operationState);
innerPut(pathMetaToDDBPathMeta(metas), operationState, timeProvider);
}

/**
Expand All @@ -1176,13 +1197,15 @@ public void put(
* create entries in the table without parents.
* @param metas metadata entries to write.
* @param operationState (nullable) operational state for a bulk update
* @param ttlTimeProvider
* @throws IOException failure.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Retries.RetryTranslated
private void innerPut(
final Collection<DDBPathMetadata> metas,
@Nullable final BulkOperationState operationState) throws IOException {
@Nullable final BulkOperationState operationState,
final ITtlTimeProvider ttlTimeProvider) throws IOException {
if (metas.isEmpty()) {
// Happens when someone calls put() with an empty list.
LOG.debug("Ignoring empty list of entries to put");
Expand All @@ -1196,7 +1219,7 @@ private void innerPut(
Item[] items;
synchronized (ancestorState) {
items = pathMetadataToItem(
completeAncestry(metas, ancestorState));
completeAncestry(metas, ancestorState, ttlTimeProvider));
}
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
tableName, region);
Expand Down Expand Up @@ -1534,7 +1557,7 @@ private void removeAuthoritativeDirFlag(
try {
LOG.debug("innerPut on metas: {}", metas);
if (!metas.isEmpty()) {
innerPut(metas, state);
innerPut(metas, state, timeProvider);
}
} catch (IOException e) {
String msg = String.format("IOException while setting false "
Expand Down Expand Up @@ -2210,6 +2233,17 @@ public AncestorState initiateBulkWrite(
return new AncestorState(operation, dest);
}

/**
* Extract a time provider from the argument or fall back to the
* one in the constructor.
* @param ttlTimeProvider nullable time source passed in as an argument.
* @return a non-null time source.
*/
private ITtlTimeProvider extractTimeProvider(
@Nullable ITtlTimeProvider ttlTimeProvider) {
return ttlTimeProvider != null ? ttlTimeProvider : timeProvider;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed w/ @bgaborg @mackrorysd this will go away soon, and is fine for now.

/**
* Take an {@code IllegalArgumentException} raised by a DDB operation
* and if it contains an inner SDK exception, unwrap it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.hadoop.fs.s3a.Retries.RetryTranslated;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.util.ReflectionUtils;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
Expand Down Expand Up @@ -537,7 +536,6 @@ public static void addMoveAncestors(MetadataStore ms,
public static void addAncestors(
final MetadataStore metadataStore,
final Path qualifiedPath,
final String username,
final ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState) throws IOException {
metadataStore.addAncestors(qualifiedPath, timeProvider, operationState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1208,11 +1208,12 @@ public void testPutFileDeepUnderTombstone() throws Throwable {
.initiateBulkWrite(BulkOperationState.OperationType.Put,
childPath);
S3Guard.addAncestors(getDynamoMetadataStore(),
childPath, "self", getTtlTimeProvider(),
childPath,
getTtlTimeProvider(),
ancestorState);
// getDirectory(base);
// now write the file again.
putFile(child, now, ancestorState);
// the ancestor will now exist.
getDirectory(base);
}

Expand Down