5 changes: 5 additions & 0 deletions command/app.go
@@ -95,6 +95,10 @@ var app = &cli.App{
Name: "auth-google-adc",
Usage: "inject the authorization bearer token as a request header using Google Application Default Credentials. Set automatically if a file URI starts with gs:// and no endpoint is set",
},
&cli.BoolFlag{
Name: "use-grpc",
Usage: "use gRPC for Google Cloud Storage operations (requires Google Cloud Storage client library v1.46.0+)",
},
&cli.BoolFlag{
Name: "retry-on-forbidden",
Usage: "retry for Forbidden error code",
@@ -244,6 +248,7 @@ func NewStorageOpts(c *cli.Context) storage.Options {
LogLevel: log.LevelFromString(c.String("log")),
NoSuchUploadRetryCount: c.Int("no-such-upload-retry-count"),
AuthGoogleADC: c.Bool("auth-google-adc"),
UseGRPC: c.Bool("use-grpc"),
RetryForbidden: c.Bool("retry-on-forbidden"),
}
}
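Note: with the flag wired into NewStorageOpts above, a gs:// operation could be routed over gRPC with an invocation along the lines of s5cmd --use-grpc ls gs://bucket/prefix/ (hypothetical command line; only the flag name is taken from this change).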
2 changes: 2 additions & 0 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/peak/s5cmd/v2
go 1.19

require (
cloud.google.com/go/storage v1.46.0
github.com/aws/aws-sdk-go v1.44.334
github.com/cheggaaa/pb/v3 v3.1.4
github.com/google/go-cmp v0.6.0
@@ -19,6 +20,7 @@ require (
github.com/urfave/cli/v2 v2.11.2
go.uber.org/mock v0.4.0
golang.org/x/oauth2 v0.25.0
google.golang.org/api v0.203.0
gotest.tools/v3 v3.0.3
)
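Note: the qa (1.20.x, ubuntu) workflow reports missing go.sum entries for cloud.google.com/go/storage and google.golang.org/api/iterator; running go mod tidy after adding these requirements would be the usual way to bring go.sum up to date, assuming standard Go module tooling.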

248 changes: 248 additions & 0 deletions storage/gcs.go
@@ -0,0 +1,248 @@
package storage

import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"

"cloud.google.com/go/storage"

Check failure on line 12 in storage/gcs.go

View workflow job for this annotation

GitHub Actions / qa (1.20.x, ubuntu)

missing go.sum entry for module providing package cloud.google.com/go/storage (imported by github.com/peak/s5cmd/v2/storage); to add:
"google.golang.org/api/iterator"

Check failure on line 13 in storage/gcs.go

View workflow job for this annotation

GitHub Actions / qa (1.20.x, ubuntu)

missing go.sum entry for module providing package google.golang.org/api/iterator (imported by github.com/peak/s5cmd/v2/storage); to add:

"github.com/peak/s5cmd/v2/log"
"github.com/peak/s5cmd/v2/storage/url"
)

// GCS is a storage type which interacts with Google Cloud Storage using gRPC.
type GCS struct {
client *storage.Client
dryRun bool
log log.Logger
}

// NewGRPCClient creates a new GCS client with gRPC support.
func NewGRPCClient(ctx context.Context, opts Options) (*GCS, error) {
// Create gRPC-enabled client
client, err := storage.NewGRPCClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC client: %w", err)
}

return &GCS{
client: client,
dryRun: opts.DryRun,
log: log.New(opts.LogLevel),
}, nil
}

// Close closes the GCS client connection.
func (g *GCS) Close() error {
if g.client != nil {
return g.client.Close()
}
return nil
}

// Stat returns the Object structure describing the object.
func (g *GCS) Stat(ctx context.Context, src *url.URL) (*Object, error) {
bucket := g.client.Bucket(src.Bucket)
obj := bucket.Object(src.Path)

attrs, err := obj.Attrs(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, &ErrGivenObjectNotFound{ObjectAbsPath: src.Absolute()}
}
return nil, err
}

return g.convertAttrsToObject(src, attrs), nil
}

// List lists objects and directories/prefixes in the src.
func (g *GCS) List(ctx context.Context, src *url.URL, followSymlinks bool) <-chan *Object {
ch := make(chan *Object)

go func() {
defer close(ch)

bucket := g.client.Bucket(src.Bucket)
query := &storage.Query{
Prefix: src.Path,
}

// If the path ends with a delimiter (or is empty), set "/" as the delimiter so that common prefixes are returned as directory entries
if strings.HasSuffix(src.Path, "/") || src.Path == "" {
query.Delimiter = "/"
}

it := bucket.Objects(ctx, query)
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
obj := &Object{
URL: src,
Err: err,
}
ch <- obj
return
}

// Handle prefixes (directories)
if attrs.Prefix != "" {
objURL := src.Clone()
objURL.Path = attrs.Prefix
obj := &Object{
URL: objURL,
Type: ObjectType{mode: os.ModeDir | 0755},
}
ch <- obj
continue
}

// Handle regular objects
objURL := src.Clone()
objURL.Path = attrs.Name
obj := g.convertAttrsToObject(objURL, attrs)
ch <- obj
}
}()

return ch
}

// Delete deletes the object at the given URL.
func (g *GCS) Delete(ctx context.Context, src *url.URL) error {
if g.dryRun {
return nil
}

bucket := g.client.Bucket(src.Bucket)
obj := bucket.Object(src.Path)

if err := obj.Delete(ctx); err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return &ErrGivenObjectNotFound{ObjectAbsPath: src.Absolute()}
}
return err
}

return nil
}

// MultiDelete deletes multiple objects.
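// Deletions are issued sequentially, one Delete call per object; there is no batch-delete request here.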
func (g *GCS) MultiDelete(ctx context.Context, urls <-chan *url.URL) <-chan *Object {
ch := make(chan *Object)

go func() {
defer close(ch)

for url := range urls {
err := g.Delete(ctx, url)
obj := &Object{
URL: url,
Err: err,
}
ch <- obj
}
}()

return ch
}

// Copy copies an object from src to dst.
func (g *GCS) Copy(ctx context.Context, src, dst *url.URL, metadata Metadata) error {
if g.dryRun {
return nil
}

srcBucket := g.client.Bucket(src.Bucket)
srcObj := srcBucket.Object(src.Path)

dstBucket := g.client.Bucket(dst.Bucket)
dstObj := dstBucket.Object(dst.Path)

// Check if src and dst are in the same bucket for server-side copy
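// (CopierFrom can also copy across buckets server-side; the streaming fallback below is a conservative choice.)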
if src.Bucket == dst.Bucket {
// Server-side copy
copier := dstObj.CopierFrom(srcObj)

// Apply metadata if provided
if metadata.ContentType != "" {
copier.ContentType = metadata.ContentType
}
if metadata.CacheControl != "" {
copier.CacheControl = metadata.CacheControl
}
if metadata.ContentEncoding != "" {
copier.ContentEncoding = metadata.ContentEncoding
}
if metadata.ContentDisposition != "" {
copier.ContentDisposition = metadata.ContentDisposition
}
if metadata.StorageClass != "" {
copier.StorageClass = metadata.StorageClass
}
if metadata.UserDefined != nil {
copier.Metadata = metadata.UserDefined
}

_, err := copier.Run(ctx)
return err
}

// Cross-bucket copy: download then upload
reader, err := srcObj.NewReader(ctx)
if err != nil {
return fmt.Errorf("failed to read source object: %w", err)
}
defer reader.Close()

writer := dstObj.NewWriter(ctx)

// Apply metadata
if metadata.ContentType != "" {
writer.ContentType = metadata.ContentType
}
if metadata.CacheControl != "" {
writer.CacheControl = metadata.CacheControl
}
if metadata.ContentEncoding != "" {
writer.ContentEncoding = metadata.ContentEncoding
}
if metadata.ContentDisposition != "" {
writer.ContentDisposition = metadata.ContentDisposition
}
if metadata.StorageClass != "" {
writer.StorageClass = metadata.StorageClass
}
if metadata.UserDefined != nil {
writer.Metadata = metadata.UserDefined
}

if _, err := io.Copy(writer, reader); err != nil {
writer.Close()
return fmt.Errorf("failed to copy data: %w", err)
}

return writer.Close()
}

// convertAttrsToObject converts GCS ObjectAttrs to storage.Object.
func (g *GCS) convertAttrsToObject(u *url.URL, attrs *storage.ObjectAttrs) *Object {
modTime := attrs.Updated
return &Object{
URL: u,
Etag: attrs.Etag,
ModTime: &modTime,
Type: ObjectType{mode: 0644},
Size: attrs.Size,
StorageClass: StorageClass(attrs.StorageClass),
}
}
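For orientation, a minimal sketch of how the new client could be exercised directly (hypothetical bucket and object names; url.New is assumed to parse s5cmd-style URLs, and Options fields other than UseGRPC are left at their zero values):

package main

import (
	"context"
	"fmt"

	"github.com/peak/s5cmd/v2/storage"
	"github.com/peak/s5cmd/v2/storage/url"
)

func main() {
	ctx := context.Background()

	// UseGRPC mirrors the new --use-grpc flag.
	client, err := storage.NewGRPCClient(ctx, storage.Options{UseGRPC: true})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Hypothetical object URL.
	u, err := url.New("gs://example-bucket/path/to/object.txt")
	if err != nil {
		panic(err)
	}

	obj, err := client.Stat(ctx, u)
	if err != nil {
		panic(err)
	}
	fmt.Println(obj.Size, obj.Etag)
}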
9 changes: 8 additions & 1 deletion storage/storage.go
@@ -59,7 +59,12 @@ func NewLocalClient(opts Options) *Filesystem {
return &Filesystem{dryRun: opts.DryRun}
}

func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (*S3, error) {
func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (Storage, error) {
// If gRPC is enabled and this is a GCS URL, use the native GCS client
if opts.UseGRPC && url.Scheme == "gs" {
return NewGRPCClient(ctx, opts)
}

newOpts := Options{
MaxRetries: opts.MaxRetries,
NoSuchUploadRetryCount: opts.NoSuchUploadRetryCount,
@@ -75,6 +80,7 @@ func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (*S3, error) {
bucket: url.Bucket,
region: opts.region,
AuthGoogleADC: opts.AuthGoogleADC,
UseGRPC: opts.UseGRPC,
RetryForbidden: opts.RetryForbidden,
}

@@ -104,6 +110,7 @@ type Options struct {
bucket string
region string
AuthGoogleADC bool
UseGRPC bool
RetryForbidden bool
}
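With this change, callers of NewRemoteClient receive the Storage interface rather than a concrete *S3, and gs:// URLs are routed to the native client when the flag is set. A fragment illustrating the dispatch (ctx and u as in the sketch under gcs.go above; error handling elided):

client, err := storage.NewRemoteClient(ctx, u, storage.Options{UseGRPC: true})
// For a gs:// URL with UseGRPC set, client is the native *GCS (gRPC) implementation;
// otherwise it is the existing S3-compatible client.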
