Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 1da7fba

Browse files
add seed downloader
Signed-off-by: allen.wq <[email protected]>
1 parent 06d2368 commit 1da7fba

File tree

5 files changed

+507
-0
lines changed

5 files changed

+507
-0
lines changed

dfdaemon/seed/downloader.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package seed
18+
19+
import (
20+
"bytes"
21+
"context"
22+
"fmt"
23+
"io"
24+
"net/http"
25+
"time"
26+
27+
"github.com/dragonflyoss/Dragonfly/dfget/config"
28+
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
29+
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
30+
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
31+
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"
32+
33+
"github.com/pkg/errors"
34+
"github.com/sirupsen/logrus"
35+
)
36+
37+
// downloader manage the downloading of seed file.
38+
type downloader interface {
39+
DownloadToWriterAt(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration, writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error)
40+
}
41+
42+
func newLocalDownloader(url string, header map[string][]string, rate *ratelimiter.RateLimiter, copyCache bool) downloader {
43+
return &localDownloader{
44+
url: url,
45+
header: header,
46+
rate: rate,
47+
copyCache: copyCache,
48+
}
49+
}
50+
51+
type localDownloader struct {
52+
url string
53+
header map[string][]string
54+
55+
// downloader will limit the rate.
56+
rate *ratelimiter.RateLimiter
57+
58+
// if copyCache sets, the response body will store to memory cache and transfer to writer
59+
copyCache bool
60+
}
61+
62+
func (ld *localDownloader) DownloadToWriterAt(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration,
63+
writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) {
64+
return ld.download(ctx, rangeStruct, timeout, writeOff, writerAt, rateLimit)
65+
}
66+
67+
func (ld *localDownloader) download(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration,
68+
writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) {
69+
var (
70+
written int64
71+
n int
72+
rd io.Reader
73+
)
74+
75+
header := map[string]string{}
76+
for k, v := range ld.header {
77+
header[k] = v[0]
78+
}
79+
80+
header[config.StrRange] = fmt.Sprintf("bytes=%d-%d", rangeStruct.StartIndex, rangeStruct.EndIndex)
81+
resp, err := httputils.HTTPWithHeaders("GET", ld.url, header, timeout, nil)
82+
if err != nil {
83+
return 0, err
84+
}
85+
86+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
87+
return 0, errortypes.NewHTTPError(resp.StatusCode, "resp code is not 200 or 206")
88+
}
89+
90+
expectedLen := rangeStruct.EndIndex - rangeStruct.StartIndex + 1
91+
92+
defer resp.Body.Close()
93+
rd = resp.Body
94+
if rateLimit {
95+
rd = limitreader.NewLimitReaderWithLimiter(ld.rate, resp.Body, false)
96+
}
97+
98+
// in copyCache pattern, the bytes buffer will be transferred to io.WriterAt, and will be held by io.WriterAt.
99+
if ld.copyCache {
100+
buf := bytes.NewBuffer(nil)
101+
buf.Grow(int(expectedLen))
102+
written, err = io.CopyN(buf, rd, expectedLen)
103+
if err != nil && err != io.EOF {
104+
logrus.Errorf("failed to read data [%d, %d] from resp.body: %v", rangeStruct.StartIndex, rangeStruct.EndIndex, err)
105+
}
106+
107+
if written < expectedLen {
108+
return 0, errors.Wrap(io.ErrShortWrite, fmt.Sprintf("download from [%d,%d], expecte read %d, but got %d", rangeStruct.StartIndex, rangeStruct.EndIndex, expectedLen, written))
109+
}
110+
111+
n, err = writerAt.WriteAt(buf.Bytes(), writeOff)
112+
written = int64(n)
113+
} else {
114+
written, err = CopyBufferToWriterAt(writeOff, writerAt, rd)
115+
}
116+
117+
if err == io.EOF {
118+
err = nil
119+
}
120+
121+
if err != nil {
122+
return 0, errors.Wrap(err, fmt.Sprintf("failed to download from [%d,%d]", rangeStruct.StartIndex, rangeStruct.EndIndex))
123+
}
124+
125+
if written < expectedLen {
126+
return 0, errors.Wrap(io.ErrShortWrite, fmt.Sprintf("download from [%d,%d], expecte read %d, but got %d", rangeStruct.StartIndex, rangeStruct.EndIndex, expectedLen, written))
127+
}
128+
129+
return written, err
130+
}

dfdaemon/seed/downloader_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package seed
18+
19+
import (
20+
"bytes"
21+
"context"
22+
"fmt"
23+
24+
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
25+
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"
26+
27+
"github.com/go-check/check"
28+
)
29+
30+
type mockBufferWriterAt struct {
31+
buf *bytes.Buffer
32+
}
33+
34+
func newMockBufferWriterAt() *mockBufferWriterAt {
35+
return &mockBufferWriterAt{
36+
buf: bytes.NewBuffer(nil),
37+
}
38+
}
39+
40+
func (mb *mockBufferWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
41+
if off != int64(mb.buf.Len()) {
42+
return 0, fmt.Errorf("failed to seek to %d", off)
43+
}
44+
45+
return mb.buf.Write(p)
46+
}
47+
48+
func (mb *mockBufferWriterAt) Bytes() []byte {
49+
return mb.buf.Bytes()
50+
}
51+
52+
func (suite *SeedTestSuite) checkLocalDownloadDataFromFileServer(c *check.C, path string, off int64, size int64) {
53+
buf := newMockBufferWriterAt()
54+
55+
ld := newLocalDownloader(fmt.Sprintf("http://%s/%s", suite.host, path), nil, ratelimiter.NewRateLimiter(0, 0), false)
56+
57+
length, err := ld.DownloadToWriterAt(context.Background(), httputils.RangeStruct{StartIndex: off, EndIndex: off + size - 1}, 0, 0, buf, true)
58+
c.Check(err, check.IsNil)
59+
c.Check(size, check.Equals, length)
60+
61+
expectData, err := suite.readFromFileServer(path, off, size)
62+
c.Check(err, check.IsNil)
63+
c.Check(string(buf.Bytes()), check.Equals, string(expectData))
64+
65+
buf2 := newMockBufferWriterAt()
66+
ld2 := newLocalDownloader(fmt.Sprintf("http://%s/%s", suite.host, path), nil, ratelimiter.NewRateLimiter(0, 0), true)
67+
68+
length2, err := ld2.DownloadToWriterAt(context.Background(), httputils.RangeStruct{StartIndex: off, EndIndex: off + size - 1}, 0, 0, buf2, false)
69+
c.Check(err, check.IsNil)
70+
c.Check(size, check.Equals, length2)
71+
c.Check(string(buf2.Bytes()), check.Equals, string(expectData))
72+
}
73+
74+
func (suite *SeedTestSuite) TestLocalDownload(c *check.C) {
75+
// test read fileA
76+
suite.checkLocalDownloadDataFromFileServer(c, "fileA", 0, 500*1024)
77+
suite.checkLocalDownloadDataFromFileServer(c, "fileA", 0, 100*1024)
78+
for i := 0; i < 5; i++ {
79+
suite.checkLocalDownloadDataFromFileServer(c, "fileA", int64(i*100*1024), 100*1024)
80+
}
81+
82+
// test read fileB
83+
suite.checkLocalDownloadDataFromFileServer(c, "fileB", 0, 1024*1024)
84+
suite.checkLocalDownloadDataFromFileServer(c, "fileB", 0, 100*1024)
85+
for i := 0; i < 20; i++ {
86+
suite.checkLocalDownloadDataFromFileServer(c, "fileB", int64(i*50*1024), 50*1024)
87+
}
88+
suite.checkLocalDownloadDataFromFileServer(c, "fileB", 1000*1024, 24*1024)
89+
90+
// test read fileC
91+
suite.checkLocalDownloadDataFromFileServer(c, "fileC", 0, 1500*1024)
92+
suite.checkLocalDownloadDataFromFileServer(c, "fileC", 0, 100*1024)
93+
suite.checkLocalDownloadDataFromFileServer(c, "fileC", 1400*1024, 100*1024)
94+
for i := 0; i < 75; i++ {
95+
suite.checkLocalDownloadDataFromFileServer(c, "fileC", int64(i*20*1024), 20*1024)
96+
}
97+
98+
//test read fileF
99+
for i := 0; i < 50; i++ {
100+
suite.checkLocalDownloadDataFromFileServer(c, "fileF", int64(i*20000), 20000)
101+
}
102+
}

dfdaemon/seed/utils.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package seed
18+
19+
import (
20+
"bytes"
21+
"io"
22+
)
23+
24+
// CopyBufferToWriterAt copy data from rd and write to io.WriterAt.
25+
func CopyBufferToWriterAt(off int64, writerAt io.WriterAt, rd io.Reader) (n int64, err error) {
26+
buffer := bytes.NewBuffer(nil)
27+
bufSize := int64(256 * 1024)
28+
29+
for {
30+
_, err := io.CopyN(buffer, rd, bufSize)
31+
if err != nil && err != io.EOF {
32+
return 0, err
33+
}
34+
35+
wcount, werr := writerAt.WriteAt(buffer.Bytes(), off)
36+
n += int64(wcount)
37+
if werr != nil {
38+
return n, err
39+
}
40+
41+
if err == io.EOF {
42+
return n, io.EOF
43+
}
44+
45+
buffer.Reset()
46+
off += int64(wcount)
47+
}
48+
}

0 commit comments

Comments
 (0)