-
Notifications
You must be signed in to change notification settings - Fork 33
[P2P] Integrate background router #732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 6 commits
Commits
Show all changes
70 commits
Select commit
Hold shift + click to select a range
fe24824
refactor: unicast router
bryanchriswhite 1277859
chore: cleanup TODOs
bryanchriswhite 871af48
chore: add background message
bryanchriswhite 013c433
chore: add `Router#Close()`
bryanchriswhite 4f998ee
chore: separate raintree & bg protocol IDs
bryanchriswhite 1c9c18c
chore: generate `PocketEnvelope` nonce in `PackMessage()`
bryanchriswhite f9a0c10
refactor: add `Handler` to router config validation
bryanchriswhite 0bdffbd
refactor: raintree router
bryanchriswhite 2b40776
refactor: background router
bryanchriswhite 213f294
refactor: integrate bg router
bryanchriswhite 437afc8
refactor: staked actor router peer discovery
bryanchriswhite 43cf671
test: post-refactor updates
bryanchriswhite 4b19d8f
Merge remote-tracking branch 'pokt/main' into refactor/unicast-router
bryanchriswhite cf886a7
Merge branch 'refactor/unicast-router' into feat/integrate-bg-router
bryanchriswhite 048e306
fix: gofmt
bryanchriswhite d7278b8
Merge remote-tracking branch 'pokt/main' into refactor/unicast-router
bryanchriswhite acc1d59
Merge branch 'refactor/unicast-router' into feat/integrate-bg-router
bryanchriswhite 9ab2a5d
chore: fix typo in comment
bryanchriswhite dce1bac
chore: add debug log
bryanchriswhite 8dc2852
chore: fix field comment out of place
bryanchriswhite a6d4b52
fix: imports
bryanchriswhite 87d1fa9
Merge branch 'refactor/unicast-router' into feat/integrate-bg-router
bryanchriswhite d24407b
chore: bootstrap refactor / TECHDEBT
bryanchriswhite 70ca573
fix: imports
bryanchriswhite 8467f3a
chore: remove unused field
bryanchriswhite c3bc4c7
chore: cleanup unused test utils
bryanchriswhite 3fdada6
chore: comment cleanup
bryanchriswhite 39a7877
chore: add submodule TECHDEBT comments
bryanchriswhite 049cbf5
chore: add missing godoc comments
bryanchriswhite 79a1c6e
chore: cleanup unused garbage
bryanchriswhite a7c4bf6
fix: return error
bryanchriswhite 904f17b
Merge remote-tracking branch 'pokt/main' into refactor/unicast-router
bryanchriswhite 70b020b
Merge branch 'refactor/unicast-router' into feat/integrate-bg-router
bryanchriswhite 8a54f1a
chore: router logging improvements
bryanchriswhite 6795c96
fix: interim background router bootstrapping
bryanchriswhite 0a3fac1
fix: `p2pModule#Send()` routing logic
bryanchriswhite b8f9a1a
chore: improve variable naming
bryanchriswhite de63d6d
chore: improve comments
bryanchriswhite 1282e1a
chore: improve debug logging
bryanchriswhite 5793b7f
chore: return early
bryanchriswhite 95a3948
chore: add TECHDEBT comment
bryanchriswhite 7cdc9e7
Merge remote-tracking branch 'pokt/main' into HEAD
bryanchriswhite fe42ab3
test: fix raintree message target test
bryanchriswhite 60cd2bd
docs: update P2P readme
bryanchriswhite 8354d79
docs: update table of contents
bryanchriswhite f7b0202
docs: tweak P2P readme
bryanchriswhite d2f33a4
chore: add godoc comment
bryanchriswhite b527d91
chore: remove warning log
bryanchriswhite 73da86c
chore: convert `DISCUSS_THIS_COMMIT` to `TECHDEBT`
bryanchriswhite bf96542
Merge remote-tracking branch 'pokt/main' into feat/integrate-bg-router
bryanchriswhite 4602283
fix: typo
bryanchriswhite 65a8c94
chore: review suggestion improvements
bryanchriswhite 519db25
fix: gofmt
bryanchriswhite 7e7e6e7
docs: README improvements (review feedback)
bryanchriswhite f3437cb
docs: add architecture design language section
bryanchriswhite 4f87921
chore: background router comment and var name cleanup
bryanchriswhite c113a36
chore: review feedback improvements
bryanchriswhite 3356f63
chore: add issue # to TECHDEBT comment
bryanchriswhite 157ecb6
docs: update TOC
bryanchriswhite 30cf145
chore: add TODO README
bryanchriswhite 40628c4
docs: improve legend definitions
bryanchriswhite 8c0b8c3
docs: clarify broadcast table
bryanchriswhite 5a4cc80
docs: fix mistake in peer discovery section
bryanchriswhite 3e997f5
test: improve background router validation test
bryanchriswhite abd4789
chore: add error log
bryanchriswhite 8c6ac68
fix: unstaked actor bootstrapping FSM transition
bryanchriswhite b76efdf
fix: goimports
bryanchriswhite 66afa18
Merge remote-tracking branch 'pokt/main' into feat/integrate-bg-router
bryanchriswhite 5aa0a5c
docs: update unicast/broadcast table
bryanchriswhite 6fde86f
Merge branch 'main' into feat/integrate-bg-router
bryanchriswhite File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,10 @@ import ( | |
| ) | ||
|
|
||
| // https://www.rfc-editor.org/rfc/rfc3986#section-3.2.2 | ||
| const testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080" | ||
| const ( | ||
| testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080" | ||
| invalidReceiveTimeout = time.Millisecond * 500 | ||
| ) | ||
|
|
||
| // TECHDEBT(#609): move & de-dup. | ||
| var ( | ||
|
|
@@ -42,7 +45,7 @@ var ( | |
| ) | ||
|
|
||
| func TestBackgroundRouter_AddPeer(t *testing.T) { | ||
| testRouter := newTestRouter(t, nil, noopHandler) | ||
| testRouter := newTestRouter(t, nil, nil) | ||
| libp2pPStore := testRouter.host.Peerstore() | ||
|
|
||
| // NB: assert initial state | ||
|
|
@@ -90,7 +93,7 @@ func TestBackgroundRouter_AddPeer(t *testing.T) { | |
| } | ||
|
|
||
| func TestBackgroundRouter_RemovePeer(t *testing.T) { | ||
| testRouter := newTestRouter(t, nil, noopHandler) | ||
| testRouter := newTestRouter(t, nil, nil) | ||
| peerstore := testRouter.host.Peerstore() | ||
|
|
||
| // NB: assert initial state | ||
|
|
@@ -124,69 +127,112 @@ func TestBackgroundRouter_RemovePeer(t *testing.T) { | |
| } | ||
|
|
||
| func TestBackgroundRouter_Validation(t *testing.T) { | ||
| ctx := context.Background() | ||
| libp2pMockNet := mocknet.New() | ||
|
|
||
| invalidWireFormatData := []byte("test message") | ||
| invalidPocketEnvelope := &anypb.Any{ | ||
| TypeUrl: "/test", | ||
| Value: invalidWireFormatData, | ||
| invalidProtoMessage := anypb.Any{ | ||
| TypeUrl: "/notADefinedProtobufType", | ||
| Value: []byte("not a serialized protobuf"), | ||
| } | ||
| invalidPocketEnvelopeBz, err := proto.Marshal(invalidPocketEnvelope) | ||
| require.NoError(t, err) | ||
|
|
||
| invalidMessages := [][]byte{ | ||
| invalidWireFormatData, | ||
| invalidPocketEnvelopeBz, | ||
| testCases := []struct { | ||
| name string | ||
| msgBz []byte | ||
| }{ | ||
| { | ||
| name: "invalid BackgroundMessage", | ||
| // NB: `msgBz` would normally be a serialized `BackgroundMessage`. | ||
| msgBz: mustMarshal(t, &invalidProtoMessage), | ||
| }, | ||
| { | ||
| name: "empty PocketEnvelope", | ||
| msgBz: mustMarshal(t, &typesP2P.BackgroundMessage{ | ||
| // NB: `Data` is normally a serialized `PocketEnvelope`. | ||
| Data: nil, | ||
| }), | ||
| }, | ||
| { | ||
| name: "invalid PoketEnvelope", | ||
| msgBz: mustMarshal(t, &typesP2P.BackgroundMessage{ | ||
| // NB: `Data` is normally a serialized `PocketEnvelope`. | ||
| Data: mustMarshal(t, &invalidProtoMessage), | ||
| }), | ||
| }, | ||
| } | ||
|
|
||
| receivedChan := make(chan struct{}) | ||
| // Set up test router as the receiver. | ||
| ctx := context.Background() | ||
| libp2pMockNet := mocknet.New() | ||
|
|
||
| receivedChan := make(chan []byte, 1) | ||
| receiverPrivKey, receiverPeer := newTestPeer(t) | ||
| receiverHost := newTestHost(t, libp2pMockNet, receiverPrivKey) | ||
| receiverRouter := newRouterWithSelfPeerAndHost(t, receiverPeer, receiverHost, func(data []byte) error { | ||
| receivedChan <- struct{}{} | ||
| return nil | ||
| }) | ||
| receiverRouter := newRouterWithSelfPeerAndHost( | ||
| t, receiverPeer, | ||
| receiverHost, | ||
| func(data []byte) error { | ||
| receivedChan <- data | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice change |
||
| return nil | ||
| }, | ||
| ) | ||
|
|
||
| t.Cleanup(func() { | ||
| err := receiverRouter.Close() | ||
| require.NoError(t, err) | ||
| }) | ||
|
|
||
| senderPrivKey, _ := newTestPeer(t) | ||
| senderHost := newTestHost(t, libp2pMockNet, senderPrivKey) | ||
| gossipPubsub, err := pubsub.NewGossipSub(ctx, senderHost) | ||
| // Wrap `receiverRouter#topicValidator` to make assertions by. | ||
| // Existing topic validator must be unregistered first. | ||
| err := receiverRouter.gossipSub.UnregisterTopicValidator(protocol.BackgroundTopicStr) | ||
| require.NoError(t, err) | ||
|
|
||
| err = libp2pMockNet.LinkAll() | ||
| require.NoError(t, err) | ||
| // Register topic validator wrapper. | ||
| err = receiverRouter.gossipSub.RegisterTopicValidator( | ||
| protocol.BackgroundTopicStr, | ||
| func(ctx context.Context, peerID libp2pPeer.ID, msg *pubsub.Message) bool { | ||
| msgIsValid := receiverRouter.topicValidator(ctx, peerID, msg) | ||
| require.Falsef(t, msgIsValid, "expected message to be invalid") | ||
|
|
||
| receiverAddrInfo, err := utils.Libp2pAddrInfoFromPeer(receiverPeer) | ||
| require.NoError(t, err) | ||
|
|
||
| err = senderHost.Connect(ctx, receiverAddrInfo) | ||
| require.NoError(t, err) | ||
|
|
||
| topic, err := gossipPubsub.Join(protocol.BackgroundTopicStr) | ||
| return msgIsValid | ||
| }, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| for _, invalidMessageBz := range invalidMessages { | ||
| err = topic.Publish(ctx, invalidMessageBz) | ||
| require.NoError(t, err) | ||
| } | ||
|
|
||
| select { | ||
| case <-time.After(time.Second * 2): | ||
| // TECHDEBT: find a better way to prove that pre-propagation validation | ||
| // works as expected. Ideally, we should be able to distinguish which | ||
| // invalid message was received in the event of failure. | ||
| // | ||
| // CONSIDERATION: we could use the telemetry module mock to set expectations | ||
| // for validation failure telemetry calls, which would probably be useful in | ||
| // their own right. | ||
| case <-receivedChan: | ||
| t.Fatal("expected message to not be received") | ||
| for _, testCase := range testCases { | ||
| t.Run(testCase.name, func(t *testing.T) { | ||
| senderPrivKey, _ := newTestPeer(t) | ||
| senderHost := newTestHost(t, libp2pMockNet, senderPrivKey) | ||
| gossipPubsub, err := pubsub.NewGossipSub(ctx, senderHost) | ||
| require.NoError(t, err) | ||
|
|
||
| err = libp2pMockNet.LinkAll() | ||
| require.NoError(t, err) | ||
|
|
||
| receiverAddrInfo, err := utils.Libp2pAddrInfoFromPeer(receiverPeer) | ||
| require.NoError(t, err) | ||
|
|
||
| err = senderHost.Connect(ctx, receiverAddrInfo) | ||
| require.NoError(t, err) | ||
|
|
||
| topic, err := gossipPubsub.Join(protocol.BackgroundTopicStr) | ||
| require.NoError(t, err) | ||
|
|
||
| err = topic.Publish(ctx, testCase.msgBz) | ||
| require.NoError(t, err) | ||
|
|
||
| // Destroy previous topic and sender instances to start with new ones | ||
| // for each test case. | ||
| t.Cleanup(func() { | ||
| _ = topic.Close() | ||
| _ = senderHost.Close() | ||
| }) | ||
|
|
||
| // Ensure no messages were handled at the end of each test case for | ||
| // async errors. | ||
| select { | ||
| case <-receivedChan: | ||
| t.Fatal("no messages should have been handled by receiver router") | ||
| case <-time.After(invalidReceiveTimeout): | ||
| // no error, continue | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -234,9 +280,9 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { | |
| expectedPeerIDs[i] = host.ID().String() | ||
| newRouterWithSelfPeerAndHost(t, peer, host, func(data []byte) error { | ||
| seenMessagesMutext.Lock() | ||
| broadcastWaitgroup.Done() | ||
| defer seenMessagesMutext.Unlock() | ||
| seenMessages[host.ID().String()] = struct{}{} | ||
| seenMessagesMutext.Unlock() | ||
| broadcastWaitgroup.Done() | ||
| return nil | ||
| }) | ||
| } | ||
|
|
@@ -246,7 +292,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { | |
|
|
||
| // set up a test backgroundRouter | ||
| testRouterHost := newTestHost(t, libp2pMockNet, privKey) | ||
| testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost, noopHandler) | ||
| testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost, nil) | ||
| testHosts = append(testHosts, testRouterHost) | ||
|
|
||
| // simulate network links between each to every other | ||
|
|
@@ -331,7 +377,11 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) { | |
| } | ||
|
|
||
| // TECHDEBT(#609): move & de-duplicate | ||
| func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet, handler typesP2P.MessageHandler) *backgroundRouter { | ||
| func newTestRouter( | ||
| t *testing.T, | ||
| libp2pMockNet mocknet.Mocknet, | ||
| handler typesP2P.MessageHandler, | ||
| ) *backgroundRouter { | ||
| t.Helper() | ||
|
|
||
| privKey, selfPeer := newTestPeer(t) | ||
|
|
@@ -349,7 +399,12 @@ func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet, handler typesP2P | |
| return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler) | ||
| } | ||
|
|
||
| func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host libp2pHost.Host, handler typesP2P.MessageHandler) *backgroundRouter { | ||
| func newRouterWithSelfPeerAndHost( | ||
| t *testing.T, | ||
| selfPeer typesP2P.Peer, | ||
| host libp2pHost.Host, | ||
| handler typesP2P.MessageHandler, | ||
| ) *backgroundRouter { | ||
| t.Helper() | ||
|
|
||
| ctrl := gomock.NewController(t) | ||
|
|
@@ -374,6 +429,10 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib | |
| err := pstore.AddPeer(selfPeer) | ||
| require.NoError(t, err) | ||
|
|
||
| if handler == nil { | ||
| handler = noopHandler | ||
| } | ||
|
|
||
| router, err := Create(busMock, &config.BackgroundConfig{ | ||
Olshansk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Addr: selfPeer.GetAddress(), | ||
| PeerstoreProvider: pstoreProviderMock, | ||
|
|
@@ -423,7 +482,11 @@ func newMockNetHostFromPeer( | |
| return host | ||
| } | ||
|
|
||
| func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.PrivateKey) libp2pHost.Host { | ||
| func newTestHost( | ||
| t *testing.T, | ||
| mockNet mocknet.Mocknet, | ||
| privKey cryptoPocket.PrivateKey, | ||
| ) libp2pHost.Host { | ||
| t.Helper() | ||
|
|
||
| // listen on random port on loopback interface | ||
|
|
@@ -436,3 +499,12 @@ func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.Pri | |
| // construct mock host | ||
| return newMockNetHostFromPeer(t, mockNet, privKey, peer) | ||
| } | ||
|
|
||
| func mustMarshal(t *testing.T, msg proto.Message) []byte { | ||
| t.Helper() | ||
|
|
||
| msgBz, err := proto.Marshal(msg) | ||
| require.NoError(t, err) | ||
|
|
||
| return msgBz | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate the comments 🙌