Skip to content

Commit 4245bbc

Browse files
committed
feat: get object from dragonfly
Signed-off-by: Gaius <[email protected]>
1 parent ec9e132 commit 4245bbc

File tree

2 files changed

+82
-38
lines changed

2 files changed

+82
-38
lines changed

client/daemon/daemon.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,22 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
225225
return nil, err
226226
}
227227

228-
return &clientDaemon{
229-
once: &sync.Once{},
230-
done: make(chan bool),
231-
schedPeerHost: host,
232-
Option: *opt,
228+
objectStorage, err := objectstorage.New(opt, dynconfig, peerTaskManager, storageManager)
229+
if err != nil {
230+
return nil, err
231+
}
233232

233+
return &clientDaemon{
234+
once: &sync.Once{},
235+
done: make(chan bool),
236+
schedPeerHost: host,
237+
Option: *opt,
234238
RPCManager: rpcManager,
235239
PeerTaskManager: peerTaskManager,
236240
PieceManager: pieceManager,
237241
ProxyManager: proxyManager,
238242
UploadManager: uploadManager,
239-
ObjectStorage: objectstorage.New(dynconfig, peerTaskManager, storageManager),
243+
ObjectStorage: objectStorage,
240244
StorageManager: storageManager,
241245
GCManager: gc.NewManager(opt.GCInterval.Duration),
242246
dynconfig: dynconfig,

client/daemon/objectstorage/objectstorage.go

Lines changed: 72 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ package objectstorage
1818

1919
import (
2020
"context"
21+
"errors"
2122
"math"
2223
"net"
2324
"net/http"
25+
"strconv"
2426
"strings"
27+
"time"
2528

2629
"github.com/gin-gonic/gin"
2730
"github.com/go-http-utils/headers"
@@ -30,11 +33,15 @@ import (
3033
"d7y.io/dragonfly/v2/client/config"
3134
"d7y.io/dragonfly/v2/client/daemon/peer"
3235
"d7y.io/dragonfly/v2/client/daemon/storage"
33-
"d7y.io/dragonfly/v2/pkg/idgen"
3436
"d7y.io/dragonfly/v2/pkg/objectstorage"
3537
"d7y.io/dragonfly/v2/pkg/rpc/base"
3638
)
3739

40+
const (
41+
// defaultSignExpireTime is default expire of sign url.
42+
defaultSignExpireTime = 5 * time.Minute
43+
)
44+
3845
// ObjectStorage is the interface used for object storage server.
3946
type ObjectStorage interface {
4047
// Started object storage server.
@@ -47,17 +54,19 @@ type ObjectStorage interface {
4754
// objectStorage provides object storage function.
4855
type objectStorage struct {
4956
*http.Server
50-
dynconfig config.Dynconfig
51-
peeTaskManager peer.TaskManager
52-
storageManager storage.Manager
57+
dynconfig config.Dynconfig
58+
peerTaskManager peer.TaskManager
59+
storageManager storage.Manager
60+
peerIDGenerator peer.IDGenerator
5361
}
5462

5563
// New returns a new ObjectStorage instence.
56-
func New(dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager) (ObjectStorage, error) {
64+
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager) (ObjectStorage, error) {
5765
o := &objectStorage{
58-
dynconfig: dynconfig,
59-
peeTaskManager: peerTaskManager,
60-
storageManager: storageManager,
66+
dynconfig: dynconfig,
67+
peerTaskManager: peerTaskManager,
68+
storageManager: storageManager,
69+
peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP),
6170
}
6271

6372
router := o.initRouter()
@@ -106,30 +115,25 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
106115
return
107116
}
108117

109-
var urlMeta *base.UrlMeta
110-
var artifactRange *clientutil.Range
118+
var (
119+
urlMeta *base.UrlMeta
120+
artifactRange *clientutil.Range
121+
ranges []clientutil.Range
122+
err error
123+
)
111124

112-
// Set meta range's value.
113-
if rangeHeader := ctx.GetHeader(headers.Range); len(rangeHeader) > 0 {
114-
ranges, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
125+
// Parse http range header.
126+
rangeHeader := ctx.GetHeader(headers.Range)
127+
if len(rangeHeader) > 0 {
128+
ranges, err = o.parseRangeHeader(rangeHeader)
115129
if err != nil {
116130
ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": err.Error()})
117131
return
118132
}
119-
120-
if len(ranges) > 1 {
121-
ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": "multiple range is not supported"})
122-
return
123-
}
124-
125-
if len(ranges) == 0 {
126-
ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": "zero range is not supported"})
127-
return
128-
}
133+
artifactRange = &ranges[0]
129134

130135
// Range header in dragonfly is without "bytes=".
131136
urlMeta.Range = strings.TrimLeft(rangeHeader, "bytes=")
132-
artifactRange = &ranges[0]
133137
}
134138

135139
client, err := o.client()
@@ -145,14 +149,32 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
145149
}
146150
urlMeta.Digest = meta.Digest
147151

148-
taskID := idgen.TaskID(ctx.Request.URL.String(), urlMeta)
149-
o.peeTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{
150-
URL: url,
151-
URLMeta: meta,
152-
Range: rg,
153-
PeerID: peerID,
154-
},
155-
)
152+
signURL, err := client.GetSignURL(ctx, params.ID, params.ObjectKey, objectstorage.MethodGet, defaultSignExpireTime)
153+
if err != nil {
154+
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
155+
return
156+
}
157+
158+
reader, attr, err := o.peerTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{
159+
URL: signURL,
160+
URLMeta: urlMeta,
161+
Range: artifactRange,
162+
PeerID: o.peerIDGenerator.PeerID(),
163+
})
164+
if err != nil {
165+
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
166+
return
167+
}
168+
defer reader.Close()
169+
170+
var contentLength int64 = -1
171+
if l, ok := attr[headers.ContentLength]; ok {
172+
if i, err := strconv.ParseInt(l, 10, 64); err == nil {
173+
contentLength = i
174+
}
175+
}
176+
177+
ctx.DataFromReader(http.StatusOK, contentLength, attr[headers.ContentType], reader, nil)
156178
}
157179

158180
// createObject uses to upload object data.
@@ -184,3 +206,21 @@ func (o *objectStorage) client() (objectstorage.ObjectStorage, error) {
184206

185207
return client, nil
186208
}
209+
210+
// parseRangeHeader uses to parse range http header for dragonfly.
211+
func (o *objectStorage) parseRangeHeader(rangeHeader string) ([]clientutil.Range, error) {
212+
ranges, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
213+
if err != nil {
214+
return nil, err
215+
}
216+
217+
if len(ranges) > 1 {
218+
return nil, errors.New("multiple range is not supported")
219+
}
220+
221+
if len(ranges) == 0 {
222+
return nil, errors.New("zero range is not supported")
223+
}
224+
225+
return ranges, nil
226+
}

0 commit comments

Comments
 (0)