Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package main
import (
_ "net/http/pprof"
"os"
"strconv"
"time"

"github.com/juicedata/juicefs/pkg/chunk"
Expand Down Expand Up @@ -54,6 +55,11 @@ func cmdGateway() *cli.Command {
Name: "keep-etag",
Usage: "keep the ETag for uploaded objects",
},
&cli.StringFlag{
Name: "umask",
Value: "022",
Usage: "umask for new file in octal",
},
}

compoundFlags := [][]cli.Flag{
Expand Down Expand Up @@ -157,7 +163,13 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
addr := c.Args().Get(0)
removePassword(addr)
m, store, conf := initForSvc(c, "s3gateway", addr)
return jfsgateway.NewJFSGateway(conf, m, store, c.Bool("multi-buckets"), c.Bool("keep-etag"))

umask, err := strconv.ParseUint(c.String("umask"), 8, 16)
if err != nil {
logger.Fatalf("invalid umask %s: %s", c.String("umask"), err)
}

return jfsgateway.NewJFSGateway(conf, m, store, &jfsgateway.Config{MultiBucket: c.Bool("multi-buckets"), KeepEtag: c.Bool("keep-etag"), Mode: uint16(0777 &^ umask)})
}

func initForSvc(c *cli.Context, mp string, metaUrl string) (meta.Meta, chunk.ChunkStore, *vfs.Config) {
Expand Down
39 changes: 22 additions & 17 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,27 @@ const (
var mctx meta.Context
var logger = utils.GetLogger("juicefs")

func NewJFSGateway(conf *vfs.Config, m meta.Meta, store chunk.ChunkStore, multiBucket, keepEtag bool) (minio.ObjectLayer, error) {
type Config struct {
MultiBucket bool
KeepEtag bool
Mode uint16
}

func NewJFSGateway(conf *vfs.Config, m meta.Meta, store chunk.ChunkStore, gConf *Config) (minio.ObjectLayer, error) {
jfs, err := fs.NewFileSystem(conf, m, store)
if err != nil {
return nil, fmt.Errorf("Initialize failed: %s", err)
}
mctx = meta.NewContext(uint32(os.Getpid()), uint32(os.Getuid()), []uint32{uint32(os.Getgid())})
return &jfsObjects{fs: jfs, conf: conf, listPool: minio.NewTreeWalkPool(time.Minute * 30), multiBucket: multiBucket, keepEtag: keepEtag}, nil
return &jfsObjects{fs: jfs, conf: conf, listPool: minio.NewTreeWalkPool(time.Minute * 30), gConf: gConf}, nil
}

type jfsObjects struct {
minio.GatewayUnsupported
conf *vfs.Config
fs *fs.FileSystem
listPool *minio.TreeWalkPool
multiBucket bool
keepEtag bool
conf *vfs.Config
fs *fs.FileSystem
listPool *minio.TreeWalkPool
gConf *Config
}

func (n *jfsObjects) IsCompressionSupported() bool {
Expand Down Expand Up @@ -144,7 +149,7 @@ func jfsToObjectErr(ctx context.Context, err error, params ...string) error {

// isValidBucketName verifies whether a bucket name is valid.
func (n *jfsObjects) isValidBucketName(bucket string) bool {
if !n.multiBucket && bucket != n.conf.Format.Name {
if !n.gConf.MultiBucket && bucket != n.conf.Format.Name {
return false
}
return s3utils.CheckValidBucketNameStrict(bucket) == nil
Expand Down Expand Up @@ -173,7 +178,7 @@ func (n *jfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelet
if !n.isValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if !n.multiBucket {
if !n.gConf.MultiBucket {
return minio.BucketNotEmpty{Bucket: bucket}
}
eno := n.fs.Delete(mctx, n.path(bucket))
Expand All @@ -184,7 +189,7 @@ func (n *jfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string,
if !n.isValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if !n.multiBucket {
if !n.gConf.MultiBucket {
return nil
}
eno := n.fs.Mkdir(mctx, n.path(bucket), 0755)
Expand Down Expand Up @@ -214,7 +219,7 @@ func isReservedOrInvalidBucket(bucketEntry string, strict bool) bool {
}

func (n *jfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
if !n.multiBucket {
if !n.gConf.MultiBucket {
fi, eno := n.fs.Stat(mctx, "/")
if eno != 0 {
return nil, jfsToObjectErr(ctx, eno)
Expand Down Expand Up @@ -472,7 +477,7 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
}

var etag []byte
if n.keepEtag {
if n.gConf.KeepEtag {
etag, _ = n.fs.GetXattr(mctx, src, s3Etag)
if len(etag) != 0 {
eno = n.fs.SetXattr(mctx, dst, s3Etag, etag, 0)
Expand Down Expand Up @@ -546,7 +551,7 @@ func (n *jfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
return
}
var etag []byte
if n.keepEtag {
if n.gConf.KeepEtag {
etag, _ = n.fs.GetXattr(mctx, n.path(bucket, object), s3Etag)
}
return minio.ObjectInfo{
Expand Down Expand Up @@ -586,7 +591,7 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string, mode os.FileMode) e
func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (err error) {
tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID())
_ = n.mkdirAll(ctx, path.Dir(tmpname), 0755)
f, eno := n.fs.Create(mctx, tmpname, 0644)
f, eno := n.fs.Create(mctx, tmpname, n.gConf.Mode)
if eno != 0 {
logger.Errorf("create %s: %s", tmpname, eno)
err = eno
Expand Down Expand Up @@ -659,7 +664,7 @@ func (n *jfsObjects) PutObject(ctx context.Context, bucket string, object string
return objInfo, jfsToObjectErr(ctx, eno, bucket, object)
}
etag := r.MD5CurrentHexString()
if n.keepEtag {
if n.gConf.KeepEtag {
eno = n.fs.SetXattr(mctx, p, s3Etag, []byte(etag), 0)
if eno != 0 {
logger.Errorf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", p, s3Etag, etag, 0)
Expand Down Expand Up @@ -837,7 +842,7 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object

tmp := n.ppath(bucket, uploadID, "complete")
_ = n.fs.Delete(mctx, tmp)
_, eno := n.fs.Create(mctx, tmp, 0755)
_, eno := n.fs.Create(mctx, tmp, n.gConf.Mode)
if eno != 0 {
err = jfsToObjectErr(ctx, eno, bucket, object, uploadID)
logger.Errorf("create complete: %s", err)
Expand Down Expand Up @@ -885,7 +890,7 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object

// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := minio.ComputeCompleteMultipartMD5(parts)
if n.keepEtag {
if n.gConf.KeepEtag {
eno = n.fs.SetXattr(mctx, name, s3Etag, []byte(s3MD5), 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", name, s3Etag, s3MD5, 0)
Expand Down