fix(dot/network): fix data race and add t.Parallel to all tests #2105

EclesioMeloJunior wants to merge 47 commits into development

Conversation
qdm12
left a comment
An early review, sorry, I couldn't help myself as I got excited with all this test parallelism 💯 😄
blocked by PR #2106
Co-authored-by: Quentin McGaw <quentin.mcgaw@gmail.com>
Codecov Report
```
@@            Coverage Diff             @@
##           development    #2105      +/-   ##
===============================================
+ Coverage      57.40%      60.06%     +2.65%
===============================================
  Files            217         214         -3
  Lines          28587       27807       -780
===============================================
+ Hits           16411       16701       +290
+ Misses         10487        9381      -1106
- Partials        1689        1725        +36
```
t.Parallel in all tests
t.Parallel to all tests
jimjbrettj
left a comment
A few comments, but overall looks good, great work! The network package is a good place to remove race conditions, so good stuff.
```go
}
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))
```

```go
time.Sleep(time.Millisecond * 200)
```
seconding this question
dot/network/stream_manager.go (Outdated)

```go
ctx           context.Context
streamDataMap *sync.Map // map[string]*streamData
```
SYNC.MAP??? Probably out of scope, I would think. If we really are anti-sync.Map, maybe we should open an issue/epic to remove them from the codebase?
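For context, the usual replacement for `sync.Map` is a mutex-guarded typed map, which keeps compile-time type safety. A minimal sketch, with illustrative names rather than the actual gossamer types:

```go
package main

import (
	"fmt"
	"sync"
)

// streamDataMap is a mutex-guarded typed map: simpler to reason about
// than sync.Map's interface{} API, and type-safe at compile time.
// (Illustrative stand-in; the real code would store *streamData values.)
type streamDataMap struct {
	mu   sync.RWMutex
	data map[string]string
}

func (m *streamDataMap) store(k, v string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[k] = v
}

func (m *streamDataMap) load(k string) (string, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[k]
	return v, ok
}

func main() {
	m := &streamDataMap{data: map[string]string{}}
	m.store("proto/1", "stream-A")
	v, ok := m.load("proto/1")
	fmt.Println(v, ok) // stream-A true
}
```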
```go
defer h.writersWG.Done()

select {
```
I'm a bit confused what the purpose of these two selects is, why can't they just be one? Would you mind explaining, just for my own learning?
basically, the first one prevents later goroutines from writing to the already-closed channel; the second handles the case where the close is triggered while we are blocked trying to write to a full channel. This code is a way to gracefully close a channel that is being used by other goroutines (since closing a channel while a write is in flight causes a data race).
Ooofff, this is bad. You don't have any guarantee h.closeCh won't get closed between the two select blocks (and hence closing h.actionQueue while a write is in flight). Please change somehow.
dot/peerset/peerstate.go (Outdated)

```go
ps.mu.Lock()
defer ps.mu.Unlock()

n, err := ps.getNode(pid)
```
we don't need to lock before this call?
The PeersState caused a race condition in the network package because the getNode function accesses the ps.nodes map at the same time addReputation updates it. I added a Mutex to the PeersState struct so getNode uses Lock/Unlock while reading, and after getNode executes, addReputation locks to write. It is not possible to lock after getNode, since getNode would starve waiting for an unlock that will never be called, makes sense? 😅
However, the previous implementation was not good, so I updated it to use a RWMutex instead and removed the getNode call from addReputation, which uses Lock/Unlock; getNode now uses RLock/RUnlock, so basically getNode will block if another goroutine calls addReputation first.
```go
Telemetry telemetry.Client
Metrics   metrics.IntervalConfig

MessageCacheTTL time.Duration
```
nit I think you can have this unexported
```go
)

type testStreamHandler struct {
	sync.Mutex
```
nit Maybe name the mutex so we know what it is protecting? Is it the messages map? Or something else?
```go
}

func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
```

```go
for {
	tot, err := readStream(stream, msgBytes)
	if errors.Is(err, io.EOF) {
		s.eofCh <- struct{}{}
```
Can you close it instead? I doubt you would get EOF twice right?
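If EOF really only happens once, the send can become a close, optionally guarded by sync.Once for safety. A small sketch with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

// One-shot signalling: closing eofCh wakes every receiver and can be
// observed any number of times, unlike a single send. sync.Once makes a
// (hypothetical) double EOF harmless instead of a double-close panic.
type handler struct {
	eofOnce sync.Once
	eofCh   chan struct{}
}

func (h *handler) signalEOF() {
	h.eofOnce.Do(func() { close(h.eofCh) })
}

func main() {
	h := &handler{eofCh: make(chan struct{})}
	h.signalEOF()
	h.signalEOF() // safe: Once prevents a double close
	<-h.eofCh     // never blocks once closed
	fmt.Println("eof observed")
}
```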
```go
expected := []ma.Multiaddr{
	mustNewMultiAddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)),
	mustNewMultiAddr(fmt.Sprintf("/ip4/10.0.5.2/tcp/%d", port)),
```

```go
require.True(t, multiAddrRegex.MatchString(addr.String()))
```
nit you can use require.Regexp(t, "myregex", "mystring")
```go
h.writerWGMutex.Lock()
h.writersWG.Add(1)
h.writerWGMutex.Unlock()
```
You should add to the writers wait group before launching the goroutine. And also, the mutex protecting the wait group looks sketchy, is it really needed?
```diff
 func (h *Handler) SortedPeers(setIdx int) chan peer.IDSlice {
 	resultPeersCh := make(chan peer.IDSlice)
-	h.actionQueue <- action{
+	h.setActionQueue(action{
```
perhaps rename to addToActionQueue?
```go
h.writerWGMutex.Lock()
h.writersWG.Wait()
h.writerWGMutex.Unlock()
```
A mutex for the wait group looks very strange, try to find another way 🤔 ?
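A sketch of the mutex-free alternative (names are stand-ins, not the actual handler code): calling `wg.Add(1)` on the parent goroutine *before* `go` guarantees `Wait` can never run before the `Add`, so the extra `writerWGMutex` becomes unnecessary:

```go
package main

import (
	"fmt"
	"sync"
)

// sumFromWriters launches n writer goroutines. Add(1) happens on the
// parent goroutine before each launch, so Wait observes every Add
// without any extra mutex around the WaitGroup.
func sumFromWriters(n int) int {
	var wg sync.WaitGroup
	results := make(chan int, n)
	for i := 0; i < n; i++ {
		wg.Add(1) // before go: no Add/Wait race
		go func(i int) {
			defer wg.Done()
			results <- i
		}(i)
	}
	wg.Wait() // safe: all Adds already happened on this goroutine
	close(results)
	sum := 0
	for v := range results {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumFromWriters(10)) // 0+1+...+9 = 45
}
```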
```go
// peers return the list of all the peers we know of.
func (ps *PeersState) peers() []peer.ID {
	ps.mu.RLock()
```
perhaps name mu with a name so we know what it's protecting? I guess peerStateMutex??
```diff
@@ -273,24 +271,32 @@ func TestExistingStream(t *testing.T) {
 	require.NoError(t, err)

 	time.Sleep(TestMessageTimeout)
```
can we refactor this test to not use sleeps anymore? Now that you have testStreamHandler, you can write messages into an internal channel from testStreamHandler.handleMessage and just listen on that channel.
```diff
 	require.NoError(t, err)

-	time.Sleep(TestBackoffTimeout)
+	timeout := time.NewTimer(TestBackoffTimeout)
```
```diff
@@ -237,35 +237,36 @@ func TestBroadcastMessages(t *testing.T) {
 	// simulate message sent from core service
-	nodeA.GossipMessage(anounceMessage)
-	time.Sleep(time.Second * 2)
+	// All 5 message will be sent since cache is disabled.
+	for i := 0; i < 5; i++ {
+		nodeA.GossipMessage(announceMessage)
+		time.Sleep(time.Millisecond * 10)
```
why do we need to sleep here?
```go
addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
// retry connect if "failed to dial" error
if failedToDial(err) {
```
why would this error out on connect? I see this multiple times in this file. I would assume this thing should be ready? Otherwise, we should listen on some sort of internal ready channel to know that it's ready.
@EclesioMeloJunior can you resolve the conflicts?

Of course, once I finish PR #2267, which this one depends on.

@EclesioMeloJunior can you change this to draft if you don't intend to merge this.
Changes

- dot/network package

Tests

- On Apple silicon processors

Issues

Primary Reviewer