Skip to content

Commit 1e4aa1b

Browse files
authored
Merge pull request dragonflyoss#621 from Starnop/update-after-schedluer
bugfix: update the client progress after success to schedule
2 parents 3513397 + 2ceac4d commit 1e4aa1b

File tree

5 files changed

+58
-15
lines changed

5 files changed

+58
-15
lines changed

supernode/daemon/mgr/mock/mock_progress_mgr.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

supernode/daemon/mgr/progress/progress_manager.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func NewManager(cfg *config.Config) (*Manager, error) {
6767
}, nil
6868
}
6969

70-
// InitProgress init the correlation information between peers and pieces, etc.
70+
// InitProgress inits the correlation information between peers and pieces, etc.
7171
func (pm *Manager) InitProgress(ctx context.Context, taskID, peerID, clientID string) (err error) {
7272
// validate the param
7373
if cutil.IsEmptyStr(taskID) {
@@ -100,7 +100,7 @@ func (pm *Manager) InitProgress(ctx context.Context, taskID, peerID, clientID st
100100
return pm.peerProgress.add(peerID, newPeerState())
101101
}
102102

103-
// UpdateProgress update the correlation information between peers and pieces.
103+
// UpdateProgress updates the correlation information between peers and pieces.
104104
// NOTE: What if the update failed?
105105
func (pm *Manager) UpdateProgress(ctx context.Context, taskID, srcCID, srcPID, dstPID string, pieceNum, pieceStatus int) error {
106106
if cutil.IsEmptyStr(taskID) {
@@ -152,7 +152,28 @@ func (pm *Manager) UpdateProgress(ctx context.Context, taskID, srcCID, srcPID, d
152152
return nil
153153
}
154154

155-
// GetPieceProgressByCID get all pieces with specified clientID.
155+
// UpdateClientProgress updates the clientProgress and superProgress.
156+
func (pm *Manager) UpdateClientProgress(ctx context.Context, taskID, srcCID, dstPID string, pieceNum, pieceStatus int) error {
157+
if cutil.IsEmptyStr(taskID) {
158+
return errors.Wrap(errorType.ErrEmptyValue, "taskID")
159+
}
160+
if cutil.IsEmptyStr(srcCID) {
161+
return errors.Wrapf(errorType.ErrEmptyValue, "srcCID for taskID:%s", taskID)
162+
}
163+
164+
result, err := pm.updateClientProgress(taskID, srcCID, dstPID, pieceNum, pieceStatus)
165+
if err != nil {
166+
logrus.Errorf("failed to update ClientProgress taskID(%s) srcCID(%s) dstPID(%s) pieceNum(%d) pieceStatus(%d): %v",
167+
taskID, srcCID, dstPID, pieceNum, pieceStatus, err)
168+
return err
169+
}
170+
logrus.Debugf("success to update ClientProgress taskID(%s) srcCID(%s) dstPID(%s) pieceNum(%d) pieceStatus(%d) with result: %t",
171+
taskID, srcCID, dstPID, pieceNum, pieceStatus, result)
172+
173+
return nil
174+
}
175+
176+
// GetPieceProgressByCID gets all pieces with specified clientID.
156177
//
157178
// And the pieceStatus should be one of the `PieceRunning`,`PieceSuccess` and `PieceAvailable`.
158179
// If not, the `PieceAvailable` will be as the default value.
@@ -185,7 +206,7 @@ func (pm *Manager) GetPieceProgressByCID(ctx context.Context, taskID, clientID,
185206
return getAvailablePieces(clientBitset, cdnBitset, runningPieces)
186207
}
187208

188-
// DeletePieceProgressByCID delete the pieces progress with specified clientID.
209+
// DeletePieceProgressByCID deletes the pieces progress with specified clientID.
189210
func (pm *Manager) DeletePieceProgressByCID(ctx context.Context, taskID, clientID string) (err error) {
190211
if pm.cfg.IsSuperCID(clientID) {
191212
return pm.superProgress.remove(taskID)
@@ -250,12 +271,12 @@ func (pm *Manager) DeletePeerStateByPeerID(ctx context.Context, peerID string) e
250271
return pm.peerProgress.remove(peerID)
251272
}
252273

253-
// GetPeersByTaskID get all peers info with specified taskID.
274+
// GetPeersByTaskID gets all peers info with specified taskID.
254275
func (pm *Manager) GetPeersByTaskID(ctx context.Context, taskID string) (peersInfo []*types.PeerInfo, err error) {
255276
return nil, nil
256277
}
257278

258-
// GetBlackInfoByPeerID get black info with specified peerID.
279+
// GetBlackInfoByPeerID gets black info with specified peerID.
259280
func (pm *Manager) GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *cutil.SyncMap, err error) {
260281
return pm.clientBlackInfo.GetAsMap(peerID)
261282
}

supernode/daemon/mgr/progress/progress_util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (pm *Manager) updatePieceProgress(taskID, srcPID string, pieceNum int) erro
4646
return pstate.add(srcPID)
4747
}
4848

49-
// updateClientProgress update the client progress when clientID is not a supernode,
49+
// updateClientProgress updates the client progress when clientID is not a supernode,
5050
// otherwise update the super progress.
5151
func (pm *Manager) updateClientProgress(taskID, srcCID, dstPID string, pieceNum, pieceStatus int) (bool, error) {
5252
// update piece bitSet

supernode/daemon/mgr/progress_mgr.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,23 @@ type PeerState struct {
2727

2828
// ProgressMgr is responsible for maintaining the correspondence between peer and pieces.
2929
type ProgressMgr interface {
30-
// InitProgress init the correlation information between peers and pieces, etc.
30+
// InitProgress inits the correlation information between peers and pieces, etc.
3131
InitProgress(ctx context.Context, taskID, peerID, clientID string) error
3232

33-
// UpdateProgress update the correlation information between peers and pieces.
33+
// UpdateProgress updates the correlation information between peers and pieces.
3434
// 1. update the info about srcCID to tell the scheduler that corresponding peer has the piece now.
3535
// 2. update the info about dstPID to tell the scheduler that someone has downloaded the piece form here.
3636
// Scheduler will calculate the load and times of error/success for every peer to make better decisions.
3737
UpdateProgress(ctx context.Context, taskID, srcCID, srcPID, dstPID string, pieceNum, pieceStatus int) error
3838

39-
// GetPieceProgressByCID get all pieces progress with specified clientID.
39+
// UpdateClientProgress updates the info when success to schedule peer srcCID to download from dstPID.
40+
UpdateClientProgress(ctx context.Context, taskID, srcCID, dstPID string, pieceNum, pieceStatus int) error
41+
42+
// GetPieceProgressByCID gets all pieces progress with specified clientID.
4043
// The filter parameter depends on the specific implementation.
4144
GetPieceProgressByCID(ctx context.Context, taskID, clientID, filter string) (pieceNums []int, err error)
4245

43-
// DeletePieceProgressByCID delete the pieces progress with specified clientID.
46+
// DeletePieceProgressByCID deletes the pieces progress with specified clientID.
4447
DeletePieceProgressByCID(ctx context.Context, taskID, clientID string) (err error)
4548

4649
// GetPeerIDsByPieceNum gets all peerIDs with specified taskID and pieceNum.
@@ -56,9 +59,9 @@ type ProgressMgr interface {
5659
// DeletePeerStateByPeerID deletes the peerState by PeerID.
5760
DeletePeerStateByPeerID(ctx context.Context, peerID string) error
5861

59-
// GetPeersByTaskID get all peers info with specified taskID.
62+
// GetPeersByTaskID gets all peers info with specified taskID.
6063
GetPeersByTaskID(ctx context.Context, taskID string) (peersInfo []*types.PeerInfo, err error)
6164

62-
// GetBlackInfoByPeerID get black info with specified peerID.
65+
// GetBlackInfoByPeerID gets black info with specified peerID.
6366
GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *cutil.SyncMap, err error)
6467
}

supernode/daemon/mgr/scheduler/manager.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (sm *Manager) Schedule(ctx context.Context, taskID, clientID, peerID string
6565
}
6666
logrus.Debugf("scheduler get pieces %v with prioritize for taskID(%s)", pieceNums, taskID)
6767

68-
return sm.getPieceResults(ctx, taskID, peerID, pieceNums, runningCount)
68+
return sm.getPieceResults(ctx, taskID, clientID, peerID, pieceNums, runningCount)
6969
}
7070

7171
func (sm *Manager) sort(ctx context.Context, pieceNums, runningPieces []int, taskID string) ([]int, error) {
@@ -124,7 +124,7 @@ func (sm *Manager) sortExecutor(ctx context.Context, pieceNums []int, centerNum
124124
})
125125
}
126126

127-
func (sm *Manager) getPieceResults(ctx context.Context, taskID, peerID string, pieceNums []int, runningCount int) ([]*mgr.PieceResult, error) {
127+
func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID string, pieceNums []int, runningCount int) ([]*mgr.PieceResult, error) {
128128
// validate ClientErrorCount
129129
var useSupernode bool
130130
srcPeerState, err := sm.progressMgr.GetPeerStateByPeerID(ctx, peerID)
@@ -154,6 +154,11 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, peerID string, p
154154
continue
155155
}
156156

157+
if err := sm.progressMgr.UpdateClientProgress(ctx, taskID, clientID, dstPID, pieceNums[i], config.PieceRUNNING); err != nil {
158+
logrus.Warnf("failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID)
159+
continue
160+
}
161+
157162
pieceResults = append(pieceResults, &mgr.PieceResult{
158163
TaskID: taskID,
159164
PieceNum: pieceNums[i],

0 commit comments

Comments
 (0)