Skip to content

Commit 692cfc8

Browse files
Thread-Safe Stream Removal Info (#4905)
* thread-safe removal info * check removed streams and removal info before discovery process tries to add new stream * update proto counter safely only after valid addition or deletion * remove stream from reserved list as well once it's deleted * fix stream manager protocol id test
1 parent f902ebf commit 692cfc8

3 files changed

Lines changed: 43 additions & 3 deletions

File tree

p2p/stream/common/streammanager/streammanager.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,14 @@ type RemovalInfo struct {
7777
count uint64
7878
removedAt time.Time
7979
expireAt time.Time
80+
mu sync.RWMutex
8081
}
8182

8283
// MarkAsRemoved resets the removal time and increments the removal count.
8384
func (rm *RemovalInfo) MarkAsRemoved(criticalErr bool) {
85+
rm.mu.Lock()
86+
defer rm.mu.Unlock()
87+
8488
now := time.Now()
8589
if rm.count > 0 && now.Sub(rm.removedAt) > MaxRemovalCooldownDuration {
8690
rm.count = 0
@@ -101,19 +105,36 @@ func (rm *RemovalInfo) MarkAsRemoved(criticalErr bool) {
101105

102106
// RemovedAt returns the timestamp when the stream was removed.
103107
func (rm *RemovalInfo) RemovedAt() time.Time {
108+
rm.mu.RLock()
109+
defer rm.mu.RUnlock()
110+
104111
return rm.removedAt
105112
}
106113

107114
// HasExpired checks if the cooldown period has passed, allowing the stream to reconnect.
108115
func (rm *RemovalInfo) HasExpired() bool {
116+
rm.mu.RLock()
117+
defer rm.mu.RUnlock()
118+
109119
return time.Now().After(rm.expireAt)
110120
}
111121

112122
// BumpCount increases the removal count.
113123
func (rm *RemovalInfo) BumpCount() {
124+
rm.mu.Lock()
125+
defer rm.mu.Unlock()
126+
114127
rm.count++
115128
}
116129

130+
// ResetCount resets the removal count.
131+
func (rm *RemovalInfo) ResetCount() {
132+
rm.mu.Lock()
133+
defer rm.mu.Unlock()
134+
135+
rm.count = 0
136+
}
137+
117138
// NewStreamManager creates a new stream manager for the given proto ID
118139
func NewStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStream func(network.Stream), c Config) StreamManager {
119140
return newStreamManager(pid, host, pf, handleStream, c)
@@ -320,6 +341,9 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error {
320341
if _, ok := sm.streams.get(id); ok {
321342
return ErrStreamAlreadyExist
322343
}
344+
if _, ok := sm.reservedStreams.get(id); ok {
345+
return ErrStreamAlreadyExist
346+
}
323347
// Check if stream was recently removed
324348
if removalInfo, exists := sm.removedStreams.Get(id); exists {
325349
if !removalInfo.HasExpired() {
@@ -386,6 +410,7 @@ func (sm *streamManager) handleRemoveStream(id sttypes.StreamID, reason string,
386410
return ErrStreamAlreadyRemoved
387411
}
388412
sm.streams.deleteStream(st)
413+
sm.reservedStreams.deleteStream(st)
389414

390415
sm.logger.Info().
391416
Int("NumStreams", sm.streams.size()).
@@ -476,12 +501,19 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
476501
// If the peer has the same ID and was just connected, skip.
477502
continue
478503
}
479-
if _, ok := sm.streams.get(sttypes.StreamID(peer.ID)); ok {
504+
newStreamID := sttypes.StreamID(peer.ID)
505+
if _, ok := sm.streams.get(newStreamID); ok {
480506
continue
481507
}
482-
if _, ok := sm.reservedStreams.get(sttypes.StreamID(peer.ID)); ok {
508+
if _, ok := sm.reservedStreams.get(newStreamID); ok {
483509
continue
484510
}
511+
// Check if stream was recently removed
512+
if removalInfo, exists := sm.removedStreams.Get(newStreamID); exists {
513+
if !removalInfo.HasExpired() {
514+
continue
515+
}
516+
}
485517
discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc()
486518
connecting += 1
487519
sm.setupSem <- struct{}{}

p2p/stream/common/streammanager/streammanager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func TestStreamSet_numStreamsWithMinProtoID(t *testing.T) {
273273
for i := 0; i != numPid1; i++ {
274274
ss.addStream(newTestStream(makeStreamID(i), pid1))
275275
}
276-
for i := 0; i != numPid2; i++ {
276+
for i := numPid1; i != numPid1+numPid2; i++ {
277277
ss.addStream(newTestStream(makeStreamID(i), pid2))
278278
}
279279

p2p/stream/common/streammanager/streamset.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func (ss *streamSet) addStream(st sttypes.Stream) {
5858
ss.lock.Lock()
5959
defer ss.lock.Unlock()
6060

61+
if _, exists := ss.streams[st.ID()]; exists {
62+
return
63+
}
64+
6165
if spec, err := st.ProtoSpec(); err != nil {
6266
return
6367
} else {
@@ -71,6 +75,10 @@ func (ss *streamSet) deleteStream(st sttypes.Stream) {
7175
ss.lock.Lock()
7276
defer ss.lock.Unlock()
7377

78+
if _, exists := ss.streams[st.ID()]; !exists {
79+
return
80+
}
81+
7482
delete(ss.streams, st.ID())
7583

7684
spec, _ := st.ProtoSpec()

0 commit comments

Comments
 (0)