-
Notifications
You must be signed in to change notification settings - Fork 33
[P2P] refactor: message handling #763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
0c06d66
92ea049
593156c
2342035
3536afd
b942fed
a87e5cd
7f29e2b
dbd4965
47af1f2
ef21e79
77d91f1
1d0e1bf
33b5b58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,16 +3,11 @@ package p2p | |
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "time" | ||
|
|
||
| "github.com/libp2p/go-libp2p" | ||
| libp2pHost "github.com/libp2p/go-libp2p/core/host" | ||
| libp2pNetwork "github.com/libp2p/go-libp2p/core/network" | ||
| "github.com/multiformats/go-multiaddr" | ||
| "github.com/pokt-network/pocket/logger" | ||
| "github.com/pokt-network/pocket/p2p/config" | ||
| "github.com/pokt-network/pocket/p2p/protocol" | ||
| "github.com/pokt-network/pocket/p2p/providers" | ||
| "github.com/pokt-network/pocket/p2p/providers/current_height_provider" | ||
| "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" | ||
|
|
@@ -31,12 +26,6 @@ import ( | |
| "google.golang.org/protobuf/types/known/anypb" | ||
| ) | ||
|
|
||
| // TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. | ||
| // TECHDEBT(#629): parameterize and expose via config. | ||
| // readStreamTimeout is the duration to wait for a read operation on a | ||
| // stream to complete, after which the stream is closed ("timed out"). | ||
| const readStreamTimeout = time.Second * 10 | ||
|
|
||
| var _ modules.P2PModule = &p2pModule{} | ||
|
|
||
| type p2pModule struct { | ||
|
|
@@ -172,11 +161,6 @@ func (m *p2pModule) Start() (err error) { | |
| return fmt.Errorf("setting up router: %w", err) | ||
| } | ||
|
|
||
| // Don't handle incoming streams in client debug mode. | ||
| if !m.isClientDebugMode() { | ||
| m.host.SetStreamHandler(protocol.PoktProtocolID, m.handleStream) | ||
| } | ||
|
|
||
| m.GetBus(). | ||
| GetTelemetryModule(). | ||
| GetTimeSeriesAgent(). | ||
|
|
@@ -291,6 +275,7 @@ func (m *p2pModule) setupRouter() (err error) { | |
| CurrentHeightProvider: m.currentHeightProvider, | ||
| PeerstoreProvider: m.pstoreProvider, | ||
| Host: m.host, | ||
| Handler: m.handleAppData, | ||
| MaxNonces: m.cfg.MaxNonces, | ||
| }, | ||
| ) | ||
|
|
@@ -341,95 +326,11 @@ func (m *p2pModule) isClientDebugMode() bool { | |
| return m.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode | ||
| } | ||
|
|
||
| // handleStream is called each time a peer establishes a new stream with this | ||
| // module's libp2p `host.Host`. | ||
| func (m *p2pModule) handleStream(stream libp2pNetwork.Stream) { | ||
| m.logger.Debug().Msg("handling incoming stream") | ||
| peer, err := utils.PeerFromLibp2pStream(stream) | ||
| if err != nil { | ||
| m.logger.Error().Err(err). | ||
| Str("address", peer.GetAddress().String()). | ||
| Msg("parsing remote peer identity") | ||
|
|
||
| if err = stream.Reset(); err != nil { | ||
| m.logger.Error().Err(err).Msg("resetting stream") | ||
| } | ||
| return | ||
| } | ||
|
|
||
| if err := m.router.AddPeer(peer); err != nil { | ||
| m.logger.Error().Err(err). | ||
| Str("address", peer.GetAddress().String()). | ||
| Msg("adding remote peer to router") | ||
| } | ||
|
|
||
| go m.readStream(stream) | ||
| } | ||
|
|
||
| // readStream is intended to be called in a goroutine. It continuously reads from | ||
| // the given stream for handling at the network level. Used for handling "direct" | ||
| // messages (i.e. one specific target node). | ||
| func (m *p2pModule) readStream(stream libp2pNetwork.Stream) { | ||
| // Time out if no data is sent to free resources. | ||
| if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { | ||
| // NB: tests using libp2p's `mocknet` rely on this not returning an error. | ||
| // `SetReadDeadline` not supported by `mocknet` streams. | ||
| m.logger.Debug().Err(err).Msg("setting stream read deadline") | ||
| } | ||
|
|
||
| // debug logging: stream scope stats | ||
| // (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope) | ||
| if err := utils.LogScopeStatFactory( | ||
| &logger.Global.Logger, | ||
| "stream scope (read-side)", | ||
| )(stream.Scope()); err != nil { | ||
| m.logger.Debug().Err(err).Msg("logging stream scope stats") | ||
| } | ||
| // --- | ||
|
|
||
| data, err := io.ReadAll(stream) | ||
| if err != nil { | ||
| m.logger.Error().Err(err).Msg("reading from stream") | ||
| if err := stream.Reset(); err != nil { | ||
| m.logger.Debug().Err(err).Msg("resetting stream (read-side)") | ||
| } | ||
| return | ||
| } | ||
|
|
||
| if err := stream.Reset(); err != nil { | ||
| m.logger.Debug().Err(err).Msg("resetting stream (read-side)") | ||
| } | ||
|
|
||
| // debug logging | ||
| remotePeer, err := utils.PeerFromLibp2pStream(stream) | ||
| if err != nil { | ||
| m.logger.Debug().Err(err).Msg("getting remote remotePeer") | ||
| } else { | ||
| utils.LogIncomingMsg(m.logger, m.cfg.Hostname, remotePeer) | ||
| } | ||
| // --- | ||
|
|
||
| if err := m.handleNetworkData(data); err != nil { | ||
| m.logger.Error().Err(err).Msg("handling network data") | ||
| } | ||
| } | ||
|
|
||
| // handleNetworkData passes a network message to the configured | ||
| // `Router`implementation for routing. | ||
| func (m *p2pModule) handleNetworkData(data []byte) error { | ||
| appMsgData, err := m.router.HandleNetworkData(data) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // There was no error, but we don't need to forward this to the app-specific bus. | ||
| // For example, the message has already been handled by the application. | ||
| if appMsgData == nil { | ||
| return nil | ||
| } | ||
|
|
||
| // handleAppData deserializes the received `PocketEnvelope` data and publishes | ||
| // a copy of its `Content` to the application event bus. | ||
| func (m *p2pModule) handleAppData(data []byte) error { | ||
bryanchriswhite marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| networkMessage := messaging.PocketEnvelope{} | ||
bryanchriswhite marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if err := proto.Unmarshal(appMsgData, &networkMessage); err != nil { | ||
| if err := proto.Unmarshal(data, &networkMessage); err != nil { | ||
| return fmt.Errorf("decoding network message: %w", err) | ||
| } | ||
|
|
||
|
|
@@ -449,9 +350,3 @@ func (m *p2pModule) getMultiaddr() (multiaddr.Multiaddr, error) { | |
| "%s:%d", m.cfg.Hostname, m.cfg.Port, | ||
| )) | ||
| } | ||
|
|
||
| // newReadStreamDeadline returns a future deadline | ||
| // based on the read stream timeout duration. | ||
| func newReadStreamDeadline() time.Time { | ||
| return time.Now().Add(readStreamTimeout) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| //go:build test | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a linter for this? Dima recently added a custom one in Can be done in a separate commit, not a blocker.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's only necessary to add the test build tag to:
Are you suggesting a linter which enforces the I don't suppose there would really be a prohibitive downside to adding the test tag in places where it's not strictly necessary. I can imagine an argument for either but am leaning towards "ubiquitous build tags" at the moment (unexpectedly):
|
||
|
|
||
| package p2p | ||
|
|
||
| import ( | ||
|
|
@@ -13,10 +15,12 @@ import ( | |
|
|
||
| libp2pNetwork "github.com/libp2p/go-libp2p/core/network" | ||
| mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" | ||
| "github.com/pokt-network/pocket/internal/testutil" | ||
| "github.com/pokt-network/pocket/p2p/protocol" | ||
| "github.com/stretchr/testify/require" | ||
| "google.golang.org/protobuf/types/known/anypb" | ||
|
|
||
| "github.com/pokt-network/pocket/internal/testutil" | ||
| "github.com/pokt-network/pocket/p2p/protocol" | ||
| "github.com/pokt-network/pocket/p2p/raintree" | ||
| ) | ||
|
|
||
| // TODO(#314): Add the tooling and instructions on how to generate unit tests in this file. | ||
|
|
@@ -272,7 +276,7 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te | |
| mod := *p2pMod | ||
| p2pMod.host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) { | ||
| log.Printf("[valID: %s] Read\n", sURL) | ||
| (&mod).handleStream(stream) | ||
| (&mod).router.(*raintree.RainTreeRouter).HandleStream(stream) | ||
| wg.Done() | ||
| }) | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch provided seems reasonable, but I have a few minor suggestions for improvement. Please consider the following:
Overall, the code patch is generally fine, but addressing these points would lead to better quality code review. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| package raintree | ||
|
|
||
| import ( | ||
| libp2pNetwork "github.com/libp2p/go-libp2p/core/network" | ||
|
|
||
| "github.com/pokt-network/pocket/logger" | ||
| "github.com/pokt-network/pocket/p2p/utils" | ||
| ) | ||
|
|
||
| // logStream logs the incoming stream and its scope stats | ||
| func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { | ||
| rtr.logStreamScopeStats(stream) | ||
|
|
||
| remotePeer, err := utils.PeerFromLibp2pStream(stream) | ||
| if err != nil { | ||
| rtr.logger.Debug().Err(err).Msg("getting remote remotePeer") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you go with Ditto below
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (see below) In this case, my thinking was that this error only happens in the context of this logging helper function which is not a critical function. I didn't imagine that this logging helper producing an error would be useful to the end user (i.e. not actionable nor a useful signal). Perhaps I assume too much. |
||
| } else { | ||
| utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer) | ||
| } | ||
| } | ||
|
|
||
| // logStreamScopeStats logs the incoming stream's scope stats | ||
| // (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope) | ||
| func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { | ||
| if err := utils.LogScopeStatFactory( | ||
| &logger.Global.Logger, | ||
| "stream scope (read-side)", | ||
| )(stream.Scope()); err != nil { | ||
| rtr.logger.Debug().Err(err).Msg("logging stream scope stats") | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.