Commit bb578e9

feat: auto switch to concurrent back source based on download speed
Signed-off-by: greenhandatsjtu <[email protected]>
1 parent 6f0b90c commit bb578e9

File tree: 8 files changed, +274 -32 lines changed

client/config/peerhost.go

Lines changed: 2 additions & 0 deletions

@@ -254,6 +254,8 @@ type TransportOption struct {
 type ConcurrentOption struct {
 	// ThresholdSize indicates the threshold to download pieces concurrently
 	ThresholdSize util.Size `mapstructure:"thresholdSize" yaml:"thresholdSize"`
+	// ThresholdSpeed indicates the threshold download speed to download pieces concurrently
+	ThresholdSpeed unit.Bytes `mapstructure:"thresholdSpeed" yaml:"thresholdSpeed"`
 	// GoroutineCount indicates the concurrent goroutine count for every task
 	GoroutineCount int `mapstructure:"goroutineCount" yaml:"goroutineCount"`
 	// InitBackoff second for every piece failed, default: 0.5
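A minimal sketch of what the two thresholds each gate, using local stand-in types (the real fields are util.Size and unit.Bytes; both are treated here as plain integer byte counts, with ThresholdSpeed read as bytes per second; values and helper names are illustrative, not part of the commit):

package main

import "fmt"

// ConcurrentOption is a local stand-in for the config struct in the diff.
type ConcurrentOption struct {
	ThresholdSize  int64 // gate at the start: content length must exceed this
	ThresholdSpeed int64 // gate per piece: measured speed must stay above this
	GoroutineCount int
}

// startConcurrently mirrors the pre-existing entry check in DownloadSource:
// the source must support range requests and the content must be larger
// than ThresholdSize.
func startConcurrently(opt ConcurrentOption, contentLength int64, supportRange bool) bool {
	return supportRange && contentLength > opt.ThresholdSize
}

// switchConcurrently mirrors the per-piece check this commit adds: a
// measured speed below ThresholdSpeed triggers the mid-download switch.
func switchConcurrently(opt ConcurrentOption, bytesPerSecond int64) bool {
	return bytesPerSecond < opt.ThresholdSpeed
}

func main() {
	opt := ConcurrentOption{ThresholdSize: 100 << 20, ThresholdSpeed: 16 << 20, GoroutineCount: 4}
	fmt.Println(startConcurrently(opt, 10<<20, true)) // false: content too small
	fmt.Println(switchConcurrently(opt, 2<<20))       // true: 2 MiB/s is below the threshold
}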

client/config/peerhost_test.go

Lines changed: 1 addition & 0 deletions

@@ -337,6 +337,7 @@ func TestPeerHostOption_Load(t *testing.T) {
 				ThresholdSize: util.Size{
 					Limit: 1,
 				},
+				ThresholdSpeed: unit.Bytes(1),
 				GoroutineCount: 1,
 				InitBackoff:    1,
 				MaxBackoff:     1,

client/config/testdata/config/daemon.yaml

Lines changed: 1 addition & 0 deletions

@@ -79,6 +79,7 @@ download:
   watchdogTimeout: 1s
   concurrent:
     thresholdSize: 1
+    thresholdSpeed: 1
     goroutineCount: 1
     initBackoff: 1
     maxBackoff: 1
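Beyond the one-value test fixture, a sketch of how the new knob might sit in a daemon config alongside its siblings (values are illustrative, not defaults shipped by this commit; both thresholds are written as plain byte counts, following the fixture's scalar style):

download:
  concurrent:
    thresholdSize: 104857600   # 100 MiB: only back source concurrently for large content
    thresholdSpeed: 16777216   # 16 MiB/s: below this per-piece speed, switch mid-download
    goroutineCount: 4
    initBackoff: 0.5
    maxBackoff: 3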

client/daemon/peer/piece_manager.go

Lines changed: 33 additions & 11 deletions

@@ -291,19 +291,21 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, peerTaskReq
 	if err != nil {
 		return err
 	}
-
+	var (
+		metadata            *source.Metadata
+		supportConcurrent   bool
+		targetContentLength int64
+	)
 	if pm.concurrentOption != nil {
 		// check metadata
 		// 1. support range request
 		// 2. target content length is greater than concurrentOption.ThresholdSize
-		metadata, err := source.GetMetadata(backSourceRequest)
+		metadata, err = source.GetMetadata(backSourceRequest)
 		if err == nil && metadata.Validate != nil && metadata.Validate() == nil {
 			if !metadata.SupportRange || metadata.TotalContentLength == -1 {
 				goto singleDownload
 			}
-			var (
-				targetContentLength int64
-			)
+			supportConcurrent = true
 			if parsedRange != nil {
 				if parsedRange.Length > -1 {
 					targetContentLength = parsedRange.Length

@@ -337,7 +339,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, peerTaskReq
 				return err
 			}
 			// use concurrent piece download mode
-			return pm.concurrentDownloadSource(ctx, pt, peerTaskRequest, parsedRange, metadata)
+			return pm.concurrentDownloadSource(ctx, pt, peerTaskRequest, parsedRange, metadata, 0)
 		}
 	}
 }

@@ -417,10 +419,10 @@ singleDownload:
 		return pm.downloadUnknownLengthSource(pt, pieceSize, reader)
 	}

-	return pm.downloadKnownLengthSource(pt, contentLength, pieceSize, reader)
+	return pm.downloadKnownLengthSource(ctx, pt, contentLength, pieceSize, reader, response, peerTaskRequest, parsedRange, metadata, supportConcurrent, targetContentLength)
 }

-func (pm *pieceManager) downloadKnownLengthSource(pt Task, contentLength int64, pieceSize uint32, reader io.Reader) error {
+func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task, contentLength int64, pieceSize uint32, reader io.Reader, response *source.Response, peerTaskRequest *scheduler.PeerTaskRequest, parsedRange *clientutil.Range, metadata *source.Metadata, supportConcurrent bool, targetContentLength int64) error {
 	log := pt.Log()
 	maxPieceNum := util.ComputePieceCount(contentLength, pieceSize)
 	pt.SetContentLength(contentLength)

@@ -466,6 +468,26 @@ func (pm *pieceManager) downloadKnownLengthSource(pt Task, contentLength int64,

 		pt.ReportPieceResult(request, result, nil)
 		pt.PublishPieceInfo(pieceNum, uint32(result.Size))
+		if supportConcurrent {
+			speed := int64(pieceSize) / (result.FinishTime - result.BeginTime)
+			if speed < int64(pm.concurrentOption.ThresholdSpeed) {
+				err = pt.GetStorage().UpdateTask(ctx,
+					&storage.UpdateTaskRequest{
+						PeerTaskMetadata: storage.PeerTaskMetadata{
+							PeerID: pt.GetPeerID(),
+							TaskID: pt.GetTaskID(),
+						},
+						ContentLength: targetContentLength,
+						Header:        &metadata.Header,
+					})
+				if err != nil {
+					log.Errorf("update task error: %s", err)
+					return err
+				}
+				response.Body.Close()
+				return pm.concurrentDownloadSource(ctx, pt, peerTaskRequest, parsedRange, metadata, pieceNum+1)
+			}
+		}
 	}

 	log.Infof("download from source ok")

@@ -724,7 +746,7 @@ func (pm *pieceManager) Import(ctx context.Context, ptm storage.PeerTaskMetadata
 	return nil
 }

-func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, peerTaskRequest *scheduler.PeerTaskRequest, parsedRange *clientutil.Range, metadata *source.Metadata) error {
+func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, peerTaskRequest *scheduler.PeerTaskRequest, parsedRange *clientutil.Range, metadata *source.Metadata, startPieceNum int32) error {
 	// parsedRange is always exist
 	pieceSize := pm.computePieceSize(parsedRange.Length)
 	pieceCount := util.ComputePieceCount(parsedRange.Length, pieceSize)

@@ -744,7 +766,7 @@ func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, p
 	var pieceCh = make(chan int32, con)

 	wg := sync.WaitGroup{}
-	wg.Add(int(pieceCount))
+	wg.Add(int(pieceCount - startPieceNum))

 	downloadedPieceCount := atomic.NewInt32(0)

@@ -784,7 +806,7 @@ func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, p
 		}(i)
 	}

-	for i := int32(0); i < pieceCount; i++ {
+	for i := startPieceNum; i < pieceCount; i++ {
 		select {
 		case <-ctx.Done():
 			log.Warnf("context cancelled")
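The switch itself is easiest to see in isolation. Below is a self-contained sketch of the strategy this commit wires through downloadKnownLengthSource and concurrentDownloadSource: pieces stream sequentially from the source, each piece's throughput is measured, and once one falls below the threshold the remaining pieces are fanned out to a worker pool starting at pieceNum+1. All types and helpers here are local stand-ins, not the real pieceManager API; piece sizes, speeds, and the threshold are illustrative.

package main

import (
	"fmt"
	"sync"
	"time"
)

type piece struct {
	num  int32
	size int64
}

// downloadPiece simulates fetching one piece and returns the elapsed time.
// The real code reads piece.size bytes from the response body here.
func downloadPiece(p piece) time.Duration {
	return 500 * time.Millisecond
}

// concurrentDownload stands in for concurrentDownloadSource: it fans the
// remaining pieces out to a fixed pool of goroutines, mirroring the
// startPieceNum parameter the commit introduces.
func concurrentDownload(pieces []piece, startPieceNum int32, goroutines int) {
	pieceCh := make(chan piece)
	var wg sync.WaitGroup
	wg.Add(len(pieces) - int(startPieceNum))
	for i := 0; i < goroutines; i++ {
		go func() {
			for p := range pieceCh {
				downloadPiece(p) // ranged request for just this piece
				wg.Done()
			}
		}()
	}
	for _, p := range pieces[startPieceNum:] {
		pieceCh <- p
	}
	close(pieceCh)
	wg.Wait()
}

func main() {
	const thresholdSpeed = 16 << 20 // 16 MiB/s, analogous to ThresholdSpeed
	pieces := make([]piece, 8)
	for i := range pieces {
		pieces[i] = piece{num: int32(i), size: 4 << 20} // 4 MiB pieces
	}

	for _, p := range pieces {
		elapsed := downloadPiece(p)
		speed := float64(p.size) / elapsed.Seconds() // bytes per second
		if speed < thresholdSpeed {
			fmt.Printf("piece %d averaged %.1f MiB/s, switching to concurrent mode\n",
				p.num, speed/(1<<20))
			concurrentDownload(pieces, p.num+1, 4)
			return
		}
	}
	fmt.Println("download from source ok (sequential)")
}

In the actual diff, the measured speed comes from result.FinishTime and result.BeginTime, the task's content length and source headers are first persisted via pt.GetStorage().UpdateTask, and the sequential response body is closed before concurrentDownloadSource takes over with startPieceNum = pieceNum+1, so pieces already written are not re-downloaded.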
