Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions dfget/core/downloader/p2p_downloader/client_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type ClientStreamWriter struct {
// The downloader will put the piece into this queue after it downloaded a piece successfully.
// And clientWriter will poll values from this queue constantly and write to disk.
clientQueue queue.Queue

// notifyQueue sends a notification when all operation about a piece have
// been completed successfully.
notifyQueue queue.Queue

// finish indicates whether the task written is completed.
finish chan struct{}

Expand Down Expand Up @@ -68,11 +73,12 @@ type ClientStreamWriter struct {
}

// NewClientStreamWriter creates and initialize a ClientStreamWriter instance.
func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
func NewClientStreamWriter(clientQueue, notifyQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should abstract an interface for client writer, WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, there is an ambitious plan to do this, but not in this pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

pr, pw := io.Pipe()
limitReader := limitreader.NewLimitReader(pr, int64(cfg.LocalLimit), cfg.Md5 != "")
clientWriter := &ClientStreamWriter{
clientQueue: clientQueue,
notifyQueue: notifyQueue,
pipeReader: pr,
pipeWriter: pw,
limitReader: limitReader,
Expand Down Expand Up @@ -139,7 +145,7 @@ func (csw *ClientStreamWriter) write(piece *Piece) error {

err := csw.writePieceToPipe(piece)
if err == nil {
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime))
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime), csw.notifyQueue)
}
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
copy(cases2, cases)

cfg := &config.Config{}
csw := NewClientStreamWriter(nil, nil, cfg)
csw := NewClientStreamWriter(nil, nil, nil, cfg)
go func() {
for _, v := range cases2 {
err := csw.writePieceToPipe(v.piece)
Expand Down
16 changes: 13 additions & 3 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type ClientWriter struct {
// The downloader will put the piece into this queue after it downloaded a piece successfully.
// And clientWriter will poll values from this queue constantly and write to disk.
clientQueue queue.Queue

// notifyQueue sends a notification when all operation about a piece have
// been completed successfully.
notifyQueue queue.Queue

// finish indicates whether the task written is completed.
finish chan struct{}

Expand Down Expand Up @@ -95,9 +100,11 @@ type ClientWriter struct {

// NewClientWriter creates and initialize a ClientWriter instance.
func NewClientWriter(clientFilePath, serviceFilePath string,
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
clientQueue, notifyQueue queue.Queue,
api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
clientWriter := &ClientWriter{
clientQueue: clientQueue,
notifyQueue: notifyQueue,
clientFilePath: clientFilePath,
serviceFilePath: serviceFilePath,
api: api,
Expand Down Expand Up @@ -219,7 +226,7 @@ func (cw *ClientWriter) write(piece *Piece) error {
cw.pieceIndex++
err := writePieceToFile(piece, cw.serviceFile, cw.cdnSource)
if err == nil {
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime))
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime), cw.notifyQueue)
}
return err
}
Expand Down Expand Up @@ -247,7 +254,7 @@ func startSyncWriter(q queue.Queue) queue.Queue {
return nil
}

func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration) {
func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration, notifyQueue queue.Queue) {
reportPieceRequest := &types.ReportPieceRequest{
TaskID: piece.TaskID,
Cid: cid,
Expand All @@ -265,6 +272,9 @@ func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.

_, err := api.ReportPiece(piece.SuperNode, reportPieceRequest)
if err == nil {
if notifyQueue != nil {
notifyQueue.Put("success")
}
if retry > 0 {
logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry)
}
Expand Down
39 changes: 29 additions & 10 deletions dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type P2PDownloader struct {
// And clientWriter will poll values from this queue constantly and write to disk.
clientQueue queue.Queue

// notifyQueue maintains a queue for notifying p2p downloader to pull next download tasks.
notifyQueue queue.Queue

// clientFilePath is the full path of the temp file.
clientFilePath string
// serviceFilePath is the full path of the temp service file which
Expand Down Expand Up @@ -150,6 +153,7 @@ func (p2p *P2PDownloader) init() {
p2p.queue.Put(NewPieceSimple(p2p.taskID, p2p.node, constants.TaskStatusStart, p2p.RegisterResult.CDNSource))

p2p.clientQueue = queue.NewQueue(p2p.cfg.ClientQueueSize)
p2p.notifyQueue = queue.NewQueue(p2p.cfg.ClientQueueSize)

p2p.clientFilePath = helper.GetTaskFile(p2p.taskFileName, p2p.cfg.RV.DataDir)
p2p.serviceFilePath = helper.GetServiceFile(p2p.taskFileName, p2p.cfg.RV.DataDir)
Expand All @@ -165,7 +169,9 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error {
if p2p.streamMode {
return fmt.Errorf("streamMode enabled, should be disable")
}
clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, p2p.clientQueue, p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource)
clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath,
p2p.clientQueue, p2p.notifyQueue,
p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource)
return p2p.run(ctx, clientWriter)
}

Expand All @@ -174,7 +180,7 @@ func (p2p *P2PDownloader) RunStream(ctx context.Context) (io.Reader, error) {
if !p2p.streamMode {
return nil, fmt.Errorf("streamMode disable, should be enabled")
}
clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.API, p2p.cfg)
clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.notifyQueue, p2p.API, p2p.cfg)
go func() {
err := p2p.run(ctx, clientStreamWriter)
if err != nil {
Expand Down Expand Up @@ -280,14 +286,10 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
break
}

sleepTime := time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
logrus.Infof("pull piece task(%+v) result:%s and sleep %.3fs", item, res, sleepTime.Seconds())
time.Sleep(sleepTime)

// gradually increase the sleep time, up to [800-1600]
if p2p.minTimeout < 800 {
p2p.minTimeout *= 2
p2p.maxTimeout *= 2
actual, expected := p2p.sleepInterval()
if expected > actual || logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Infof("pull piece task(%+v) result:%s and sleep actual:%.3fs expected:%.3fs",
item, res, actual.Seconds(), expected.Seconds())
}
}

Expand All @@ -314,6 +316,23 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
return p2p.pullPieceTask(item)
}

// sleepInterval sleep for a while to wait for next pulling piece task until
// receiving a notification which indicating that all the previous works have
// been completed.
func (p2p *P2PDownloader) sleepInterval() (actual, expected time.Duration) {
expected = time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
start := time.Now()
p2p.notifyQueue.PollTimeout(expected)
actual = time.Now().Sub(start)

// gradually increase the sleep time, up to [800-1600]
if p2p.minTimeout < 800 {
p2p.minTimeout *= 2
p2p.maxTimeout *= 2
}
return actual, expected
}

// getPullRate gets download rate limit dynamically.
func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueData) {
if time.Since(p2p.pullRateTime).Seconds() < 3 {
Expand Down