diff --git a/storage/doc.go b/storage/doc.go index 3726432c86f1..d0434db2935f 100644 --- a/storage/doc.go +++ b/storage/doc.go @@ -407,6 +407,10 @@ roles which must be enabled in order to do the export successfully. To disable this export, you can use the [WithDisabledClientMetrics] client option. +The gRPC client automatically computes and sends CRC32C checksums for uploads using [Writer], +which provides an additional layer of data integrity validation when compared to the HTTP client. +This behavior can optionally be disabled by using [Writer.DisableAutoChecksum]. + # Storage Control API Certain control plane and long-running operations for Cloud Storage (including Folder diff --git a/storage/grpc_writer.go b/storage/grpc_writer.go index 2af970ecb83d..b8773b1700b4 100644 --- a/storage/grpc_writer.go +++ b/storage/grpc_writer.go @@ -819,11 +819,12 @@ func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectC } type getObjectChecksumsParams struct { - fullObjectChecksum func() uint32 - finishWrite bool sendCRC32C bool disableAutoChecksum bool - attrs *ObjectAttrs + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 + finishWrite bool + takeoverWriter bool } // getObjectChecksums determines what checksum information to include in the final @@ -840,9 +841,10 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck // send user's checksum on last write op if available if params.sendCRC32C { - return toProtoChecksums(params.sendCRC32C, params.attrs) + return toProtoChecksums(params.sendCRC32C, params.objectAttrs) } - if params.disableAutoChecksum { + // TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC + if params.disableAutoChecksum || params.takeoverWriter { return nil } return &storagepb.ObjectChecksums{ @@ -879,8 +881,11 @@ type gRPCOneshotBidiWriteBufferSender struct { firstMessage *storagepb.BidiWriteObjectRequest streamErr error - checksumSettings func() (bool, bool, *ObjectAttrs) - fullObjectChecksum func() uint32 + // Checksum related settings. + sendCRC32C bool + disableAutoChecksum bool + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 } func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender { @@ -894,9 +899,9 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), }, - checksumSettings: func() (bool, bool, *ObjectAttrs) { - return w.sendCRC32C, w.disableAutoChecksum, w.attrs - }, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, fullObjectChecksum: func() uint32 { return w.fullObjectChecksum }, @@ -939,17 +944,16 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB continue } - sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings() var bufChecksum *uint32 - if !disableAutoChecksum { + if !s.disableAutoChecksum { bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable)) } objectChecksums := getObjectChecksums(&getObjectChecksumsParams{ + sendCRC32C: s.sendCRC32C, + objectAttrs: s.objectAttrs, fullObjectChecksum: s.fullObjectChecksum, + disableAutoChecksum: s.disableAutoChecksum, finishWrite: r.finishWrite, - sendCRC32C: sendCrc32C, - disableAutoChecksum: disableAutoChecksum, - attrs: attrs, }) req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums) @@ -996,8 +1000,11 @@ type gRPCResumableBidiWriteBufferSender struct { startWriteRequest *storagepb.StartResumableWriteRequest upid string - checksumSettings func() (bool, bool, *ObjectAttrs) - fullObjectChecksum func() uint32 + // Checksum related settings. + sendCRC32C bool + disableAutoChecksum bool + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 streamErr error } @@ -1011,9 +1018,9 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), }, - checksumSettings: func() (bool, bool, *ObjectAttrs) { - return w.sendCRC32C, w.disableAutoChecksum, w.attrs - }, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, fullObjectChecksum: func() uint32 { return w.fullObjectChecksum }, @@ -1076,17 +1083,16 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP continue } - sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings() var bufChecksum *uint32 - if !disableAutoChecksum { + if !s.disableAutoChecksum { bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable)) } objectChecksums := getObjectChecksums(&getObjectChecksumsParams{ + sendCRC32C: s.sendCRC32C, + objectAttrs: s.objectAttrs, fullObjectChecksum: s.fullObjectChecksum, + disableAutoChecksum: s.disableAutoChecksum, finishWrite: r.finishWrite, - sendCRC32C: sendCrc32C, - disableAutoChecksum: disableAutoChecksum, - attrs: attrs, }) req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums) @@ -1142,12 +1148,18 @@ type gRPCAppendBidiWriteBufferSender struct { bucket string routingToken *string - firstMessage *storagepb.BidiWriteObjectRequest - - objectChecksums *storagepb.ObjectChecksums + firstMessage *storagepb.BidiWriteObjectRequest finalizeOnClose bool objResource *storagepb.Object + // Checksum related settings. + sendCRC32C bool + disableAutoChecksum bool + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 + + takeoverWriter bool + streamErr error } @@ -1164,8 +1176,13 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB }, CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), }, - objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - finalizeOnClose: w.finalizeOnClose, + finalizeOnClose: w.finalizeOnClose, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, + fullObjectChecksum: func() uint32 { + return w.fullObjectChecksum + }, } } @@ -1278,8 +1295,14 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen), }, }, - objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - finalizeOnClose: w.finalizeOnClose, + finalizeOnClose: w.finalizeOnClose, + takeoverWriter: true, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, + fullObjectChecksum: func() uint32 { + return w.fullObjectChecksum + }, }, takeoverReported: false, handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) { @@ -1409,12 +1432,20 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit flush: flush, finishWrite: finalizeObject, } - // TODO(b/453869602): implement default checksumming for appendable writes - req := bidiWriteObjectRequest(r, nil, nil) - if finalizeObject { - // appendable objects pass checksums on the finalize message only - req.ObjectChecksums = s.objectChecksums + + var bufChecksum *uint32 + if !s.disableAutoChecksum { + bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable)) } + objectChecksums := getObjectChecksums(&getObjectChecksumsParams{ + sendCRC32C: s.sendCRC32C, + objectAttrs: s.objectAttrs, + fullObjectChecksum: s.fullObjectChecksum, + disableAutoChecksum: s.disableAutoChecksum, + finishWrite: finalizeObject, + takeoverWriter: s.takeoverWriter, + }) + req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums) if sendFirstMessage { proto.Merge(req, s.firstMessage) } diff --git a/storage/grpc_writer_test.go b/storage/grpc_writer_test.go index 1ed388959d81..ce53a2999761 100644 --- a/storage/grpc_writer_test.go +++ b/storage/grpc_writer_test.go @@ -27,6 +27,7 @@ func TestGetObjectChecksums(t *testing.T) { fullObjectChecksum func() uint32 finishWrite bool sendCRC32C bool + takeoverWriter bool disableAutoChecksum bool attrs *ObjectAttrs want *storagepb.ObjectChecksums @@ -73,16 +74,24 @@ func TestGetObjectChecksums(t *testing.T) { Crc32C: proto.Uint32(456), }, }, + // TODO(b/461982277): remove this testcase once checksums for takeover writer is implemented + { + name: "takeover writer should return nil", + finishWrite: true, + takeoverWriter: true, + want: nil, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := getObjectChecksums(&getObjectChecksumsParams{ + disableAutoChecksum: tt.disableAutoChecksum, + sendCRC32C: tt.sendCRC32C, + objectAttrs: tt.attrs, fullObjectChecksum: tt.fullObjectChecksum, finishWrite: tt.finishWrite, - sendCRC32C: tt.sendCRC32C, - disableAutoChecksum: tt.disableAutoChecksum, - attrs: tt.attrs, + takeoverWriter: tt.takeoverWriter, }) if !proto.Equal(got, tt.want) { t.Errorf("getObjectChecksums() = %v, want %v", got, tt.want) diff --git a/storage/integration_test.go b/storage/integration_test.go index c4b29cae8a2b..235ec97960ee 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -1844,6 +1844,111 @@ func TestIntegration_MultiChunkWrite(t *testing.T) { }) } +func TestIntegration_WriterCRC32CValidation(t *testing.T) { + ctx := skipZonalBucket(context.Background(), "Test for resumable and oneshot writers") + h := testHelper{t} + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) { + testCases := []struct { + name string + content []byte + chunkSize int + sendCRC32C bool + disableAutoChecksum bool + incorrectChecksum bool + wantErr bool + }{ + { + name: "oneshot with user-sent CRC32C", + content: []byte("small content for oneshot upload"), + chunkSize: 0, + sendCRC32C: true, + }, + { + name: "oneshot with default CRC32", + content: []byte("small content for oneshot upload"), + chunkSize: 0, + }, + { + name: "oneshot with disabled auto checksum", + content: []byte("small content for oneshot upload"), + chunkSize: 0, + disableAutoChecksum: true, + }, + { + name: "resumable with user-sent CRC32C", + content: bytes.Repeat([]byte("a"), 1*MiB), + chunkSize: 256 * 1024, + sendCRC32C: true, + }, + { + name: "resumable with default CRC32", + content: bytes.Repeat([]byte("a"), 1*MiB), + chunkSize: 256 * 1024, + }, + { + name: "resumable with disabled auto checksum", + content: bytes.Repeat([]byte("a"), 1*MiB), + chunkSize: 256 * 1024, + disableAutoChecksum: true, + }, + { + name: "resumable with incorrect checksum", + content: bytes.Repeat([]byte("a"), 1*MiB), + chunkSize: 256 * 1024, + sendCRC32C: true, + incorrectChecksum: true, + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + correctCRC32C := crc32.Checksum(tc.content, crc32cTable) + obj := client.Bucket(bucket).Object(uidSpaceObjects.New()) + t.Cleanup(func() { + h.mustDeleteObject(obj) + }) + + w := obj.NewWriter(ctx) + w.ChunkSize = tc.chunkSize + w.SendCRC32C = tc.sendCRC32C + w.CRC32C = correctCRC32C + if tc.incorrectChecksum { + w.CRC32C = correctCRC32C + 1 + } + w.DisableAutoChecksum = tc.disableAutoChecksum + + if _, err := w.Write(tc.content); err != nil { + t.Fatalf("Writer.Write: %v", err) + } + err := w.Close() + if tc.incorrectChecksum { + if !errorIsStatusCode(err, http.StatusBadRequest, codes.InvalidArgument) { + t.Fatalf("expected an InvalidArgument error for incorrect checksum, but got %v", err) + } + return + } + if err != nil { + t.Fatalf("Writer.Close: %v", err) + } + // Verify content. + r, err := obj.NewReader(ctx) + if err != nil { + t.Fatalf("NewReader failed: %v", err) + } + defer r.Close() + gotContent, err := io.ReadAll(r) + if err != nil { + t.Fatalf("ReadAll failed: %v", err) + } + if !bytes.Equal(gotContent, tc.content) { + t.Errorf("content mismatch: got %d bytes, want %d bytes", len(gotContent), len(tc.content)) + } + }) + } + }) +} + func TestIntegration_ConditionalDownload(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) { h := testHelper{t} @@ -3276,11 +3381,14 @@ func TestIntegration_WriterAppend(t *testing.T) { bkt := client.Bucket(bucket) testCases := []struct { - name string - finalize bool - content []byte - chunkSize int - flushOffset int64 + name string + finalize bool + content []byte + chunkSize int + flushOffset int64 + sendCRC bool + disableAutoChecksum bool + incorrectChecksum bool }{ { name: "finalized_object", @@ -3289,6 +3397,29 @@ func TestIntegration_WriterAppend(t *testing.T) { chunkSize: 4 * MiB, flushOffset: -1, // no flush }, + { + name: "finalized_object with user checksum", + finalize: true, + content: randomBytes9MiB, + chunkSize: 4 * MiB, + sendCRC: true, + flushOffset: -1, // no flush + }, + { + name: "finalized_object with disabled checksum", + finalize: true, + content: randomBytes9MiB, + chunkSize: 4 * MiB, + disableAutoChecksum: true, + flushOffset: -1, // no flush + }, + { + name: "finalized_object with incorrect checksum", + finalize: true, + content: randomBytes9MiB, + chunkSize: 4 * MiB, + incorrectChecksum: true, + }, { name: "unfinalized_object", finalize: false, @@ -3334,8 +3465,25 @@ func TestIntegration_WriterAppend(t *testing.T) { w.Append = true w.FinalizeOnClose = tc.finalize w.ChunkSize = tc.chunkSize + w.SendCRC32C = tc.sendCRC + w.DisableAutoChecksum = tc.disableAutoChecksum + w.CRC32C = crc32.Checksum(tc.content, crc32cTable) content := tc.content + // If incorrectChecksum is true, write data and close writer + // immediately to validate if writer returns error + if tc.incorrectChecksum { + w.CRC32C++ // simulate incorrect checksum + w.SendCRC32C = true + if _, err := w.Write(content); err != nil { + t.Fatalf("writer.Write: %v", err) + } + if err := w.Close(); !errorIsStatusCode(err, http.StatusBadRequest, codes.InvalidArgument) { + t.Fatalf("expected an InvalidArgument error for incorrect checksum, but got %v", err) + } + return + } + // If flushOffset is 0, just do a flush and check the attributes. if tc.flushOffset == 0 { if _, err := w.Flush(); err != nil { diff --git a/storage/writer.go b/storage/writer.go index cba6184cb115..e65e714d5e21 100644 --- a/storage/writer.go +++ b/storage/writer.go @@ -65,9 +65,14 @@ type Writer struct { // from being sent. If SendCRC32C is true and the Writer's CRC32C field is // populated, that checksum will still be sent to GCS for validation. // + // Automatic CRC32C checksum calculation introduces increased CPU overhead + // because of checksum computation in gRPC writes. Use this field to disable + // it if needed. + // // Note: DisableAutoChecksum must be set before the first call to // Writer.Write(). Automatic checksumming is not enabled for writes - // using the HTTP client or for unfinalized writes to appendable objects in gRPC. + // using the HTTP client or for full object checksums for unfinalized writes to + // appendable objects in gRPC. DisableAutoChecksum bool // ChunkSize controls the maximum number of bytes of the object that the