diff --git a/storage/client_test.go b/storage/client_test.go index ae18703bcf12..42924f808bba 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -2042,16 +2042,17 @@ func TestMRDAddAfterCloseEmulated(t *testing.T) { callbackErr = err } reader.Add(buf, 10, 3000, callback) + reader.Wait() if callbackErr == nil { t.Fatalf("Expected error: stream to be closed") } - if got, want := callbackErr, "stream is closed"; !strings.Contains(got.Error(), want) { + if got, want := callbackErr, "downloader closed"; !strings.Contains(got.Error(), want) { t.Errorf("err: got %q, want err to contain %q", got.Error(), want) } }) } -func TestMRDAddSanityCheck(t *testing.T) { +func TestMRDAddSanityCheckEmulated(t *testing.T) { transportClientTest(skipHTTP("mrd is implemented for grpc client"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) { setBidiReads(t, client) content := make([]byte, 5000) @@ -2101,14 +2102,15 @@ func TestMRDAddSanityCheck(t *testing.T) { reader.Add(buf, 10000, 3000, callback1) // Request fails as limit is negative. 
reader.Add(buf, 10, -1, callback2) - if got, want := err1, fmt.Errorf("offset larger than size of object"); got.Error() != want.Error() { - t.Errorf("err: got %v, want %v", got.Error(), want.Error()) + reader.Wait() + if status.Code(err1) != codes.OutOfRange { + t.Errorf("err1: got %v, want OutOfRange", err1) } - if got, want := err2, fmt.Errorf("limit can't be negative"); got.Error() != want.Error() { - t.Errorf("err: got %v, want %v", got.Error(), want.Error()) + if got, want := err2.Error(), "limit cannot be negative"; !strings.Contains(got, want) { + t.Errorf("err2: got %v, want to contain %v", got, want) } - if err = reader.Close(); err != nil { - t.Errorf("Error while closing reader %v", err) + if err = reader.Close(); status.Code(err) != codes.OutOfRange { + t.Errorf("Unexpected err while closing reader %v", err) } }) } @@ -3098,7 +3100,6 @@ func (bp *testBufferPool) getAllocsAndFrees() (int64, int64) { // Test that successful downloads using Reader and MultiRangeDownloader free // all of their allocated buffers. 
func TestReadCodecLeaksEmulated(t *testing.T) { - t.Skip("flaky https://github.com/googleapis/google-cloud-go/issues/13321") checkEmulatorEnvironment(t) ctx := context.Background() var bp testBufferPool diff --git a/storage/emulator_test.sh b/storage/emulator_test.sh index 4d8da2eeb37d..9db695b0caf9 100755 --- a/storage/emulator_test.sh +++ b/storage/emulator_test.sh @@ -31,6 +31,7 @@ if [ "$minor_ver" -lt "$min_minor_ver" ]; then exit 0 fi +export DOCKER_API_VERSION=1.39 export STORAGE_EMULATOR_HOST="http://localhost:9000" export STORAGE_EMULATOR_HOST_GRPC="localhost:8888" @@ -66,6 +67,7 @@ function cleanup() { docker stop $CONTAINER_NAME unset STORAGE_EMULATOR_HOST; unset STORAGE_EMULATOR_HOST_GRPC; + unset DOCKER_API_VERSION } trap cleanup EXIT diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 7dc3d0a189c8..dfe38703757f 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -24,7 +24,6 @@ import ( "log" "os" "strconv" - "sync" "cloud.google.com/go/iam/apiv1/iampb" gapic "cloud.google.com/go/storage/internal/apiv2" @@ -1093,550 +1092,6 @@ func contextMetadataFromBidiReadObject(req *storagepb.BidiReadObjectRequest) []s return []string{"x-goog-request-params", fmt.Sprintf("bucket=%s", req.GetReadObjectSpec().GetBucket())} } -func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (mr *MultiRangeDownloader, err error) { - if !c.config.grpcBidiReads { - return nil, errors.New("storage: MultiRangeDownloader requires the experimental.WithGRPCBidiReads option") - } - - ctx, _ = startSpan(ctx, "grpcStorageClient.NewMultiRangeDownloader") - defer func() { endSpan(ctx, err) }() - s := callSettings(c.settings, opts...) - // Force the use of the custom codec to enable zero-copy reads. 
- s.gax = append(s.gax, gax.WithGRPCOptions( - grpc.ForceCodecV2(bytesCodecV2{}), - )) - - if s.userProject != "" { - ctx = setUserProjectMetadata(ctx, s.userProject) - } - - b := bucketResourceName(globalProjectAlias, params.bucket) - object := params.object - bidiObject := &storagepb.BidiReadObjectSpec{ - Bucket: b, - Object: object, - CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey), - } - - // The default is a negative value, which means latest. - if params.gen >= 0 { - bidiObject.Generation = params.gen - } - - if params.handle != nil && len(*params.handle) != 0 { - bidiObject.ReadHandle = &storagepb.BidiReadHandle{ - Handle: *params.handle, - } - } - req := &storagepb.BidiReadObjectRequest{ - ReadObjectSpec: bidiObject, - } - - openStream := func(readHandle ReadHandle) (*bidiReadStreamResponse, context.CancelFunc, error) { - if err := applyCondsProto("grpcStorageClient.BidiReadObject", params.gen, params.conds, bidiObject); err != nil { - return nil, nil, err - } - if len(readHandle) != 0 { - req.GetReadObjectSpec().ReadHandle = &storagepb.BidiReadHandle{ - Handle: readHandle, - } - } - databufs := mem.BufferSlice{} - - var stream storagepb.Storage_BidiReadObjectClient - var decoder *readResponseDecoder - cc, cancel := context.WithCancel(ctx) - err = run(cc, func(ctx context.Context) error { - openAndSendReq := func() error { - mdCtx := gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...) - - stream, err = c.raw.BidiReadObject(mdCtx, s.gax...) - if err != nil { - return err - } - // If stream opened succesfully, send first message on the stream. - // First message to stream should contain read_object_spec - err = stream.Send(req) - if err != nil { - return err - } - // Use RecvMsg to get the raw buffer slice instead of Recv(). 
- err = stream.RecvMsg(&databufs) - if err != nil { - return err - } - return nil - } - - err := openAndSendReq() - - // We might get a redirect error here for an out-of-region request. - // Add the routing token and read handle to the request and do one - // retry. - if st, ok := status.FromError(err); ok && st.Code() == codes.Aborted { - // BidiReadObjectRedirectedError error is only returned on initial open in case of a redirect. - // The routing token that should be used when reopening the read stream. Needs to be exported. - for _, detail := range st.Details() { - if bidiError, ok := detail.(*storagepb.BidiReadObjectRedirectedError); ok { - bidiObject.ReadHandle = bidiError.ReadHandle - bidiObject.RoutingToken = bidiError.RoutingToken - databufs = mem.BufferSlice{} - err = openAndSendReq() - break - } - } - } - if err != nil { - databufs.Free() - return err - } - - // Use the custom decoder to parse the raw buffer without copying object data. - decoder = &readResponseDecoder{ - databufs: databufs, - } - err = decoder.readFullObjectResponse() - return err - }, s.retry, s.idempotent) - if err != nil { - // Close the stream context we just created to ensure we don't leak - // resources. - cancel() - return nil, nil, err - } - return &bidiReadStreamResponse{stream: stream, decoder: decoder}, cancel, nil - } - - // For the first time open stream without adding any range. - resp, cancel, err := openStream(nil) - if err != nil { - return nil, err - } - - // The first message was Recv'd on stream open, use it to populate the - // object metadata. 
- msg := resp.decoder.msg - obj := msg.GetMetadata() - - mrd := &gRPCBidiReader{ - stream: resp.stream, - cancel: cancel, - settings: s, - readHandle: msg.GetReadHandle().GetHandle(), - readIDGenerator: &readIDGenerator{}, - reopen: openStream, - readSpec: bidiObject, - rangesToRead: make(chan []mrdRange, 100), - ctx: ctx, - closeReceiver: make(chan bool, 10), - closeSender: make(chan bool, 10), - senderRetry: make(chan bool), // create unbuffered channel for closing the streamManager goroutine. - receiverRetry: make(chan bool), // create unbuffered channel for closing the streamReceiver goroutine. - activeRanges: make(map[int64]mrdRange), - done: false, - numActiveRanges: 0, - streamRecreation: false, - } - - // sender receives ranges from user adds and requests these ranges from GCS. - sender := func() { - var currentSpec []mrdRange - for { - select { - case <-mrd.ctx.Done(): - mrd.mu.Lock() - mrd.done = true - mrd.mu.Unlock() - return - case <-mrd.senderRetry: - return - case <-mrd.closeSender: - mrd.mu.Lock() - if len(mrd.activeRanges) != 0 { - for key := range mrd.activeRanges { - mrd.activeRanges[key].callback(mrd.activeRanges[key].offset, mrd.activeRanges[key].totalBytesWritten, fmt.Errorf("stream closed early")) - delete(mrd.activeRanges, key) - } - } - mrd.numActiveRanges = 0 - mrd.mu.Unlock() - return - case currentSpec = <-mrd.rangesToRead: - var readRanges []*storagepb.ReadRange - var err error - mrd.mu.Lock() - for _, v := range currentSpec { - mrd.activeRanges[v.readID] = v - readRanges = append(readRanges, &storagepb.ReadRange{ReadOffset: v.offset, ReadLength: v.limit, ReadId: v.readID}) - } - mrd.mu.Unlock() - // We can just send 100 request to gcs in one request. - // In case of Add we will send only one range request to gcs but in case of retry we can have more than 100 ranges. - // Hence be will divide the request in chunk of 100. 
- // For example with 457 ranges on stream we will have 5 request to gcs [0:99], [100:199], [200:299], [300:399], [400:456] - requestCount := len(readRanges) / 100 - if len(readRanges)%100 != 0 { - requestCount++ - } - for i := 0; i < requestCount; i++ { - start := i * 100 - end := (i + 1) * 100 - if end > len(readRanges) { - end = len(readRanges) - } - curReq := readRanges[start:end] - err = mrd.stream.Send(&storagepb.BidiReadObjectRequest{ - ReadRanges: curReq, - }) - if err != nil { - // cancel stream and reopen the stream again. - // Incase again an error is thrown close the streamManager goroutine. - mrd.retrier(err, "manager") - break - } - } - - } - } - } - - // receives ranges responses on the stream and executes the callback. - receiver := func() { - var err error - for { - select { - case <-mrd.ctx.Done(): - mrd.done = true - return - case <-mrd.receiverRetry: - return - case <-mrd.closeReceiver: - return - default: - // This function reads the data sent for a particular range request and has a callback - // to indicate that output buffer is filled. - databufs := mem.BufferSlice{} - err = mrd.stream.RecvMsg(&databufs) - if err == io.EOF { - err = nil - } else { - // Cancel stream and reopen the stream again. - // In case again an error is thrown, close the streamManager goroutine. - // TODO: special handling for not found error. - mrd.retrier(err, "receiver") - } - - if err == nil { - // Use the custom decoder to parse the message. 
- decoder := &readResponseDecoder{databufs: databufs} - if err := decoder.readFullObjectResponse(); err != nil { - mrd.retrier(err, "receiver") - continue // Move to next iteration after retry - } - msg := decoder.msg - - if msg.GetReadHandle().GetHandle() != nil { - mrd.readHandle = msg.GetReadHandle().GetHandle() - } - - mrd.mu.Lock() - if len(mrd.activeRanges) == 0 && mrd.numActiveRanges == 0 { - mrd.mu.Unlock() - mrd.closeReceiver <- true - mrd.closeSender <- true - return - } - mrd.mu.Unlock() - for _, val := range msg.GetObjectDataRanges() { - id := val.GetReadRange().GetReadId() - func() { - mrd.mu.Lock() - defer mrd.mu.Unlock() - currRange, ok := mrd.activeRanges[id] - if !ok { - // it's ok to ignore responses for read_id not in map as user would have been notified by callback. - return - } - - // The decoder holds the object content. writeToAndUpdateCRC writes - // it to the user's buffer without an intermediate copy. - written, _, err := decoder.writeToAndUpdateCRC(currRange.writer, id, func(b []byte) { - // crc update logic can be added here if needed - }) - - if err != nil { - currRange.callback(currRange.offset, currRange.totalBytesWritten, err) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } else { - currRange = mrdRange{ - readID: currRange.readID, - writer: currRange.writer, - offset: currRange.offset, - limit: currRange.limit, - currentBytesWritten: currRange.currentBytesWritten + written, - totalBytesWritten: currRange.totalBytesWritten + written, - callback: currRange.callback, - } - mrd.activeRanges[id] = currRange - } - if val.GetRangeEnd() { - currRange.callback(currRange.offset, currRange.totalBytesWritten, nil) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } - }() - } - // Free the buffers once the message has been processed. 
- decoder.databufs.Free() - } - } - } - } - - mrd.retrier = func(err error, thread string) { - mrd.mu.Lock() - if !mrd.streamRecreation { - mrd.streamRecreation = true - } else { - mrd.mu.Unlock() - return - } - mrd.mu.Unlock() - // close both the go routines to make the stream recreation syncronous. - if thread == "receiver" { - mrd.senderRetry <- true - } else { - mrd.receiverRetry <- true - } - err = mrd.retryStream(err) - if err != nil { - mrd.mu.Lock() - for key := range mrd.activeRanges { - mrd.activeRanges[key].callback(mrd.activeRanges[key].offset, mrd.activeRanges[key].totalBytesWritten, err) - delete(mrd.activeRanges, key) - } - // In case we hit an permanent error, delete entries from map and remove active tasks. - mrd.numActiveRanges = 0 - mrd.mu.Unlock() - mrd.close() - } else { - // If stream recreation happened successfully lets again start - // both the goroutine making the whole flow asynchronous again. - if thread == "receiver" { - go sender() - } else { - go receiver() - } - } - mrd.mu.Lock() - mrd.streamRecreation = false - mrd.mu.Unlock() - } - - go sender() - go receiver() - - return &MultiRangeDownloader{ - Attrs: ReaderObjectAttrs{ - Size: obj.GetSize(), // this is the size of the entire object, even if only a range was requested. 
- ContentType: obj.GetContentType(), - ContentEncoding: obj.GetContentEncoding(), - CacheControl: obj.GetCacheControl(), - LastModified: obj.GetUpdateTime().AsTime(), - Metageneration: obj.GetMetageneration(), - Generation: obj.GetGeneration(), - }, - reader: mrd, - }, nil -} - -type gRPCBidiReader struct { - ctx context.Context - stream storagepb.Storage_BidiReadObjectClient - cancel context.CancelFunc - settings *settings - readHandle ReadHandle - readIDGenerator *readIDGenerator - reopen func(ReadHandle) (*bidiReadStreamResponse, context.CancelFunc, error) - readSpec *storagepb.BidiReadObjectSpec - closeReceiver chan bool - closeSender chan bool - senderRetry chan bool - receiverRetry chan bool - // rangesToRead are ranges that have not yet been sent or have been sent but - // must be retried. - rangesToRead chan []mrdRange - // activeRanges are ranges that are currently being sent or are waiting for - // a response from GCS. - activeRanges map[int64]mrdRange // always use the mutex when accessing the map - numActiveRanges int64 // always use the mutex when accessing this variable - done bool // always use the mutex when accessing this variable, indicates whether stream is closed or not. - mu sync.Mutex // protects all vars in gRPCBidiReader from concurrent access - retrier func(error, string) - streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one. 
-} - -func (mrd *gRPCBidiReader) activeRange() []mrdRange { - mrd.mu.Lock() - defer mrd.mu.Unlock() - var activeRange []mrdRange - for k, v := range mrd.activeRanges { - activeRange = append(activeRange, mrdRange{ - readID: k, - writer: v.writer, - offset: (v.offset + v.currentBytesWritten), - limit: v.limit - v.currentBytesWritten, - callback: v.callback, - currentBytesWritten: 0, - totalBytesWritten: v.totalBytesWritten, - }) - mrd.activeRanges[k] = activeRange[len(activeRange)-1] - } - return activeRange -} - -// retryStream cancel's stream and reopen the stream again. -func (mrd *gRPCBidiReader) retryStream(err error) error { - if mrd.settings.retry.runShouldRetry(err) { - // This will "close" the existing stream and immediately attempt to - // reopen the stream, but will backoff if further attempts are necessary. - // When Reopening the stream only failed readID will be added to stream. - return mrd.reopenStream(mrd.activeRange()) - } - return err -} - -// reopenStream "closes" the existing stream and attempts to reopen a stream and -// sets the Reader's stream and cancelStream properties in the process. -func (mrd *gRPCBidiReader) reopenStream(failSpec []mrdRange) error { - // Close existing stream and initialize new stream with updated offset. - if mrd.cancel != nil { - mrd.cancel() - } - - res, cancel, err := mrd.reopen(mrd.readHandle) - if err != nil { - return err - } - mrd.stream = res.stream - mrd.cancel = cancel - msg := res.decoder.msg - if msg.GetReadHandle().GetHandle() != nil { - mrd.readHandle = msg.GetReadHandle().GetHandle() - } - - // Process any data ranges that came back in the initial response. - // This prevents data loss from the first message on the new stream. - for _, val := range msg.GetObjectDataRanges() { - id := val.GetReadRange().GetReadId() - mrd.mu.Lock() - activeRange, ok := mrd.activeRanges[id] - if !ok { - mrd.mu.Unlock() - continue - } - - // Use the decoder's zero-copy write method. 
- written, _, writeErr := res.decoder.writeToAndUpdateCRC(activeRange.writer, id, nil) - if writeErr != nil { - activeRange.callback(activeRange.offset, activeRange.totalBytesWritten, writeErr) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } else { - activeRange.currentBytesWritten += written - activeRange.totalBytesWritten += written - mrd.activeRanges[id] = activeRange - } - - if val.GetRangeEnd() { - activeRange.callback(activeRange.offset, activeRange.totalBytesWritten, nil) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } - mrd.mu.Unlock() - } - // Once all data in the initial response has been read out, free buffers. - res.decoder.databufs.Free() - if failSpec != nil { - mrd.rangesToRead <- failSpec - } - return nil -} - -// add will add current range to stream. The size of the range is not validated -// by add; if the client requests more bytes than are available in the object -// the server will return an error. -func (mrd *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) { - if limit < 0 { - callback(offset, 0, errors.New("storage: cannot add range because the limit cannot be negative")) - return - } - - id := mrd.readIDGenerator.Next() - if !mrd.done { - spec := mrdRange{readID: id, writer: output, offset: offset, limit: limit, currentBytesWritten: 0, totalBytesWritten: 0, callback: callback} - mrd.mu.Lock() - mrd.numActiveRanges++ - mrd.mu.Unlock() - mrd.rangesToRead <- []mrdRange{spec} - } else { - callback(offset, 0, errors.New("storage: cannot add range because the stream is closed")) - } -} - -func (mrd *gRPCBidiReader) wait() { - mrd.mu.Lock() - // we should wait until there is active task or an entry in the map. - // there can be a scenario we have nothing in map for a moment or too but still have active task. - // hence in case we have permanent errors we reduce active task to 0 so that this does not block wait. 
- keepWaiting := len(mrd.activeRanges) != 0 || mrd.numActiveRanges != 0 - mrd.mu.Unlock() - - for keepWaiting { - mrd.mu.Lock() - keepWaiting = len(mrd.activeRanges) != 0 || mrd.numActiveRanges != 0 - mrd.mu.Unlock() - } -} - -// Close will notify stream manager goroutine that the reader has been closed, if it's still running. -func (mrd *gRPCBidiReader) close() error { - if mrd.cancel != nil { - mrd.cancel() - } - mrd.mu.Lock() - mrd.done = true - mrd.numActiveRanges = 0 - mrd.mu.Unlock() - mrd.closeReceiver <- true - mrd.closeSender <- true - return nil -} - -func (mrd *gRPCBidiReader) getHandle() []byte { - return mrd.readHandle -} - -func (mrd *gRPCBidiReader) error() error { - mrd.mu.Lock() - defer mrd.mu.Unlock() - if mrd.done { - return errors.New("storage: stream is permanently closed") - } - return nil -} - -type mrdRange struct { - readID int64 - writer io.Writer - offset int64 - limit int64 - currentBytesWritten int64 - totalBytesWritten int64 - callback func(int64, int64, error) -} - func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { // If bidi reads was not selected, use the legacy read object API. if !c.config.grpcBidiReads { diff --git a/storage/grpc_reader_multi_range.go b/storage/grpc_reader_multi_range.go index 14b78e1a1bd3..464018ef8d9a 100644 --- a/storage/grpc_reader_multi_range.go +++ b/storage/grpc_reader_multi_range.go @@ -14,27 +14,918 @@ package storage -import "sync" +import ( + "context" + "errors" + "fmt" + "io" + "sync" -// readIDGenerator generates unique read IDs for multi-range reads. -// Call readIDGenerator.Next to get the next ID. Safe to be called concurrently. 
-type readIDGenerator struct { - initOnce sync.Once - nextID chan int64 // do not use this field directly + "cloud.google.com/go/storage/internal/apiv2/storagepb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/mem" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + + gax "github.com/googleapis/gax-go/v2" +) + +const ( + mrdCommandChannelSize = 1 + mrdResponseChannelSize = 100 +) + +// --- internalMultiRangeDownloader Interface --- +// This provides an internal wrapper for the gRPC methods to avoid polluting +// reader.go with gRPC implementation details. The only implementation +// currently is for the gRPC transport with bidi APIs enabled. Creating +// a MultiRangeDownloader with any other client type will fail. +type internalMultiRangeDownloader interface { + add(output io.Writer, offset, length int64, callback func(int64, int64, error)) + close(err error) error + wait() + getHandle() []byte + getPermanentError() error + getSpanCtx() context.Context +} + +// --- grpcStorageClient method --- +// Top level entry point into the MultiRangeDownloader via the storageClient interface. +func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (*MultiRangeDownloader, error) { + if !c.config.grpcBidiReads { + return nil, errors.New("storage: MultiRangeDownloader requires the experimental.WithGRPCBidiReads option") + } + s := callSettings(c.settings, opts...) 
+ if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + + b := bucketResourceName(globalProjectAlias, params.bucket) + readSpec := &storagepb.BidiReadObjectSpec{ + Bucket: b, + Object: params.object, + CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey), + } + if params.gen >= 0 { + readSpec.Generation = params.gen + } + if params.handle != nil && len(*params.handle) > 0 { + readSpec.ReadHandle = &storagepb.BidiReadHandle{ + Handle: *params.handle, + } + } + + mCtx, cancel := context.WithCancel(ctx) + + // Create the manager + manager := &multiRangeDownloaderManager{ + ctx: mCtx, + cancel: cancel, + client: c, + settings: s, + params: params, + cmds: make(chan mrdCommand, mrdCommandChannelSize), + sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize), + pendingRanges: make(map[int64]*rangeRequest), + readIDCounter: 1, + readSpec: readSpec, + attrsReady: make(chan struct{}), + spanCtx: ctx, + } + + mrd := &MultiRangeDownloader{ + impl: manager, + } + + manager.wg.Add(1) + go func() { + defer manager.wg.Done() + manager.eventLoop() + }() + + // Wait for attributes to be ready + select { + case <-manager.attrsReady: + if manager.permanentErr != nil { + cancel() + manager.wg.Wait() + return nil, manager.permanentErr + } + mrd.Attrs = manager.attrs + return mrd, nil + case <-ctx.Done(): + cancel() + manager.wg.Wait() + return nil, ctx.Err() + } +} + +// --- mrdCommand Interface and Implementations --- +// Used to pass commands from the user-facing code to the MRD manager. +// mrdCommand handlers are applied sequentially in the event loop. Therefore, it's okay +// for them to read/modify the manager state without concern for thread safety. 
+type mrdCommand interface { + apply(ctx context.Context, m *multiRangeDownloaderManager) +} +type mrdAddCmd struct { + output io.Writer + offset int64 + length int64 + callback func(int64, int64, error) +} + +func (c *mrdAddCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleAddCmd(ctx, c) +} + +type mrdCloseCmd struct { + err error +} + +func (c *mrdCloseCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleCloseCmd(ctx, c) +} + +type mrdWaitCmd struct { + doneC chan struct{} +} + +func (c *mrdWaitCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleWaitCmd(ctx, c) +} + +type mrdGetHandleCmd struct { + respC chan []byte +} + +func (c *mrdGetHandleCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + select { + case <-m.attrsReady: + select { + case c.respC <- m.lastReadHandle: + case <-m.ctx.Done(): + close(c.respC) + } + case <-m.ctx.Done(): + close(c.respC) + } +} + +type mrdErrorCmd struct { + respC chan error +} + +func (c *mrdErrorCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + select { + case c.respC <- m.permanentErr: + case <-ctx.Done(): + close(c.respC) + } +} + +// --- mrdSessionResult --- +// This is used to pass the zero-copy decoded response from the recv stream +// back up to the multiRangeDownloaderManager for processing, or to pass +// an error (with optional redirect details) if the session failed. +type mrdSessionResult struct { + decoder *readResponseDecoder + err error + redirect *storagepb.BidiReadObjectRedirectedError +} + +var errClosed = errors.New("downloader closed") + +// --- multiRangeDownloaderManager --- +// Manages main event loop for MRD commands and processing responses. +// Spawns bidiReadStreamSession to deal with actual stream management, retries, etc. 
+type multiRangeDownloaderManager struct { + ctx context.Context + cancel context.CancelFunc + client *grpcStorageClient + settings *settings + params *newMultiRangeDownloaderParams + wg sync.WaitGroup // syncs completion of event loop. + cmds chan mrdCommand + sessionResps chan mrdSessionResult + + // State + currentSession *bidiReadStreamSession + readIDCounter int64 + pendingRanges map[int64]*rangeRequest + permanentErr error + waiters []chan struct{} + readSpec *storagepb.BidiReadObjectSpec + lastReadHandle []byte + attrs ReaderObjectAttrs + attrsReady chan struct{} + attrsOnce sync.Once + spanCtx context.Context + callbackWg sync.WaitGroup +} + +type rangeRequest struct { + output io.Writer + offset int64 + length int64 + callback func(int64, int64, error) + + origOffset int64 + origLength int64 + + readID int64 + bytesWritten int64 + completed bool +} + +// Methods implementing internalMultiRangeDownloader +func (m *multiRangeDownloaderManager) add(output io.Writer, offset, length int64, callback func(int64, int64, error)) { + if err := m.ctx.Err(); err != nil { + if m.permanentErr != nil { + err = m.permanentErr + } + m.runCallback(offset, length, err, callback) + return + } + if length < 0 { + m.runCallback(offset, length, fmt.Errorf("storage: MultiRangeDownloader.Add limit cannot be negative"), callback) + return + } + + cmd := &mrdAddCmd{output: output, offset: offset, length: length, callback: callback} + select { + case m.cmds <- cmd: + case <-m.ctx.Done(): + err := m.ctx.Err() + if m.permanentErr != nil { + err = m.permanentErr + } + m.runCallback(offset, length, err, callback) + } +} + +func (m *multiRangeDownloaderManager) close(err error) error { + cmd := &mrdCloseCmd{err: err} + select { + case m.cmds <- cmd: + <-m.ctx.Done() + m.wg.Wait() + if m.permanentErr != nil && !errors.Is(m.permanentErr, errClosed) { + return m.permanentErr + } + return nil + case <-m.ctx.Done(): + m.wg.Wait() + return m.ctx.Err() + } +} + +func (m 
*multiRangeDownloaderManager) wait() { + doneC := make(chan struct{}) + cmd := &mrdWaitCmd{doneC: doneC} + select { + case m.cmds <- cmd: + select { + case <-doneC: + m.callbackWg.Wait() + return + case <-m.ctx.Done(): + m.callbackWg.Wait() + return + } + case <-m.ctx.Done(): + m.callbackWg.Wait() + return + } +} + +func (m *multiRangeDownloaderManager) getHandle() []byte { + select { + case <-m.attrsReady: + case <-m.ctx.Done(): + return nil + } + + respC := make(chan []byte, 1) + cmd := &mrdGetHandleCmd{respC: respC} + select { + case m.cmds <- cmd: + select { + case h, ok := <-respC: + if !ok { + return nil + } + return h + case <-m.ctx.Done(): + return nil + } + case <-m.ctx.Done(): + return nil + } +} + +func (m *multiRangeDownloaderManager) getPermanentError() error { + return m.permanentErr +} + +func (m *multiRangeDownloaderManager) getSpanCtx() context.Context { + return m.spanCtx +} + +func (m *multiRangeDownloaderManager) runCallback(origOffset, numBytes int64, err error, cb func(int64, int64, error)) { + m.callbackWg.Add(1) + go func() { + defer m.callbackWg.Done() + cb(origOffset, numBytes, err) + }() +} + +func (m *multiRangeDownloaderManager) eventLoop() { + defer func() { + if m.currentSession != nil { + m.currentSession.Shutdown() + } + finalErr := m.permanentErr + if finalErr == nil { + if ctxErr := m.ctx.Err(); ctxErr != nil { + finalErr = ctxErr + } + } + if finalErr == nil { + finalErr = errClosed + } + m.failAllPending(finalErr) + for _, waiter := range m.waiters { + close(waiter) + } + m.attrsOnce.Do(func() { close(m.attrsReady) }) + m.callbackWg.Wait() + }() + + // Blocking call to establish the first session and get attributes. + if err := m.establishInitialSession(); err != nil { + // permanentErr is set within establishInitialSession if necessary. + return // Exit eventLoop if we can't start. 
+ } + + for { + select { + case <-m.ctx.Done(): + return + case cmd := <-m.cmds: + cmd.apply(m.ctx, m) + if _, ok := cmd.(*mrdCloseCmd); ok { + return + } + case result := <-m.sessionResps: + m.processSessionResult(result) + } + + if len(m.pendingRanges) == 0 { + for _, waiter := range m.waiters { + close(waiter) + } + m.waiters = nil + } + } +} + +func (m *multiRangeDownloaderManager) establishInitialSession() error { + retry := m.settings.retry + if retry == nil { + retry = defaultRetry + } + + var firstResult mrdSessionResult + + openStreamAndReceiveFirst := func(ctx context.Context, spec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, mrdSessionResult) { + session, err := newBidiReadStreamSession(m.ctx, m.sessionResps, m.client, m.settings, m.params, spec) + if err != nil { + return nil, mrdSessionResult{err: err} + } + + select { + case result := <-m.sessionResps: + return session, result + case <-ctx.Done(): + session.Shutdown() + return nil, mrdSessionResult{err: ctx.Err()} + } + } + + err := run(m.ctx, func(ctx context.Context) error { + if m.currentSession != nil { + m.currentSession.Shutdown() + m.currentSession = nil + } + + currentSpec := proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) + session, result := openStreamAndReceiveFirst(ctx, currentSpec) + + if result.err != nil { + if result.redirect != nil { + m.readSpec.RoutingToken = result.redirect.RoutingToken + m.readSpec.ReadHandle = result.redirect.ReadHandle + if session != nil { + session.Shutdown() + } + + // We might get a redirect error here for an out-of-region request. + // Add the routing token and read handle to the request and do one + // retry. 
+ currentSpec = proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) + session, result = openStreamAndReceiveFirst(ctx, currentSpec) + + if result.err != nil { + if session != nil { + session.Shutdown() + } + return result.err + } + } else { + // Not a redirect error, return to run() + if session != nil { + session.Shutdown() + } + return result.err + } + } + + // Success + m.currentSession = session + firstResult = result + return nil + }, retry, true) + + if err != nil { + m.setPermanentError(err) + return m.permanentErr + } + + // Process the successful first result + m.processSessionResult(firstResult) + if m.permanentErr != nil { + return m.permanentErr + } + return nil +} + +func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrdAddCmd) { + if m.permanentErr != nil { + m.runCallback(cmd.offset, cmd.length, m.permanentErr, cmd.callback) + return + } + + req := &rangeRequest{ + output: cmd.output, + offset: cmd.offset, + length: cmd.length, + origOffset: cmd.offset, + origLength: cmd.length, + callback: cmd.callback, + readID: m.readIDCounter, + } + m.readIDCounter++ + + // Attributes should be ready if we are processing Add commands + if req.offset < 0 { + err := m.convertToPositiveOffset(req) + if err != nil { + return + } + } + + if m.currentSession == nil { + // This should not happen if establishInitialSession was successful + m.failRange(req, errors.New("storage: session not available")) + return + } + + m.pendingRanges[req.readID] = req + + protoReq := &storagepb.BidiReadObjectRequest{ + ReadRanges: []*storagepb.ReadRange{{ + ReadOffset: req.offset, + ReadLength: req.length, + ReadId: req.readID, + }}, + } + m.currentSession.SendRequest(protoReq) +} + +func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest) error { + if req.offset >= 0 { + return nil + } + objSize := m.attrs.Size + if objSize <= 0 { + err := errors.New("storage: cannot resolve negative offset without object size") + m.failRange(req, 
err) + return err + } + if req.length != 0 { + err := fmt.Errorf("storage: negative offset with non-zero length is not supported (offset: %d, length: %d)", req.origOffset, req.origLength) + m.failRange(req, err) + return err + } + start := objSize + req.offset + if start < 0 { + start = 0 + } + req.offset = start + req.length = objSize - start + return nil +} + +func (m *multiRangeDownloaderManager) handleCloseCmd(ctx context.Context, cmd *mrdCloseCmd) { + var err error + if cmd.err != nil { + err = cmd.err + } else { + err = errClosed + + } + m.setPermanentError(err) + m.cancel() +} + +func (m *multiRangeDownloaderManager) handleWaitCmd(ctx context.Context, cmd *mrdWaitCmd) { + if len(m.pendingRanges) == 0 { + close(cmd.doneC) + } else { + m.waiters = append(m.waiters, cmd.doneC) + } +} + +func (m *multiRangeDownloaderManager) processSessionResult(result mrdSessionResult) { + if result.err != nil { + m.handleStreamEnd(result) + return + } + + resp := result.decoder.msg + if handle := resp.GetReadHandle().GetHandle(); len(handle) > 0 { + m.lastReadHandle = handle + if m.params.handle != nil { + *m.params.handle = handle + } + } + + m.attrsOnce.Do(func() { + if meta := resp.GetMetadata(); meta != nil { + obj := newObjectFromProto(meta) + attrs := readerAttrsFromObject(obj) + m.attrs = attrs + close(m.attrsReady) + + for _, req := range m.pendingRanges { + if req.offset < 0 { + _ = m.convertToPositiveOffset(req) + } + } + } else { + m.handleStreamEnd(mrdSessionResult{err: errors.New("storage: first response from BidiReadObject stream missing metadata")}) + } + }) + + for _, dataRange := range resp.GetObjectDataRanges() { + readID := dataRange.GetReadRange().GetReadId() + req, exists := m.pendingRanges[readID] + if !exists || req.completed { + continue + } + written, _, err := result.decoder.writeToAndUpdateCRC(req.output, readID, nil) + req.bytesWritten += written + if err != nil { + m.failRange(req, err) + continue + } + + if dataRange.GetRangeEnd() { + 
req.completed = true + delete(m.pendingRanges, req.readID) + m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback) + } + } + // Once all data in the initial response has been read out, free buffers. + result.decoder.databufs.Free() +} + +// ensureSession is now only for reconnecting *after* the initial session is up. +func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context) error { + if m.currentSession != nil { + return nil + } + if m.permanentErr != nil { + return m.permanentErr + } + + // Using run for retries + return run(ctx, func(ctx context.Context) error { + if m.currentSession != nil { + return nil + } + if m.permanentErr != nil { + return m.permanentErr + } + + session, err := newBidiReadStreamSession(m.ctx, m.sessionResps, m.client, m.settings, m.params, proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec)) + if err != nil { + redirectErr, isRedirect := isRedirectError(err) + if isRedirect { + m.readSpec.RoutingToken = redirectErr.RoutingToken + m.readSpec.ReadHandle = redirectErr.ReadHandle + return fmt.Errorf("%w: %v", errBidiReadRedirect, err) + } + return err + } + m.currentSession = session + + var rangesToResend []*storagepb.ReadRange + for _, req := range m.pendingRanges { + if !req.completed { + readLength := req.length + if req.length > 0 { + readLength -= req.bytesWritten + } + if readLength < 0 { + readLength = 0 + } + + if req.length == 0 || readLength > 0 { + rangesToResend = append(rangesToResend, &storagepb.ReadRange{ + ReadOffset: req.offset + req.bytesWritten, + ReadLength: readLength, + ReadId: req.readID, + }) + } + } + } + if len(rangesToResend) > 0 { + m.currentSession.SendRequest(&storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend}) + } + return nil + }, m.settings.retry, true) +} + +var errBidiReadRedirect = errors.New("bidi read object redirected") + +func (m *multiRangeDownloaderManager) handleStreamEnd(result mrdSessionResult) { + if m.currentSession != nil { + m.currentSession.Shutdown() 
+ m.currentSession = nil + } + err := result.err + var ensureErr error + + if result.redirect != nil { + m.readSpec.RoutingToken = result.redirect.RoutingToken + m.readSpec.ReadHandle = result.redirect.ReadHandle + ensureErr = m.ensureSession(m.ctx) + } else if m.isRetryable(err) { + ensureErr = m.ensureSession(m.ctx) + } else { + if !errors.Is(err, context.Canceled) && !errors.Is(err, errClosed) { + m.setPermanentError(err) + } else if m.permanentErr == nil { + m.setPermanentError(errClosed) + } + m.failAllPending(m.permanentErr) + } + + // Handle error from ensureSession. + if ensureErr != nil && !m.isRetryable(ensureErr) { + m.setPermanentError(ensureErr) + m.failAllPending(m.permanentErr) + } +} + +func (m *multiRangeDownloaderManager) isRetryable(err error) bool { + if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, errClosed) || err == io.EOF { + return false + } + if errors.Is(err, errBidiReadRedirect) { + return true + } + s, ok := status.FromError(err) + if !ok { + return false + } + switch s.Code() { + case codes.Unavailable, codes.ResourceExhausted, codes.Internal, codes.DeadlineExceeded: + return true + case codes.Aborted: + _, isRedirect := isRedirectError(err) + return isRedirect + default: + return false + } +} + +func (m *multiRangeDownloaderManager) failRange(req *rangeRequest, err error) { + if req.completed { + return + } + req.completed = true + delete(m.pendingRanges, req.readID) + m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) +} + +func (m *multiRangeDownloaderManager) failAllPending(err error) { + for _, req := range m.pendingRanges { + if !req.completed { + req.completed = true + m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) + } + } + m.pendingRanges = make(map[int64]*rangeRequest) +} + +// Set permanent error to the provided error, if it hasn't been set already. 
+func (m *multiRangeDownloaderManager) setPermanentError(err error) { + if m.permanentErr == nil { + m.permanentErr = err + } +} + +// --- bidiReadStreamSession --- +// Controls lifespan of an individual bi-directional gRPC stream to the +// object in GCS. Spins up goroutines for the read and write sides of the +// stream. +type bidiReadStreamSession struct { + ctx context.Context + cancel context.CancelFunc + + stream storagepb.Storage_BidiReadObjectClient + client *grpcStorageClient + settings *settings + params *newMultiRangeDownloaderParams + readSpec *storagepb.BidiReadObjectSpec + + reqC chan *storagepb.BidiReadObjectRequest + respC chan<- mrdSessionResult + wg sync.WaitGroup + + errOnce sync.Once + streamErr error } -func (g *readIDGenerator) init() { - g.nextID = make(chan int64, 1) - g.nextID <- 1 +func newBidiReadStreamSession(ctx context.Context, respC chan<- mrdSessionResult, client *grpcStorageClient, settings *settings, params *newMultiRangeDownloaderParams, readSpec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, error) { + sCtx, cancel := context.WithCancel(ctx) + + s := &bidiReadStreamSession{ + ctx: sCtx, + cancel: cancel, + client: client, + settings: settings, + params: params, + readSpec: readSpec, + reqC: make(chan *storagepb.BidiReadObjectRequest, 100), + respC: respC, + } + + initialReq := &storagepb.BidiReadObjectRequest{ + ReadObjectSpec: s.readSpec, + } + reqCtx := gax.InsertMetadataIntoOutgoingContext(s.ctx, contextMetadataFromBidiReadObject(initialReq)...) + // Force the use of the custom codec to enable zero-copy reads. + s.settings.gax = append(s.settings.gax, gax.WithGRPCOptions( + grpc.ForceCodecV2(bytesCodecV2{}), + )) + + var err error + s.stream, err = client.raw.BidiReadObject(reqCtx, s.settings.gax...) 
+ if err != nil { + cancel() + return nil, err + } + + if err := s.stream.Send(initialReq); err != nil { + s.stream.CloseSend() + cancel() + return nil, err + } + + s.wg.Add(2) + go s.sendLoop() + go s.receiveLoop() + + go func() { + s.wg.Wait() + s.cancel() + }() + + return s, nil } +func (s *bidiReadStreamSession) SendRequest(req *storagepb.BidiReadObjectRequest) { + select { + case s.reqC <- req: + case <-s.ctx.Done(): + } +} +func (s *bidiReadStreamSession) Shutdown() { + s.cancel() + s.wg.Wait() +} +func (s *bidiReadStreamSession) setError(err error) { + s.errOnce.Do(func() { + s.streamErr = err + }) +} +func (s *bidiReadStreamSession) sendLoop() { + defer s.wg.Done() + defer s.stream.CloseSend() + for { + select { + case req, ok := <-s.reqC: + if !ok { + return + } + if err := s.stream.Send(req); err != nil { + s.setError(err) + s.cancel() + return + } + case <-s.ctx.Done(): + return + } + } +} +func (s *bidiReadStreamSession) receiveLoop() { + defer s.wg.Done() + defer s.cancel() + for { + if err := s.ctx.Err(); err != nil { + return + } -// Next returns the Next read ID. It initializes the readIDGenerator if needed. -func (g *readIDGenerator) Next() int64 { - g.initOnce.Do(g.init) + // Receive message without a copy. + databufs := mem.BufferSlice{} + err := s.stream.RecvMsg(&databufs) + var decoder *readResponseDecoder + if err == nil { + // Use the custom decoder to parse the raw buffer without copying object data. 
+ decoder = &readResponseDecoder{ + databufs: databufs, + } + err = decoder.readFullObjectResponse() + } - id := <-g.nextID - n := id + 1 - g.nextID <- n + if err != nil { + databufs.Free() + redirectErr, isRedirect := isRedirectError(err) + result := mrdSessionResult{err: err} + if isRedirect { + result.redirect = redirectErr + err = fmt.Errorf("%w: %v", errBidiReadRedirect, err) + result.err = err + } + s.setError(err) + + select { + case s.respC <- result: + case <-s.ctx.Done(): + } + return + } + + select { + case s.respC <- mrdSessionResult{decoder: decoder}: + case <-s.ctx.Done(): + return + } + } +} +func isRedirectError(err error) (*storagepb.BidiReadObjectRedirectedError, bool) { + st, ok := status.FromError(err) + if !ok { + return nil, false + } + if st.Code() != codes.Aborted { + return nil, false + } + for _, d := range st.Details() { + if bidiError, ok := d.(*storagepb.BidiReadObjectRedirectedError); ok { + if bidiError.RoutingToken != nil { + return bidiError, true + } + } + } + return nil, false +} - return id +func readerAttrsFromObject(o *ObjectAttrs) ReaderObjectAttrs { + if o == nil { + return ReaderObjectAttrs{} + } + return ReaderObjectAttrs{ + Size: o.Size, + ContentType: o.ContentType, + ContentEncoding: o.ContentEncoding, + CacheControl: o.CacheControl, + LastModified: o.Updated, + Generation: o.Generation, + Metageneration: o.Metageneration, + CRC32C: o.CRC32C, + } } diff --git a/storage/integration_test.go b/storage/integration_test.go index 2b2bf147d4c2..667ada1661f0 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -425,8 +425,8 @@ func TestIntegration_MultiRangeDownloader(t *testing.T) { want = content[k.offset : k.offset+k.limit] } if !bytes.Equal(k.buf.Bytes(), want) { - t.Errorf("Error in read range offset %v, limit %v, got: %v; want: %v", - k.offset, k.limit, k.buf.Bytes(), want) + t.Errorf("Error in read range offset %v, limit %v, got: %v bytes; want: %v bytes", + k.offset, k.limit, len(k.buf.Bytes()), 
len(want)) } if k.err != nil { t.Errorf("read range %v to %v : %v", k.offset, k.limit, k.err) diff --git a/storage/reader.go b/storage/reader.go index 0c5f5be0ab99..e76fb75cd3f5 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -161,11 +161,19 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) // preview; please contact your account manager if interested. The option // [experimental.WithGRPCBidiReads] or [experimental.WithZonalBucketAPIs] // must be selected in order to use this API. + +// NewMultiRangeDownloader creates a multi-range reader for an object. +// Must be called on a gRPC client created using [NewGRPCClient]. func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiRangeDownloader, err error) { - // This span covers the life of the reader. It is closed via the context - // in Reader.Close. - ctx, _ = startSpan(ctx, "Object.MultiRangeDownloader") - defer func() { endSpan(ctx, err) }() + // This span covers the life of the MRD. It is closed via the context + // in MultiRangeDownloader.Close. + var spanCtx context.Context + spanCtx, _ = startSpan(ctx, "Object.MultiRangeDownloader") + defer func() { + if err != nil { + endSpan(spanCtx, err) + } + }() if err := o.validate(); err != nil { return nil, err @@ -187,15 +195,8 @@ func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiR handle: &o.readHandle, } - r, err := o.c.tc.NewMultiRangeDownloader(ctx, params, opts...) - - // Pass the context so that the span can be closed in MultiRangeDownloader.Close(), or close the - // span now if there is an error. - if err == nil { - r.ctx = ctx - } - - return r, err + // This call will return the *MultiRangeDownloader with the .impl field set. + return o.c.tc.NewMultiRangeDownloader(spanCtx, params, opts...) 
} // decompressiveTranscoding returns true if the request was served decompressed @@ -387,17 +388,9 @@ func (r *Reader) ReadHandle() ReadHandle { // // This API is currently in preview and is not yet available for general use. type MultiRangeDownloader struct { - Attrs ReaderObjectAttrs - reader multiRangeDownloader - ctx context.Context -} - -type multiRangeDownloader interface { - add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) - wait() - close() error - getHandle() []byte - error() error + // Attrs is populated when NewMultiRangeDownloader returns. + Attrs ReaderObjectAttrs + impl internalMultiRangeDownloader } // Add adds a new range to MultiRangeDownloader. @@ -421,7 +414,7 @@ type multiRangeDownloader interface { // of the read. Note that the length of the data read may be less than the // requested length if the end of the object is reached. func (mrd *MultiRangeDownloader) Add(output io.Writer, offset, length int64, callback func(int64, int64, error)) { - mrd.reader.add(output, offset, length, callback) + mrd.impl.add(output, offset, length, callback) } // Close the MultiRangeDownloader. It must be called when done reading. @@ -431,8 +424,8 @@ func (mrd *MultiRangeDownloader) Add(output io.Writer, offset, length int64, cal // "stream closed early" error if a response for a range is still not processed. // Call [MultiRangeDownloader.Wait] to avoid this error. func (mrd *MultiRangeDownloader) Close() error { - err := mrd.reader.close() - endSpan(mrd.ctx, err) + err := mrd.impl.close(nil) + endSpan(mrd.impl.getSpanCtx(), err) return err } @@ -440,18 +433,18 @@ func (mrd *MultiRangeDownloader) Close() error { // Adding new ranges after this has been called will cause an error. // Wait will wait for all callbacks to finish. func (mrd *MultiRangeDownloader) Wait() { - mrd.reader.wait() + mrd.impl.wait() } // GetHandle returns the read handle. 
This can be used to further speed up the // follow up read if the same object is read through a different stream. func (mrd *MultiRangeDownloader) GetHandle() []byte { - return mrd.reader.getHandle() + return mrd.impl.getHandle() // TODO: Consider plumbing context from caller } // Error returns an error if the MultiRangeDownloader is in a permanent failure // state. It returns a nil error if the MultiRangeDownloader is open and can be // used. func (mrd *MultiRangeDownloader) Error() error { - return mrd.reader.error() + return mrd.impl.getPermanentError() } diff --git a/storage/retry_conformance_test.go b/storage/retry_conformance_test.go index 5c8e60565759..f7135b828389 100644 --- a/storage/retry_conformance_test.go +++ b/storage/retry_conformance_test.go @@ -295,6 +295,7 @@ var methods = map[string][]retryFunc{ if err != nil { return err } + buf := new(bytes.Buffer) var err1 error callback := func(x, y int64, err error) {