diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index cc08e0dc0b3..b9f3f22d30c 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -22,4 +22,8 @@ * `aws` : Adds missing sdk error checking when seeking readers ([#379](https://github.com/aws/aws-sdk-go-v2/pull/379)) * Adds support for nonseekable io.Reader. Adds support for streamed payloads for unsigned body request. * Fixes [#371](https://github.com/aws/aws-sdk-go-v2/issues/371) +* `service/s3` : Fixes unexpected EOF error by s3manager ([#386](https://github.com/aws/aws-sdk-go-v2/pull/386)) + * Fixes bug which threw unexpected EOF error when s3 upload is performed for a file of maximum allowed size + * Fixes [#316](https://github.com/aws/aws-sdk-go-v2/issues/316) + diff --git a/service/s3/s3manager/upload.go b/service/s3/s3manager/upload.go index 7300b6ae471..634b2fb6acb 100644 --- a/service/s3/s3manager/upload.go +++ b/service/s3/s3manager/upload.go @@ -144,8 +144,13 @@ type Uploader struct { // MaxUploadParts is the max number of parts which will be uploaded to S3. // Will be used to calculate the partsize of the object to be uploaded. // E.g: 5GB file, with MaxUploadParts set to 100, will upload the file - // as 100, 50MB parts. - // With a limited of s3.MaxUploadParts (10,000 parts). + // as 100, 50MB parts. With a limited of s3.MaxUploadParts (10,000 parts). + // + // MaxUploadParts must not be used to limit the total number of bytes uploaded. + // Use a type like to io.LimitReader (https://golang.org/pkg/io/#LimitedReader) + // instead. An io.LimitReader is helpful when uploading an unbounded reader + // to S3, and you know its maximum size. Otherwise the reader's io.EOF returned + // error must be used to signal end of stream. MaxUploadParts int // The client to use when uploading to S3. @@ -532,21 +537,6 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) { // Read and queue the rest of the parts for u.geterr() == nil && err == nil { - num++ - // This upload exceeded maximum number of supported parts, error now. - if num > int64(u.cfg.MaxUploadParts) || num > int64(MaxUploadParts) { - var msg string - if num > int64(u.cfg.MaxUploadParts) { - msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit", - u.cfg.MaxUploadParts) - } else { - msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit", - MaxUploadParts) - } - u.seterr(awserr.New("TotalPartsExceeded", msg, nil)) - break - } - var reader io.ReadSeeker var nextChunkLen int reader, nextChunkLen, err = u.nextReader() @@ -566,6 +556,21 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) { break } + num++ + // This upload exceeded maximum number of supported parts, error now. + if num > int64(u.cfg.MaxUploadParts) || num > int64(MaxUploadParts) { + var msg string + if num > int64(u.cfg.MaxUploadParts) { + msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit", + u.cfg.MaxUploadParts) + } else { + msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit", + MaxUploadParts) + } + u.seterr(awserr.New("TotalPartsExceeded", msg, nil)) + break + } + ch <- chunk{buf: reader, num: num} } diff --git a/service/s3/s3manager/upload_test.go b/service/s3/s3manager/upload_test.go index f154fe1cb3f..60ff0ac9a2b 100644 --- a/service/s3/s3manager/upload_test.go +++ b/service/s3/s3manager/upload_test.go @@ -24,7 +24,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/s3manager" ) -var emptyList = []string{} +var emptyList []string func val(i interface{}, s string) interface{} { v, err := awsutil.ValuesAtPath(i, s) @@ -1005,3 +1005,39 @@ func TestUploadWithContextCanceled(t *testing.T) { t.Errorf("expected error message to contain %q, but did not %q", e, a) } } + +// S3 Uploader incorrectly fails an upload if the content being uploaded +// has a size of MinPartSize * MaxUploadParts. +func TestUploadMaxPartsEOF(t *testing.T) { + s, ops, _ := loggingSvc(emptyList) + mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) { + u.Concurrency = 1 + u.PartSize = s3manager.DefaultUploadPartSize + u.MaxUploadParts = 2 + }) + f := bytes.NewReader(make([]byte, int(mgr.PartSize)*mgr.MaxUploadParts)) + + r1 := io.NewSectionReader(f, 0, s3manager.DefaultUploadPartSize) + r2 := io.NewSectionReader(f, s3manager.DefaultUploadPartSize, 2*s3manager.DefaultUploadPartSize) + body := io.MultiReader(r1, r2) + + _, err := mgr.Upload(&s3manager.UploadInput{ + Bucket: aws.String("Bucket"), + Key: aws.String("Key"), + Body: body, + }) + + if err != nil { + t.Fatalf("expect no error, got %v", err) + } + + expectOps := []string{ + "CreateMultipartUpload", + "UploadPart", + "UploadPart", + "CompleteMultipartUpload", + } + if e, a := expectOps, *ops; !reflect.DeepEqual(e, a) { + t.Errorf("expect %v ops, got %v", e, a) + } +}