Skip to content

Commit f46f370

Browse files
committed
feat: add piece download timeout (#621)
Signed-off-by: Gaius <[email protected]>
1 parent 9de5efb commit f46f370

File tree

11 files changed

+29
-18
lines changed

11 files changed

+29
-18
lines changed

client/config/peerhost.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,12 @@ type HostOption struct {
142142
}
143143

144144
type DownloadOption struct {
145-
TotalRateLimit clientutil.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
146-
PerPeerRateLimit clientutil.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
147-
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
148-
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
149-
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
145+
TotalRateLimit clientutil.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
146+
PerPeerRateLimit clientutil.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
147+
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
148+
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
149+
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
150+
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
150151
}
151152

152153
type ProxyOption struct {

client/config/peerhost_darwin.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package config
2020

2121
import (
2222
"net"
23+
"time"
2324

2425
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
2526
"golang.org/x/time/rate"
@@ -60,6 +61,7 @@ var peerHostConfig = DaemonOption{
6061
NetTopology: "",
6162
},
6263
Download: DownloadOption{
64+
PieceDownloadTimeout: 30 * time.Second,
6365
TotalRateLimit: clientutil.RateLimit{
6466
Limit: rate.Limit(DefaultTotalDownloadLimit),
6567
},

client/config/peerhost_linux.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package config
2020

2121
import (
2222
"net"
23+
"time"
2324

2425
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
2526
"golang.org/x/time/rate"
@@ -60,6 +61,7 @@ var peerHostConfig = DaemonOption{
6061
NetTopology: "",
6162
},
6263
Download: DownloadOption{
64+
PieceDownloadTimeout: 30 * time.Second,
6365
TotalRateLimit: clientutil.RateLimit{
6466
Limit: rate.Limit(DefaultTotalDownloadLimit),
6567
},

client/config/peerhost_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ func TestPeerHostOption_Load(t *testing.T) {
248248
AdvertiseIP: "0.0.0.0",
249249
},
250250
Download: DownloadOption{
251+
PieceDownloadTimeout: 30 * time.Second,
251252
TotalRateLimit: clientutil.RateLimit{
252253
Limit: 209715200,
253254
},

client/config/testdata/config/daemon.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ host:
1818
netTopology: d7y
1919

2020
download:
21+
pieceDownloadTimeout: 30s
2122
totalRateLimit: 200Mi
2223
perPeerRateLimit: 20Mi
2324
downloadGRPC:

client/daemon/daemon.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ func New(opt *config.DaemonOption) (Daemon, error) {
126126
}
127127

128128
pieceManager, err := peer.NewPieceManager(storageManager,
129+
opt.Download.PieceDownloadTimeout,
129130
peer.WithLimiter(rate.NewLimiter(opt.Download.TotalRateLimit.Limit, int(opt.Download.TotalRateLimit.Limit))),
130-
peer.WithCalculateDigest(opt.Download.CalculateDigest))
131+
peer.WithCalculateDigest(opt.Download.CalculateDigest),
132+
)
131133
if err != nil {
132134
return nil, err
133135
}

client/daemon/peer/piece_downloader.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,22 @@ var defaultTransport http.RoundTripper = &http.Transport{
6565
ExpectContinueTimeout: 2 * time.Second,
6666
}
6767

68-
func NewPieceDownloader(opts ...func(*pieceDownloader) error) (PieceDownloader, error) {
68+
func NewPieceDownloader(timeout time.Duration, opts ...func(*pieceDownloader) error) (PieceDownloader, error) {
6969
pd := &pieceDownloader{}
70+
7071
for _, opt := range opts {
7172
if err := opt(pd); err != nil {
7273
return nil, err
7374
}
7475
}
76+
7577
if pd.transport == nil {
7678
pd.transport = defaultTransport
7779
}
80+
7881
pd.httpClient = &http.Client{
7982
Transport: pd.transport,
80-
Timeout: 30 * time.Second,
83+
Timeout: timeout,
8184
}
8285
return pd, nil
8386
}

client/daemon/peer/piece_downloader_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net/http/httptest"
2828
"net/url"
2929
"testing"
30+
"time"
3031

3132
"github.com/go-http-utils/headers"
3233
testifyassert "github.com/stretchr/testify/assert"
@@ -42,6 +43,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
4243
assert := testifyassert.New(t)
4344
testData, err := ioutil.ReadFile(test.File)
4445
assert.Nil(err, "load test file")
46+
pieceDownloadTimeout := 30 * time.Second
4547

4648
tests := []struct {
4749
handleFunc func(w http.ResponseWriter, r *http.Request)
@@ -110,7 +112,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
110112
addr, _ := url.Parse(server.URL)
111113
factories := []func() (PieceDownloader, error){
112114
func() (PieceDownloader, error) {
113-
return NewPieceDownloader()
115+
return NewPieceDownloader(pieceDownloadTimeout)
114116
}, func() (PieceDownloader, error) {
115117
return NewOptimizedPieceDownloader()
116118
}}

client/daemon/peer/piece_manager.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type pieceManager struct {
5353

5454
var _ PieceManager = (*pieceManager)(nil)
5555

56-
func NewPieceManager(s storage.TaskStorageDriver, opts ...func(*pieceManager)) (PieceManager, error) {
56+
func NewPieceManager(s storage.TaskStorageDriver, pieceDownloadTimeout time.Duration, opts ...func(*pieceManager)) (PieceManager, error) {
5757
pm := &pieceManager{
5858
storageManager: s,
5959
computePieceSize: cdnutil.ComputePieceSize,
@@ -65,17 +65,11 @@ func NewPieceManager(s storage.TaskStorageDriver, opts ...func(*pieceManager)) (
6565

6666
// set default value
6767
if pm.pieceDownloader == nil {
68-
pm.pieceDownloader, _ = NewPieceDownloader()
68+
pm.pieceDownloader, _ = NewPieceDownloader(pieceDownloadTimeout)
6969
}
7070
return pm, nil
7171
}
7272

73-
func WithPieceDownloader(d PieceDownloader) func(*pieceManager) {
74-
return func(pm *pieceManager) {
75-
pm.pieceDownloader = d
76-
}
77-
}
78-
7973
func WithCalculateDigest(enable bool) func(*pieceManager) {
8074
return func(pm *pieceManager) {
8175
logger.Infof("set calculateDigest to %t for piece manager", enable)

client/daemon/peer/piece_manager_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func TestPieceManager_DownloadSource(t *testing.T) {
5858
output = "../test/testdata/test.output"
5959
)
6060

61+
pieceDownloadTimeout := 30 * time.Second
6162
storageManager, _ := storage.NewStorageManager(
6263
config.SimpleLocalTaskStoreStrategy,
6364
&config.StorageOption{
@@ -176,7 +177,7 @@ func TestPieceManager_DownloadSource(t *testing.T) {
176177
}))
177178
defer ts.Close()
178179

179-
pm, err := NewPieceManager(storageManager)
180+
pm, err := NewPieceManager(storageManager, pieceDownloadTimeout)
180181
assert.Nil(err)
181182
pm.(*pieceManager).computePieceSize = func(length int64) int32 {
182183
return tc.pieceSize

0 commit comments

Comments
 (0)