Skip to content

Commit 3d235ba

Browse files
authored
feat: concurrent multiple pieces back source (#1426)
* feat: concurrent multiple pieces back source * chore: update http source client Signed-off-by: Jim Ma <[email protected]>
1 parent d380344 commit 3d235ba

File tree

19 files changed

+489
-51
lines changed

19 files changed

+489
-51
lines changed

client/config/peerhost.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,17 +224,18 @@ type HostOption struct {
224224
}
225225

226226
type DownloadOption struct {
227-
DefaultPattern string `mapstructure:"defaultPattern" yaml:"defaultPattern"`
228-
TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
229-
PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
230-
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
231-
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
232-
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
233-
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
234-
TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
235-
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
236-
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
237-
WatchdogTimeout time.Duration `mapstructure:"watchdogTimeout" yaml:"watchdogTimeout"`
227+
DefaultPattern string `mapstructure:"defaultPattern" yaml:"defaultPattern"`
228+
TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
229+
PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
230+
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
231+
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
232+
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
233+
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
234+
Transport *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
235+
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
236+
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
237+
WatchdogTimeout time.Duration `mapstructure:"watchdogTimeout" yaml:"watchdogTimeout"`
238+
Concurrent *ConcurrentOption `mapstructure:"concurrent" yaml:"concurrent"`
238239
}
239240

240241
type TransportOption struct {
@@ -247,6 +248,19 @@ type TransportOption struct {
247248
ExpectContinueTimeout time.Duration `mapstructure:"expectContinueTimeout" yaml:"expectContinueTimeout"`
248249
}
249250

251+
type ConcurrentOption struct {
252+
// ThresholdSize indicates the threshold to download pieces concurrently
253+
ThresholdSize util.Size `mapstructure:"thresholdSize" yaml:"thresholdSize"`
254+
// GoroutineCount indicates the concurrent goroutine count for every task
255+
GoroutineCount int `mapstructure:"goroutineCount" yaml:"goroutineCount"`
256+
// InitBackoff second for every piece failed, default: 0.5
257+
InitBackoff float64 `mapstructure:"initBackoff" yaml:"initBackoff"`
258+
// MaxBackoff second for every piece failed, default: 3
259+
MaxBackoff float64 `mapstructure:"maxBackoff" yaml:"maxBackoff"`
260+
// MaxAttempts for every piece failed,default: 3
261+
MaxAttempts int `mapstructure:"maxAttempts" yaml:"MaxAttempts"`
262+
}
263+
250264
type ProxyOption struct {
251265
// WARNING: when add more option, please update ProxyOption.unmarshal function
252266
ListenOption `mapstructure:",squash" yaml:",inline"`

client/daemon/daemon.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
180180
pieceManager, err := peer.NewPieceManager(
181181
opt.Download.PieceDownloadTimeout,
182182
peer.WithLimiter(rate.NewLimiter(opt.Download.TotalRateLimit.Limit, int(opt.Download.TotalRateLimit.Limit))),
183-
peer.WithCalculateDigest(opt.Download.CalculateDigest), peer.WithTransportOption(opt.Download.TransportOption),
183+
peer.WithCalculateDigest(opt.Download.CalculateDigest), peer.WithTransportOption(opt.Download.Transport),
184+
peer.WithConcurrentOption(opt.Download.Concurrent),
184185
)
185186
if err != nil {
186187
return nil, err

client/daemon/peer/peertask_conductor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ func (pt *peerTaskConductor) backSource() {
486486

487487
ctx, span := tracer.Start(pt.ctx, config.SpanBackSource)
488488
pt.SetContentLength(-1)
489-
err := pt.pieceManager.DownloadSource(ctx, pt, pt.request)
489+
err := pt.pieceManager.DownloadSource(ctx, pt, pt.request, pt.rg)
490490
if err != nil {
491491
pt.Errorf("download from source error: %s", err)
492492
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))

client/daemon/peer/peertask_piecetask_poller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer(
160160
count int
161161
ptc = poller.peerTaskConductor
162162
)
163-
p, _, err := retry.Run(ptc.ctx, func() (any, bool, error) {
163+
p, _, err := retry.Run(ptc.ctx, 0.05, 0.2, 40, func() (any, bool, error) {
164164
// GetPieceTasks must be fast, so short time out is okay
165165
ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second)
166166
defer cancel()
@@ -235,7 +235,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer(
235235
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
236236
ptc.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
237237
return nil, false, dferrors.ErrEmptyValue
238-
}, 0.05, 0.2, 40, nil)
238+
})
239239
if peerPacketChanged {
240240
return nil, errPeerPacketChanged
241241
}

0 commit comments

Comments
 (0)