Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,22 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
return nil, err
}

return &clientDaemon{
once: &sync.Once{},
done: make(chan bool),
schedPeerHost: host,
Option: *opt,
objectStorage, err := objectstorage.New(opt, dynconfig, peerTaskManager, storageManager)
if err != nil {
return nil, err
}

return &clientDaemon{
once: &sync.Once{},
done: make(chan bool),
schedPeerHost: host,
Option: *opt,
RPCManager: rpcManager,
PeerTaskManager: peerTaskManager,
PieceManager: pieceManager,
ProxyManager: proxyManager,
UploadManager: uploadManager,
ObjectStorage: objectstorage.New(dynconfig, peerTaskManager, storageManager),
ObjectStorage: objectStorage,
StorageManager: storageManager,
GCManager: gc.NewManager(opt.GCInterval.Duration),
dynconfig: dynconfig,
Expand Down
126 changes: 118 additions & 8 deletions client/daemon/objectstorage/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,28 @@ package objectstorage

import (
"context"
"errors"
"math"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/go-http-utils/headers"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/pkg/objectstorage"
"d7y.io/dragonfly/v2/pkg/rpc/base"
)

const (
// defaultSignExpireTime is default expire of sign url.
defaultSignExpireTime = 5 * time.Minute
)

// ObjectStorage is the interface used for object storage server.
Expand All @@ -40,25 +54,27 @@ type ObjectStorage interface {
// objectStorage provides object storage function.
type objectStorage struct {
*http.Server
dynconfig config.Dynconfig
peeTaskManager peer.TaskManager
storageManager storage.Manager
dynconfig config.Dynconfig
peerTaskManager peer.TaskManager
storageManager storage.Manager
peerIDGenerator peer.IDGenerator
}

// New returns a new ObjectStorage instence.
func New(dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager) ObjectStorage {
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager) (ObjectStorage, error) {
o := &objectStorage{
dynconfig: dynconfig,
peeTaskManager: peerTaskManager,
storageManager: storageManager,
dynconfig: dynconfig,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP),
}

router := o.initRouter()
o.Server = &http.Server{
Handler: router,
}

return o
return o, nil
}

// Started object storage server.
Expand Down Expand Up @@ -98,6 +114,67 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

var (
urlMeta *base.UrlMeta
artifactRange *clientutil.Range
ranges []clientutil.Range
err error
)

// Parse http range header.
rangeHeader := ctx.GetHeader(headers.Range)
if len(rangeHeader) > 0 {
ranges, err = o.parseRangeHeader(rangeHeader)
if err != nil {
ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": err.Error()})
return
}
artifactRange = &ranges[0]

// Range header in dragonfly is without "bytes=".
urlMeta.Range = strings.TrimLeft(rangeHeader, "bytes=")
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

meta, err := client.GetObjectMetadata(ctx, params.ID, params.ObjectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
urlMeta.Digest = meta.Digest

signURL, err := client.GetSignURL(ctx, params.ID, params.ObjectKey, objectstorage.MethodGet, defaultSignExpireTime)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

reader, attr, err := o.peerTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{
URL: signURL,
URLMeta: urlMeta,
Range: artifactRange,
PeerID: o.peerIDGenerator.PeerID(),
})
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
defer reader.Close()

var contentLength int64 = -1
if l, ok := attr[headers.ContentLength]; ok {
if i, err := strconv.ParseInt(l, 10, 64); err == nil {
contentLength = i
}
}

ctx.DataFromReader(http.StatusOK, contentLength, attr[headers.ContentType], reader, nil)
}

// createObject uses to upload object data.
Expand All @@ -114,3 +191,36 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
return
}
}

// client uses to generate client of object storage.
func (o *objectStorage) client() (objectstorage.ObjectStorage, error) {
config, err := o.dynconfig.GetObjectStorage()
if err != nil {
return nil, err
}

client, err := objectstorage.New(config.Name, config.Region, config.Endpoint, config.AccessKey, config.SecretKey)
if err != nil {
return nil, err
}

return client, nil
}

// parseRangeHeader uses to parse range http header for dragonfly.
func (o *objectStorage) parseRangeHeader(rangeHeader string) ([]clientutil.Range, error) {
ranges, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
if err != nil {
return nil, err
}

if len(ranges) > 1 {
return nil, errors.New("multiple range is not supported")
}

if len(ranges) == 0 {
return nil, errors.New("zero range is not supported")
}

return ranges, nil
}
32 changes: 30 additions & 2 deletions pkg/objectstorage/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,37 @@
package objectstorage

const (
// ServiceNameS3 is name of s3 storage
// ServiceNameS3 is name of s3 storage.
ServiceNameS3 = "s3"

// ServiceNameOSS is name of oss storage
// ServiceNameOSS is name of oss storage.
ServiceNameOSS = "oss"
)

const (
// MetaDigest is key of digest meta.
MetaDigest = "digest"
)

// Method is the client operation method .
type Method string

const (
// MethodHead is the head operation.
MethodHead Method = "HEAD"

// MethodGet is the get operation.
MethodGet Method = "GET"

// MethodPut is the put operation.
MethodPut Method = "PUT"

// MethodPost is the post operation.
MethodPost Method = "POST"

// MethodDelete is the delete operation.
MethodDelete Method = "Delete"

// MethodList is the list operation.
MethodList Method = "List"
)
24 changes: 15 additions & 9 deletions pkg/objectstorage/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,33 @@ import (
)

type ObjectMetadata struct {
// Key is object key
// Key is object key.
Key string

// ContentDisposition is Content-Disposition header
// ContentDisposition is Content-Disposition header.
ContentDisposition string

// ContentEncoding is Content-Encoding header
// ContentEncoding is Content-Encoding header.
ContentEncoding string

// ContentLanguage is Content-Language header
// ContentLanguage is Content-Language header.
ContentLanguage string

// ContentLanguage is Content-Length header
// ContentLanguage is Content-Length header.
ContentLength int64

// ContentType is Content-Type header
// ContentType is Content-Type header.
ContentType string

// Etag is Etag header
// Etag is Etag header.
Etag string

// Digest is object digest.
Digest string
}

type BucketMetadata struct {
// Name is bucket name
// Name is bucket name.
Name string

// CreateAt is bucket create time.
Expand All @@ -76,13 +79,16 @@ type ObjectStorage interface {
GetOject(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error)

// CreateObject creates data of object.
CreateObject(ctx context.Context, bucketName, objectKey string, reader io.Reader) error
CreateObject(ctx context.Context, bucketName, objectKey, digest string, reader io.Reader) error

// DeleteObject deletes data of object.
DeleteObject(ctx context.Context, bucketName, objectKey string) error

// ListObjectMetadatas returns metadata of objects.
ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error)

// GetSignURL returns sign url of object.
GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error)
}

// New object storage interface.
Expand Down
35 changes: 33 additions & 2 deletions pkg/objectstorage/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"strconv"
"time"

aliyunoss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/go-http-utils/headers"
Expand Down Expand Up @@ -109,6 +110,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
ContentLength: contentLength,
ContentType: header.Get(headers.ContentType),
Etag: header.Get(headers.ETag),
Digest: header.Get(aliyunoss.HTTPHeaderOssMetaPrefix + MetaDigest),
}, nil
}

Expand All @@ -123,13 +125,14 @@ func (o *oss) GetOject(ctx context.Context, bucketName, objectKey string) (io.Re
}

// CreateObject creates data of object.
func (o *oss) CreateObject(ctx context.Context, bucketName, objectKey string, reader io.Reader) error {
func (o *oss) CreateObject(ctx context.Context, bucketName, objectKey, digest string, reader io.Reader) error {
bucket, err := o.client.Bucket(bucketName)
if err != nil {
return err
}

return bucket.PutObject(objectKey, reader)
meta := aliyunoss.Meta(MetaDigest, digest)
return bucket.PutObject(objectKey, reader, meta)
}

// DeleteObject deletes data of object.
Expand Down Expand Up @@ -164,3 +167,31 @@ func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marke

return metadatas, nil
}

// GetSignURL returns sign url of object.
func (o *oss) GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error) {
var ossHTTPMethod aliyunoss.HTTPMethod
switch method {
case MethodGet:
ossHTTPMethod = aliyunoss.HTTPGet
case MethodPut:
ossHTTPMethod = aliyunoss.HTTPPut
case MethodHead:
ossHTTPMethod = aliyunoss.HTTPHead
case MethodPost:
ossHTTPMethod = aliyunoss.HTTPPost
case MethodDelete:
ossHTTPMethod = aliyunoss.HTTPDelete
case MethodList:
ossHTTPMethod = aliyunoss.HTTPGet
default:
return "", fmt.Errorf("not support method %s", method)
}

bucket, err := o.client.Bucket(bucketName)
if err != nil {
return "", err
}

return bucket.SignURL(objectKey, ossHTTPMethod, int64(expire.Seconds()))
}
Loading