Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit ef57720

Browse files
committed
fix: cdn-source pattern supports range task
Signed-off-by: lowzj <[email protected]>
1 parent 9a4e7b0 commit ef57720

File tree

10 files changed

+130
-31
lines changed

10 files changed

+130
-31
lines changed

dfget/core/api/download_api.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/dragonflyoss/Dragonfly/dfget/config"
2727
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
28+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2829
"github.com/dragonflyoss/Dragonfly/version"
2930
)
3031

@@ -56,8 +57,10 @@ func NewDownloadAPI() DownloadAPI {
5657
}
5758

5859
func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeout time.Duration) (*http.Response, error) {
60+
if req == nil {
61+
return nil, fmt.Errorf("nil dwonload request")
62+
}
5963
headers := make(map[string]string)
60-
headers[config.StrRange] = config.StrBytes + "=" + req.PieceRange
6164
headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum)
6265
headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize)
6366
headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion
@@ -67,12 +70,52 @@ func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeou
6770
}
6871
}
6972

70-
var url string
71-
if strings.Contains(req.Path, "://") {
73+
var (
74+
url string
75+
rangeStr string
76+
)
77+
if isFromSource(req) {
78+
rangeStr = getRealRange(req.PieceRange, headers[config.StrRange])
7279
url = req.Path
7380
} else {
81+
rangeStr = req.PieceRange
7482
url = fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
7583
}
84+
headers[config.StrRange] = httputils.ConstructRangeStr(rangeStr)
7685

7786
return httputils.HTTPGetTimeout(url, headers, timeout)
7887
}
88+
89+
func isFromSource(req *DownloadRequest) bool {
90+
return strings.Contains(req.Path, "://")
91+
}
92+
93+
// getRealRange
94+
// pieceRange: "start-end"
95+
// rangeHeaderValue: "bytes=sourceStart-sourceEnd"
96+
// return: "realStart-realEnd"
97+
func getRealRange(pieceRange string, rangeHeaderValue string) string {
98+
if rangeHeaderValue == "" {
99+
return pieceRange
100+
}
101+
rangeEle := strings.Split(rangeHeaderValue, "=")
102+
if len(rangeEle) != 2 {
103+
return pieceRange
104+
}
105+
106+
lower, upper, err := rangeutils.ParsePieceIndex(rangeEle[1])
107+
if err != nil {
108+
return pieceRange
109+
}
110+
start, end, err := rangeutils.ParsePieceIndex(pieceRange)
111+
if err != nil {
112+
return pieceRange
113+
}
114+
115+
realStart := start + lower
116+
realEnd := end + lower
117+
if realEnd > upper {
118+
realEnd = upper
119+
}
120+
return fmt.Sprintf("%d-%d", realStart, realEnd)
121+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"github.com/go-check/check"
21+
)
22+
23+
type DownloadAPITestSuite struct {
24+
}
25+
26+
func init() {
27+
check.Suite(&DownloadAPITestSuite{})
28+
}
29+
30+
// ----------------------------------------------------------------------------
31+
// unit tests for DownloadAPI
32+
33+
func (s *DownloadAPITestSuite) TestGetRealRange(c *check.C) {
34+
cases := []struct {
35+
pieceRange string
36+
rangeValue string
37+
expected string
38+
}{
39+
{"0-1", "", "0-1"},
40+
{"0-1", "1-100", "1-2"},
41+
{"0-100", "1-100", "1-100"},
42+
{"100-100", "1-100", "101-100"},
43+
{"100-200", "1-100", "101-100"},
44+
}
45+
46+
for _, v := range cases {
47+
res := getRealRange(v.pieceRange, "bytes="+v.rangeValue)
48+
c.Assert(res, check.Equals, v.expected,
49+
check.Commentf("%v", v))
50+
}
51+
}

supernode/util/range_util.go renamed to pkg/rangeutils/range_util.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package util
17+
package rangeutils
1818

1919
import (
2020
"fmt"
@@ -23,7 +23,8 @@ import (
2323
)
2424

2525
const (
26-
separator = "-"
26+
separator = "-"
27+
invalidPieceIndex = -1
2728
)
2829

2930
// CalculatePieceSize calculates the size of piece
@@ -51,23 +52,24 @@ func CalculatePieceNum(rangeStr string) int {
5152
}
5253

5354
// ParsePieceIndex parses the start and end index ​​according to range string.
55+
// rangeStr: "start-end"
5456
func ParsePieceIndex(rangeStr string) (start, end int64, err error) {
5557
ranges := strings.Split(rangeStr, separator)
5658
if len(ranges) != 2 {
57-
return -1, -1, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr)
59+
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr)
5860
}
5961

6062
startIndex, err := strconv.ParseInt(ranges[0], 10, 64)
6163
if err != nil {
62-
return -1, -1, fmt.Errorf("range(%s) start is not a number", rangeStr)
64+
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) start is not a number", rangeStr)
6365
}
6466
endIndex, err := strconv.ParseInt(ranges[1], 10, 64)
6567
if err != nil {
66-
return -1, -1, fmt.Errorf("range(%s) end is not a number", rangeStr)
68+
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) end is not a number", rangeStr)
6769
}
6870

6971
if endIndex < startIndex {
70-
return -1, -1, fmt.Errorf("range(%s) start is larger than end", rangeStr)
72+
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) start is larger than end", rangeStr)
7173
}
7274

7375
return startIndex, endIndex, nil

supernode/util/range_util_test.go renamed to pkg/rangeutils/range_util_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package util
17+
package rangeutils
1818

1919
import (
2020
"testing"

supernode/daemon/mgr/cdn/downloader.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ import (
2020
"context"
2121
"net/http"
2222

23-
errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes"
24-
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
25-
"github.com/dragonflyoss/Dragonfly/supernode/util"
26-
2723
"github.com/pkg/errors"
2824
"github.com/sirupsen/logrus"
25+
26+
errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes"
27+
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
28+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2929
)
3030

3131
// download downloads the file from the original address and
@@ -38,7 +38,7 @@ func (cm *Manager) download(ctx context.Context, taskID, url string, headers map
3838
checkCode := []int{http.StatusOK, http.StatusPartialContent}
3939

4040
if startPieceNum > 0 {
41-
breakRange, err := util.CalculateBreakRange(startPieceNum, int(pieceContSize), httpFileLength)
41+
breakRange, err := rangeutils.CalculateBreakRange(startPieceNum, int(pieceContSize), httpFileLength)
4242
if err != nil {
4343
return nil, errors.Wrapf(errorType.ErrInvalidValue, "failed to calculate the breakRange: %v", err)
4444
}

supernode/daemon/mgr/cdn/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
2727
"github.com/dragonflyoss/Dragonfly/pkg/metricsutils"
2828
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
29+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2930
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"
3031
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
3132
"github.com/dragonflyoss/Dragonfly/supernode/config"
@@ -216,7 +217,7 @@ func (cm *Manager) GetPieceMD5(ctx context.Context, taskID string, pieceNum int,
216217

217218
if source == PieceMd5SourceFile {
218219
// get piece length
219-
start, end, err := util.ParsePieceIndex(pieceRange)
220+
start, end, err := rangeutils.ParsePieceIndex(pieceRange)
220221
if err != nil {
221222
return "", errors.Wrapf(err, "failed to parse piece range(%s)", pieceRange)
222223
}

supernode/daemon/mgr/pieceerror/md5_not_match.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ package pieceerror
1919
import (
2020
"context"
2121

22+
"github.com/sirupsen/logrus"
23+
2224
"github.com/dragonflyoss/Dragonfly/apis/types"
25+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2326
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
24-
"github.com/dragonflyoss/Dragonfly/supernode/util"
25-
26-
"github.com/sirupsen/logrus"
2727
)
2828

2929
var _ Handler = &FileMd5NotMatchHandler{}
@@ -45,7 +45,7 @@ func NewFileMd5NotMatchHandler(gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (Hand
4545
}
4646

4747
func (fnmh *FileMd5NotMatchHandler) Handle(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error {
48-
pieceNum := util.CalculatePieceNum(pieceErrorRequest.Range)
48+
pieceNum := rangeutils.CalculatePieceNum(pieceErrorRequest.Range)
4949

5050
// get piece MD5 by reading the meta file
5151
metaPieceMD5, err := fnmh.cdnManager.GetPieceMD5(ctx, pieceErrorRequest.TaskID, pieceNum, pieceErrorRequest.Range, "meta")

supernode/daemon/mgr/task/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/dragonflyoss/Dragonfly/apis/types"
2424
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
2525
"github.com/dragonflyoss/Dragonfly/pkg/metricsutils"
26+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2627
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
2728
"github.com/dragonflyoss/Dragonfly/pkg/syncmap"
2829
"github.com/dragonflyoss/Dragonfly/supernode/config"
@@ -272,7 +273,7 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str
272273
logrus.Debugf("get update piece status request: %+v with taskID(%s) pieceRange(%s)", pieceUpdateRequest, taskID, pieceRange)
273274

274275
// calculate the pieceNum according to the pieceRange
275-
pieceNum := util.CalculatePieceNum(pieceRange)
276+
pieceNum := rangeutils.CalculatePieceNum(pieceRange)
276277
if pieceNum == -1 {
277278
return errors.Wrapf(errortypes.ErrInvalidValue,
278279
"failed to parse pieceRange: %s to pieceNum for taskID: %s, clientID: %s",

supernode/daemon/mgr/task/manager_util.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/dragonflyoss/Dragonfly/pkg/digest"
2727
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
2828
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
29+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2930
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
3031
"github.com/dragonflyoss/Dragonfly/pkg/timeutils"
3132
"github.com/dragonflyoss/Dragonfly/supernode/config"
@@ -290,7 +291,7 @@ func (tm *Manager) processTaskStart(ctx context.Context, srcCID string, task *ty
290291
// req.DstPID, req.PieceRange, req.PieceResult, req.DfgetTaskStatus
291292
func (tm *Manager) processTaskRunning(ctx context.Context, srcCID, srcPID string, task *types.TaskInfo, req *types.PiecePullRequest,
292293
dfgetTask *types.DfGetTask) (bool, interface{}, error) {
293-
pieceNum := util.CalculatePieceNum(req.PieceRange)
294+
pieceNum := rangeutils.CalculatePieceNum(req.PieceRange)
294295
if pieceNum == -1 {
295296
return false, nil, errors.Wrapf(errortypes.ErrInvalidValue, "pieceRange: %s", req.PieceRange)
296297
}
@@ -411,7 +412,7 @@ func (tm *Manager) pieceResultToPieceInfo(ctx context.Context, pr *mgr.PieceResu
411412
PeerIP: peer.IP.String(),
412413
PeerPort: peer.Port,
413414
PieceMD5: pieceMD5,
414-
PieceRange: util.CalculatePieceRange(pr.PieceNum, pieceSize),
415+
PieceRange: rangeutils.CalculatePieceRange(pr.PieceNum, pieceSize),
415416
PieceSize: pieceSize,
416417
}, nil
417418
}

supernode/server/0.3_bridge.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ import (
2121
"encoding/json"
2222
"net/http"
2323

24+
"github.com/go-openapi/strfmt"
25+
"github.com/gorilla/schema"
26+
"github.com/pkg/errors"
27+
"github.com/sirupsen/logrus"
28+
2429
"github.com/dragonflyoss/Dragonfly/apis/types"
2530
"github.com/dragonflyoss/Dragonfly/pkg/constants"
2631
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
2732
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
33+
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
2834
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
29-
sutil "github.com/dragonflyoss/Dragonfly/supernode/util"
30-
31-
"github.com/go-openapi/strfmt"
32-
"github.com/gorilla/schema"
33-
"github.com/pkg/errors"
34-
"github.com/sirupsen/logrus"
3535
)
3636

3737
// RegisterResponseData is the data when registering supernode successfully.
@@ -200,7 +200,7 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req
200200
}
201201
datas = append(datas, &PullPieceTaskResponseContinueData{
202202
Range: v.PieceRange,
203-
PieceNum: sutil.CalculatePieceNum(v.PieceRange),
203+
PieceNum: rangeutils.CalculatePieceNum(v.PieceRange),
204204
PieceSize: v.PieceSize,
205205
PieceMd5: v.PieceMD5,
206206
Cid: cid,
@@ -229,7 +229,7 @@ func (s *Server) reportPiece(ctx context.Context, rw http.ResponseWriter, req *h
229229

230230
// If piece is downloaded from supernode, add metrics.
231231
if s.Config.IsSuperCID(dstCID) {
232-
m.pieceDownloadedBytes.WithLabelValues().Add(float64(sutil.CalculatePieceSize(pieceRange)))
232+
m.pieceDownloadedBytes.WithLabelValues().Add(float64(rangeutils.CalculatePieceSize(pieceRange)))
233233
}
234234

235235
request := &types.PieceUpdateRequest{

0 commit comments

Comments
 (0)