-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19295. S3A: large uploads can timeout over slow links #7089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| package org.apache.hadoop.fs.s3a.impl; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Base64; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
|
|
@@ -60,6 +61,7 @@ | |
|
|
||
| import static org.apache.commons.lang3.StringUtils.isNotEmpty; | ||
| import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; | ||
| import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout; | ||
| import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; | ||
| import static org.apache.hadoop.util.Preconditions.checkArgument; | ||
| import static org.apache.hadoop.util.Preconditions.checkNotNull; | ||
|
|
@@ -128,6 +130,12 @@ public class RequestFactoryImpl implements RequestFactory { | |
| */ | ||
| private final boolean isMultipartUploadEnabled; | ||
|
|
||
| /** | ||
| * Timeout for uploading objects/parts. | ||
| * This will be set on data put/post operations only. | ||
| */ | ||
| private final Duration partUploadTimeout; | ||
|
|
||
| /** | ||
| * Constructor. | ||
| * @param builder builder with all the configuration. | ||
|
|
@@ -142,6 +150,7 @@ protected RequestFactoryImpl( | |
| this.contentEncoding = builder.contentEncoding; | ||
| this.storageClass = builder.storageClass; | ||
| this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; | ||
| this.partUploadTimeout = builder.partUploadTimeout; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -344,6 +353,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, | |
| putObjectRequestBuilder.storageClass(storageClass); | ||
| } | ||
|
|
||
| // Set the timeout for object uploads but not directory markers. | ||
| if (!isDirectoryMarker) { | ||
| setRequestTimeout(putObjectRequestBuilder, partUploadTimeout); | ||
| } | ||
|
|
||
| return prepareRequest(putObjectRequestBuilder); | ||
| } | ||
|
|
||
|
|
@@ -595,6 +609,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( | |
| .partNumber(partNumber) | ||
| .contentLength(size); | ||
| uploadPartEncryptionParameters(builder); | ||
|
|
||
| // Set the request timeout for the part upload | ||
| setRequestTimeout(builder, partUploadTimeout); | ||
| return prepareRequest(builder); | ||
| } | ||
|
|
||
|
|
@@ -702,6 +719,13 @@ public static final class RequestFactoryBuilder { | |
| */ | ||
| private boolean isMultipartUploadEnabled = true; | ||
|
|
||
| /** | ||
| * Timeout for uploading objects/parts. | ||
| * This will be set on data put/post operations only. | ||
| * A zero value means "no custom timeout" | ||
| */ | ||
| private Duration partUploadTimeout = Duration.ZERO; | ||
|
||
|
|
||
| private RequestFactoryBuilder() { | ||
| } | ||
|
|
||
|
|
@@ -799,6 +823,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled( | |
| this.isMultipartUploadEnabled = value; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Timeout for uploading objects/parts. | ||
| * This will be set on data put/post operations only. | ||
| * A zero value means "no custom timeout" | ||
| * @param value new value | ||
| * @return the builder | ||
| */ | ||
| public RequestFactoryBuilder withPartUploadTimeout(final Duration value) { | ||
| partUploadTimeout = value; | ||
| return this; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| import java.io.InputStream; | ||
| import java.io.UncheckedIOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.time.LocalDateTime; | ||
| import java.util.function.Supplier; | ||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -224,6 +225,12 @@ public static abstract class BaseContentProvider<T extends InputStream> | |
| */ | ||
| private T currentStream; | ||
|
|
||
| /** | ||
| * When did this upload start? | ||
| * Use in error messages. | ||
| */ | ||
| private final LocalDateTime startTime; | ||
|
|
||
| /** | ||
| * Constructor. | ||
| * @param size size of the data. Must be non-negative. | ||
|
|
@@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) { | |
| checkArgument(size >= 0, "size is negative: %s", size); | ||
| this.size = size; | ||
| this.isOpen = isOpen; | ||
| this.startTime = LocalDateTime.now(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -275,7 +283,10 @@ public final InputStream newStream() { | |
| checkOpen(); | ||
| streamCreationCount++; | ||
| if (streamCreationCount > 1) { | ||
| LOG.info("Stream created more than once: {}", this); | ||
| LOG.info("Stream recreated: {}", this); | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("Stream creation stack", new Exception("here")); | ||
|
||
| } | ||
| } | ||
| return setCurrentStream(createNewStream()); | ||
| } | ||
|
|
@@ -302,6 +313,14 @@ public int getSize() { | |
| return size; | ||
| } | ||
|
|
||
| /** | ||
| * When did this upload start? | ||
| * @return start time | ||
| */ | ||
| public LocalDateTime getStartTime() { | ||
| return startTime; | ||
| } | ||
|
|
||
| /** | ||
| * Current stream. | ||
| * When {@link #newStream()} is called, this is set to the new value, | ||
|
|
@@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) { | |
| public String toString() { | ||
| return "BaseContentProvider{" + | ||
| "size=" + size + | ||
| ", initiated at " + startTime + | ||
| ", streamCreationCount=" + streamCreationCount + | ||
| ", currentStream=" + currentStream + | ||
| '}'; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this part upload timeout different that multipart upload timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its my new option, same value for simple PUT as multipart; we patch the individual requests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anyway