
Commit 0fdd978

Merge pull request #620 from lowzj/increase-load
bugfix: increase load of the peer serving for others
2 parents: a5e769d + 6acbac4


4 files changed: 21 additions (+), 20 deletions (-)

supernode/daemon/mgr/progress/progress_manager.go

Lines changed: 4 additions & 4 deletions
@@ -254,10 +254,10 @@ func (pm *Manager) GetPeerStateByPeerID(ctx context.Context, peerID string) (*mg
 
     return &mgr.PeerState{
         PeerID: peerID,
-        ServiceDownTime: peerState.serviceDownTime,
-        ClientErrorCount: peerState.clientErrorCount.Get(),
-        ServiceErrorCount: peerState.serviceErrorCount.Get(),
-        ProducerLoad: peerState.producerLoad.Get(),
+        ServiceDownTime: &peerState.serviceDownTime,
+        ClientErrorCount: peerState.clientErrorCount,
+        ServiceErrorCount: peerState.serviceErrorCount,
+        ProducerLoad: peerState.producerLoad,
     }, nil
 }
 
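With this change GetPeerStateByPeerID no longer copies the counters into plain integers; it hands back the live cutil.AtomicInt values and a pointer to the down-time field, so increments made through the returned PeerState (as the scheduler does below) reach the shared peer state. A minimal, self-contained sketch of that snapshot-versus-live distinction; the counter type here is an illustrative stand-in, not the real cutil.AtomicInt:

package main

import "fmt"

// counter is an illustrative stand-in for a type like cutil.AtomicInt.
type counter struct{ v int32 }

func (c *counter) Add(d int32) int32 { c.v += d; return c.v }
func (c *counter) Get() int32        { return c.v }

type peerState struct{ producerLoad *counter }

// snapshot copies the current value, as the old PeerState fields did:
// later changes by the caller never reach the manager's state.
func snapshot(ps *peerState) int32 { return ps.producerLoad.Get() }

// live hands out the counter itself, as the new PeerState fields do:
// the scheduler can increment it and the manager sees the change.
func live(ps *peerState) *counter { return ps.producerLoad }

func main() {
	ps := &peerState{producerLoad: &counter{}}

	v := snapshot(ps)
	v++                                // modifies only the local copy
	fmt.Println(ps.producerLoad.Get()) // 0

	live(ps).Add(1)
	fmt.Println(ps.producerLoad.Get()) // 1
}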

supernode/daemon/mgr/progress/progress_util.go

Lines changed: 3 additions & 6 deletions
@@ -123,6 +123,9 @@ func (pm *Manager) updatePeerProgress(taskID, srcPID, dstPID string, pieceNum, p
             return err
         }
         if err == nil {
+            if dstPeerState.producerLoad == nil {
+                dstPeerState.producerLoad = cutil.NewAtomicInt(0)
+            }
             updateProducerLoad(dstPeerState.producerLoad, taskID, dstPID, pieceNum, pieceStatus)
         }
     }
@@ -214,12 +217,6 @@ func processPeerFailInfo(srcPeerState, dstPeerState *peerState) {
 // updateProducerLoad update the load of the clientID.
 // TODO: avoid multiple calls
 func updateProducerLoad(load *cutil.AtomicInt, taskID, peerID string, pieceNum, pieceStatus int) {
-    if load == nil {
-        logrus.Warnf("client load maybe not initialized,taskID: %s,peerID: %s,pieceNum: %d",
-            taskID, peerID, pieceNum)
-        load = cutil.NewAtomicInt(0)
-    }
-
     // increase the load of peerID when pieceStatus equals PieceRUNNING
     if pieceStatus == config.PieceRUNNING {
         load.Add(1)
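The nil fallback removed from updateProducerLoad could never repair the peer state: in Go, assigning to a pointer parameter only rebinds the function's local copy of the pointer, so the caller's producerLoad stayed nil and the increment was silently dropped. The fix above initializes the field at the call site instead. A small sketch of that pitfall, with illustrative names and types rather than the project's:

package main

import "fmt"

type peerState struct{ producerLoad *int32 }

// initInside mirrors the removed fallback: assigning a fresh value to the
// parameter rebinds only this function's copy of the pointer, so the
// caller's field stays nil and the increment is lost.
func initInside(load *int32) {
	if load == nil {
		load = new(int32)
	}
	*load += 1
}

func main() {
	ps := &peerState{}

	initInside(ps.producerLoad)
	fmt.Println(ps.producerLoad == nil) // true: the load never reached the peer state

	// The fix: initialize at the call site, before passing the pointer in.
	if ps.producerLoad == nil {
		ps.producerLoad = new(int32)
	}
	initInside(ps.producerLoad)
	fmt.Println(*ps.producerLoad) // 1
}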

supernode/daemon/mgr/progress_mgr.go

Lines changed: 4 additions & 4 deletions
@@ -13,16 +13,16 @@ type PeerState struct {
     PeerID string
 
     // ProducerLoad is the load of download services provided by the current node.
-    ProducerLoad int32
+    ProducerLoad *cutil.AtomicInt
 
     // ClientErrorCount maintains the number of times that PeerID failed to downloaded from the other peer nodes.
-    ClientErrorCount int32
+    ClientErrorCount *cutil.AtomicInt
 
     // ServiceErrorCount maintains the number of times that the other peer nodes failed to downloaded from the PeerID.
-    ServiceErrorCount int32
+    ServiceErrorCount *cutil.AtomicInt
 
     // ServiceDownTime the down time of the peer service.
-    ServiceDownTime int64
+    ServiceDownTime *int64
 }
 
 // ProgressMgr is responsible for maintaining the correspondence between peer and pieces.
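The new field types come from the project's cutil package, whose implementation is not part of this diff. Judging only by the calls used in this commit (NewAtomicInt, Add, Get), a minimal stand-in could look like the sketch below; the real cutil.AtomicInt may differ:

package cutilsketch

import "sync/atomic"

// AtomicInt is a minimal stand-in exposing only the surface this commit
// relies on: NewAtomicInt, Add and Get.
type AtomicInt struct {
	count int32
}

// NewAtomicInt returns a counter initialized to i.
func NewAtomicInt(i int32) *AtomicInt {
	return &AtomicInt{count: i}
}

// Add atomically adds delta and returns the new value, which is what lets
// a caller test the result of Add(1) against a limit in one step.
func (a *AtomicInt) Add(delta int32) int32 {
	return atomic.AddInt32(&a.count, delta)
}

// Get atomically reads the current value.
func (a *AtomicInt) Get() int32 {
	return atomic.LoadInt32(&a.count)
}

Because the struct now holds pointers, a PeerState whose counters were never initialized carries nil fields, which is what the added nil checks in the scheduler below guard against.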

supernode/daemon/mgr/scheduler/manager.go

Lines changed: 10 additions & 6 deletions
@@ -131,8 +131,9 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID
     if err != nil {
         return nil, err
     }
-    if srcPeerState.ClientErrorCount > config.FailCountLimit {
-        logrus.Warnf("peerID: %s got errors for %d times which reaches error limit: %d for taskID(%s)", peerID, srcPeerState.ClientErrorCount, config.FailCountLimit, taskID)
+    if srcPeerState.ClientErrorCount.Get() > config.FailCountLimit {
+        logrus.Warnf("peerID: %s got errors for %d times which reaches error limit: %d for taskID(%s)",
+            peerID, srcPeerState.ClientErrorCount.Get(), config.FailCountLimit, taskID)
         useSupernode = true
     }
 
@@ -191,13 +192,13 @@ func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, p
         }
 
         // if the service has been down, and then it should not be needed.
-        if peerState.ServiceDownTime > 0 {
+        if peerState.ServiceDownTime != nil && *(peerState.ServiceDownTime) > 0 {
             sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
             continue
         }
 
         // if service has failed for EliminationLimit times, and then it should not be needed.
-        if peerState.ServiceErrorCount >= config.EliminationLimit {
+        if peerState.ServiceErrorCount != nil && peerState.ServiceErrorCount.Get() >= config.EliminationLimit {
             sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
             continue
         }
@@ -212,8 +213,11 @@
             continue
         }
 
-        if peerState.ProducerLoad < config.PeerUpLimit {
-            return peerIDs[i]
+        if peerState.ProducerLoad != nil {
+            if peerState.ProducerLoad.Add(1) <= config.PeerUpLimit {
+                return peerIDs[i]
+            }
+            peerState.ProducerLoad.Add(-1)
         }
     }
     return
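This last hunk is the heart of the fix named in the PR title: the old code only compared ProducerLoad against config.PeerUpLimit and never raised it, so a peer already serving others always looked idle. The new code reserves a slot by incrementing first and gives the slot back with Add(-1) when the limit is exceeded, which also closes the read-then-act race between concurrent scheduling calls. A self-contained sketch of that reserve-or-roll-back pattern; the limit constant and plain int32 counter are illustrative, not the project's config or cutil type:

package main

import (
	"fmt"
	"sync/atomic"
)

const peerUpLimit = 5 // illustrative stand-in for config.PeerUpLimit

// tryAcquire reserves one unit of load on a peer. Incrementing before the
// check makes the reservation atomic: either the caller keeps the slot and
// the peer's load reflects the new download, or the increment is undone.
func tryAcquire(load *int32) bool {
	if atomic.AddInt32(load, 1) <= peerUpLimit {
		return true
	}
	atomic.AddInt32(load, -1) // over the limit: roll the reservation back
	return false
}

func main() {
	var load int32
	for i := 0; i < 7; i++ {
		ok := tryAcquire(&load)
		fmt.Printf("request %d acquired=%v load=%d\n", i, ok, atomic.LoadInt32(&load))
	}
}

Doing the comparison on the value returned by Add keeps the reserve-and-test step to a single atomic operation, so two schedulers cannot both observe a free slot and push the same peer past its limit.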
