@@ -84,6 +84,9 @@ type P2PDownloader struct {
8484	// And clientWriter will poll values from this queue constantly and write to disk. 
8585	clientQueue  queue.Queue 
8686
87+ 	// notifyQueue maintains a queue for notifying p2p downloader to pull next download tasks. 
88+ 	notifyQueue  queue.Queue 
89+ 
8790	// clientFilePath is the full path of the temp file. 
8891	clientFilePath  string 
8992	// serviceFilePath is the full path of the temp service file which 
@@ -150,6 +153,7 @@ func (p2p *P2PDownloader) init() {
150153	p2p .queue .Put (NewPieceSimple (p2p .taskID , p2p .node , constants .TaskStatusStart , p2p .RegisterResult .CDNSource ))
151154
152155	p2p .clientQueue  =  queue .NewQueue (p2p .cfg .ClientQueueSize )
156+ 	p2p .notifyQueue  =  queue .NewQueue (p2p .cfg .ClientQueueSize )
153157
154158	p2p .clientFilePath  =  helper .GetTaskFile (p2p .taskFileName , p2p .cfg .RV .DataDir )
155159	p2p .serviceFilePath  =  helper .GetServiceFile (p2p .taskFileName , p2p .cfg .RV .DataDir )
@@ -165,7 +169,9 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error {
165169	if  p2p .streamMode  {
166170		return  fmt .Errorf ("streamMode enabled, should be disable" )
167171	}
168- 	clientWriter  :=  NewClientWriter (p2p .clientFilePath , p2p .serviceFilePath , p2p .clientQueue , p2p .API , p2p .cfg , p2p .RegisterResult .CDNSource )
172+ 	clientWriter  :=  NewClientWriter (p2p .clientFilePath , p2p .serviceFilePath ,
173+ 		p2p .clientQueue , p2p .notifyQueue ,
174+ 		p2p .API , p2p .cfg , p2p .RegisterResult .CDNSource )
169175	return  p2p .run (ctx , clientWriter )
170176}
171177
@@ -174,7 +180,7 @@ func (p2p *P2PDownloader) RunStream(ctx context.Context) (io.Reader, error) {
174180	if  ! p2p .streamMode  {
175181		return  nil , fmt .Errorf ("streamMode disable, should be enabled" )
176182	}
177- 	clientStreamWriter  :=  NewClientStreamWriter (p2p .clientQueue , p2p .API , p2p .cfg )
183+ 	clientStreamWriter  :=  NewClientStreamWriter (p2p .clientQueue , p2p .notifyQueue ,  p2p . API , p2p .cfg )
178184	go  func () {
179185		err  :=  p2p .run (ctx , clientStreamWriter )
180186		if  err  !=  nil  {
@@ -280,14 +286,10 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
280286			break 
281287		}
282288
283- 		sleepTime  :=  time .Duration (rand .Intn (p2p .maxTimeout - p2p .minTimeout )+ p2p .minTimeout ) *  time .Millisecond 
284- 		logrus .Infof ("pull piece task(%+v) result:%s and sleep %.3fs" , item , res , sleepTime .Seconds ())
285- 		time .Sleep (sleepTime )
286- 
287- 		// gradually increase the sleep time, up to [800-1600] 
288- 		if  p2p .minTimeout  <  800  {
289- 			p2p .minTimeout  *=  2 
290- 			p2p .maxTimeout  *=  2 
289+ 		actual , expected  :=  p2p .sleepInterval ()
290+ 		if  expected  >  actual  ||  logrus .IsLevelEnabled (logrus .DebugLevel ) {
291+ 			logrus .Infof ("pull piece task(%+v) result:%s and sleep actual:%.3fs expected:%.3fs" ,
292+ 				item , res , actual .Seconds (), expected .Seconds ())
291293		}
292294	}
293295
@@ -314,6 +316,23 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
314316	return  p2p .pullPieceTask (item )
315317}
316318
319+ // sleepInterval sleep for a while to wait for next pulling piece task until 
320+ // receiving a notification which indicating that all the previous works have 
321+ // been completed. 
322+ func  (p2p  * P2PDownloader ) sleepInterval () (actual , expected  time.Duration ) {
323+ 	expected  =  time .Duration (rand .Intn (p2p .maxTimeout - p2p .minTimeout )+ p2p .minTimeout ) *  time .Millisecond 
324+ 	start  :=  time .Now ()
325+ 	p2p .notifyQueue .PollTimeout (expected )
326+ 	actual  =  time .Now ().Sub (start )
327+ 
328+ 	// gradually increase the sleep time, up to [800-1600] 
329+ 	if  p2p .minTimeout  <  800  {
330+ 		p2p .minTimeout  *=  2 
331+ 		p2p .maxTimeout  *=  2 
332+ 	}
333+ 	return  actual , expected 
334+ }
335+ 
317336// getPullRate gets download rate limit dynamically. 
318337func  (p2p  * P2PDownloader ) getPullRate (data  * types.PullPieceTaskResponseContinueData ) {
319338	if  time .Since (p2p .pullRateTime ).Seconds () <  3  {
0 commit comments