Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,18 @@ type HostOption struct {
}

type DownloadOption struct {
DefaultPattern string `mapstructure:"defaultPattern" yaml:"defaultPattern"`
TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
WatchdogTimeout time.Duration `mapstructure:"watchdogTimeout" yaml:"watchdogTimeout"`
DefaultPattern string `mapstructure:"defaultPattern" yaml:"defaultPattern"`
TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
Transport *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
WatchdogTimeout time.Duration `mapstructure:"watchdogTimeout" yaml:"watchdogTimeout"`
Concurrent *ConcurrentOption `mapstructure:"concurrent" yaml:"concurrent"`
}

type TransportOption struct {
Expand All @@ -247,6 +248,19 @@ type TransportOption struct {
ExpectContinueTimeout time.Duration `mapstructure:"expectContinueTimeout" yaml:"expectContinueTimeout"`
}

type ConcurrentOption struct {
// ThresholdSize indicates the threshold to download pieces concurrently
ThresholdSize util.Size `mapstructure:"thresholdSize" yaml:"thresholdSize"`
// GoroutineCount indicates the concurrent goroutine count for every task
GoroutineCount int `mapstructure:"goroutineCount" yaml:"goroutineCount"`
// InitBackoff second for every piece failed, default: 0.5
InitBackoff float64 `mapstructure:"initBackoff" yaml:"initBackoff"`
// MaxBackoff second for every piece failed, default: 3
MaxBackoff float64 `mapstructure:"maxBackoff" yaml:"maxBackoff"`
// MaxAttempts for every piece failed,default: 3
MaxAttempts int `mapstructure:"maxAttempts" yaml:"MaxAttempts"`
}

type ProxyOption struct {
// WARNING: when add more option, please update ProxyOption.unmarshal function
ListenOption `mapstructure:",squash" yaml:",inline"`
Expand Down
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
pieceManager, err := peer.NewPieceManager(
opt.Download.PieceDownloadTimeout,
peer.WithLimiter(rate.NewLimiter(opt.Download.TotalRateLimit.Limit, int(opt.Download.TotalRateLimit.Limit))),
peer.WithCalculateDigest(opt.Download.CalculateDigest), peer.WithTransportOption(opt.Download.TransportOption),
peer.WithCalculateDigest(opt.Download.CalculateDigest), peer.WithTransportOption(opt.Download.Transport),
peer.WithConcurrentOption(opt.Download.Concurrent),
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (pt *peerTaskConductor) backSource() {

ctx, span := tracer.Start(pt.ctx, config.SpanBackSource)
pt.SetContentLength(-1)
err := pt.pieceManager.DownloadSource(ctx, pt, pt.request)
err := pt.pieceManager.DownloadSource(ctx, pt, pt.request, pt.rg)
if err != nil {
pt.Errorf("download from source error: %s", err)
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_piecetask_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer(
count int
ptc = poller.peerTaskConductor
)
p, _, err := retry.Run(ptc.ctx, func() (any, bool, error) {
p, _, err := retry.Run(ptc.ctx, 0.05, 0.2, 40, func() (any, bool, error) {
// GetPieceTasks must be fast, so short time out is okay
ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second)
defer cancel()
Expand Down Expand Up @@ -235,7 +235,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer(
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
ptc.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
return nil, false, dferrors.ErrEmptyValue
}, 0.05, 0.2, 40, nil)
})
if peerPacketChanged {
return nil, errPeerPacketChanged
}
Expand Down
Loading