diff --git a/.gitignore b/.gitignore index 809a11ff6..4ef97ebda 100644 --- a/.gitignore +++ b/.gitignore @@ -31,9 +31,14 @@ prototype/vendor/ # Generated Mocks (for testing) shared/modules/mocks/* !shared/modules/mocks/mocks.go + p2p/types/mocks/* !p2p/types/mocks/mocks.go +persistence/types/mocks/* +!persistence/types/mocks/mocks.go + + # TODO(team): Does the `types` directory contain generated or raw type files? # types/ diff --git a/Makefile b/Makefile index d6e045d20..28cb8b955 100644 --- a/Makefile +++ b/Makefile @@ -222,13 +222,15 @@ mockgen: clean_mocks ## Use `mockgen` to generate mocks used for testing purpose $(eval modules_dir = "shared/modules") go generate ./${modules_dir} echo "Mocks generated in ${modules_dir}/mocks" - - $(eval p2p_dir = "p2p") - $(eval p2p_type_mocks_dir = "p2p/types/mocks") - find ${p2p_type_mocks_dir} -type f ! -name "mocks.go" -exec rm {} \; - go generate ./${p2p_dir}/... - echo "P2P mocks generated in ${p2p_type_mocks_dir}" - + + $(eval DIRS = p2p persistence) + for dir in $(DIRS); do \ + echo "Processing $$dir mocks..."; \ + find $$dir/types/mocks -type f ! -name "mocks.go" -exec rm {} \;; \ + go generate ./${dir_name}/...; \ + echo "$$dir mocks generated in $$dir/types/mocks"; \ + done + # TODO(team): Tested locally with `protoc` version `libprotoc 3.19.4`. In the near future, only the Dockerfiles will be used to compile protos. .PHONY: protogen_show diff --git a/app/client/cli/debug.go b/app/client/cli/debug.go index 33aee2fc3..3f672ab1a 100644 --- a/app/client/cli/debug.go +++ b/app/client/cli/debug.go @@ -19,11 +19,15 @@ import ( // TECHDEBT: Lowercase variables / constants that do not need to be exported. const ( - PromptResetToGenesis string = "ResetToGenesis" - PromptPrintNodeState string = "PrintNodeState" - PromptTriggerNextView string = "TriggerNextView" - PromptTogglePacemakerMode string = "TogglePacemakerMode" + PromptResetToGenesis string = "ResetToGenesis" + PromptPrintNodeState string = "PrintNodeState" + PromptTriggerNextView string = "TriggerNextView" + PromptTogglePacemakerMode string = "TogglePacemakerMode" + PromptShowLatestBlockInStore string = "ShowLatestBlockInStore" + + PromptSendMetadataRequest string = "MetadataRequest" + PromptSendBlockRequest string = "BlockRequest" ) var ( @@ -36,6 +40,8 @@ var ( PromptTriggerNextView, PromptTogglePacemakerMode, PromptShowLatestBlockInStore, + PromptSendMetadataRequest, + PromptSendBlockRequest, } // validators holds the list of the validators at genesis time so that we can use it to create a debug address book provider. @@ -159,6 +165,18 @@ func handleSelect(selection string) { Message: nil, } sendDebugMessage(m) + case PromptSendMetadataRequest: + m := &messaging.DebugMessage{ + Action: messaging.DebugMessageAction_DEBUG_CONSENSUS_SEND_METADATA_REQ, + Message: nil, + } + broadcastDebugMessage(m) + case PromptSendBlockRequest: + m := &messaging.DebugMessage{ + Action: messaging.DebugMessageAction_DEBUG_CONSENSUS_SEND_BLOCK_REQ, + Message: nil, + } + broadcastDebugMessage(m) default: logger.Global.Error().Msg("Selection not yet implemented...") } diff --git a/app/client/doc/CHANGELOG.md b/app/client/doc/CHANGELOG.md index 303142f66..5cd888ca2 100644 --- a/app/client/doc/CHANGELOG.md +++ b/app/client/doc/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.11] - 2023-02-09 + +- Added debugging prompts to drive state sync requests +- `SendMetadataRequest` to send metadata request by all nodes to all nodes +- `SendBlockRequest` to send get block request by all nodes to all nodes + ## [0.0.0.10] - 2023-02-07 - Added GITHUB_WIKI tags where it was missing diff --git a/app/pocket/main.go b/app/pocket/main.go index 907b119a3..2ae49d495 100644 --- a/app/pocket/main.go +++ b/app/pocket/main.go @@ -31,6 +31,7 @@ func main() { if err != nil { logger.Global.Fatal().Err(err).Msg("Failed to create pocket node") } + pocketNode.GetBus().GetConsensusModule().EnableServerMode() if err = pocketNode.Start(); err != nil { logger.Global.Fatal().Err(err).Msg("Failed to start pocket node") diff --git a/build/config/config1.json b/build/config/config1.json index 85d324772..a4c548cce 100644 --- a/build/config/config1.json +++ b/build/config/config1.json @@ -8,7 +8,8 @@ "manual": true, "debug_time_between_steps_msec": 1000 }, - "private_key": "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e" + "private_key": "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e", + "server_mode_enabled": true }, "utility": { "max_mempool_transaction_bytes": 1073741824, diff --git a/build/config/config2.json b/build/config/config2.json index 00d59e1d7..0483e0a99 100644 --- a/build/config/config2.json +++ b/build/config/config2.json @@ -8,7 +8,8 @@ "manual": true, "debug_time_between_steps_msec": 1000 }, - "private_key": "ba81e6e56d293895b299bc495ae75d490644429a5e0028fabeb5e1871c1098e7eb2c78364525a210d994a83e02d18b4287ab81f6670cf4510ab6c9f51e296d91" + "private_key": "ba81e6e56d293895b299bc495ae75d490644429a5e0028fabeb5e1871c1098e7eb2c78364525a210d994a83e02d18b4287ab81f6670cf4510ab6c9f51e296d91", + "server_mode_enabled": true }, "utility": { "max_mempool_transaction_bytes": 1073741824, diff --git a/build/config/config3.json b/build/config/config3.json index cae162751..500ed7145 100644 --- a/build/config/config3.json +++ b/build/config/config3.json @@ -8,7 +8,8 @@ "manual": true, "debug_time_between_steps_msec": 1000 }, - "private_key": "25b385b367a827eaafcdb1003bd17a25f2ecc0d10d41f138846f52ae1015aa941041a9c76539791fef9bee5b4fcd5bf4a1a489e0790c44cbdfa776b901e13b50" + "private_key": "25b385b367a827eaafcdb1003bd17a25f2ecc0d10d41f138846f52ae1015aa941041a9c76539791fef9bee5b4fcd5bf4a1a489e0790c44cbdfa776b901e13b50", + "server_mode_enabled": true }, "utility": { "max_mempool_transaction_bytes": 1073741824, diff --git a/build/config/config4.json b/build/config/config4.json index f06e0203f..a5a56843a 100644 --- a/build/config/config4.json +++ b/build/config/config4.json @@ -8,7 +8,8 @@ "manual": true, "debug_time_between_steps_msec": 1000 }, - "private_key": "4ff3292ff14213149446f8208942b35439cb4b2c5e819f41fb612e880b5614bdd6cea8706f6ee6672c1e013e667ec8c46231e0e7abcf97ba35d89fceb8edae45" + "private_key": "4ff3292ff14213149446f8208942b35439cb4b2c5e819f41fb612e880b5614bdd6cea8706f6ee6672c1e013e667ec8c46231e0e7abcf97ba35d89fceb8edae45", + "server_mode_enabled": true }, "utility": { "max_mempool_transaction_bytes": 1073741824, diff --git a/build/docs/CHANGELOG.md b/build/docs/CHANGELOG.md index 9c8ca33c3..1b790e6de 100644 --- a/build/docs/CHANGELOG.md +++ b/build/docs/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.14] - 2023-02-09 + +- Updated all `config*.json` files with new `server_mode_enabled` field (for state sync) + ## [0.0.0.13] - 2023-02-08 - Fix bug related to installing Tilt in the Docker containers diff --git a/consensus/debugging.go b/consensus/debugging.go index 3f7f15aaf..e6e5eec08 100644 --- a/consensus/debugging.go +++ b/consensus/debugging.go @@ -2,6 +2,7 @@ package consensus import ( typesCons "github.com/pokt-network/pocket/consensus/types" + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/messaging" ) @@ -20,6 +21,10 @@ func (m *consensusModule) HandleDebugMessage(debugMessage *messaging.DebugMessag m.triggerNextView(debugMessage) case messaging.DebugMessageAction_DEBUG_CONSENSUS_TOGGLE_PACE_MAKER_MODE: m.togglePacemakerManualMode(debugMessage) + case messaging.DebugMessageAction_DEBUG_CONSENSUS_SEND_BLOCK_REQ: + m.sendGetBlockStateSyncMessage(debugMessage) + case messaging.DebugMessageAction_DEBUG_CONSENSUS_SEND_METADATA_REQ: + m.sendGetMetadataStateSyncMessage(debugMessage) default: m.logger.Debug().Msgf("Debug message: %s", debugMessage.Message) } @@ -95,3 +100,59 @@ func (m *consensusModule) togglePacemakerManualMode(_ *messaging.DebugMessage) { } m.paceMaker.SetManualMode(newMode) } + +// requests current block from all validators +func (m *consensusModule) sendGetBlockStateSyncMessage(_ *messaging.DebugMessage) { + currentHeight := m.CurrentHeight() + requestHeight := currentHeight - 1 + peerAddress := m.GetNodeAddress() + + stateSyncGetBlockMessage := &typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_GetBlockReq{ + GetBlockReq: &typesCons.GetBlockRequest{ + PeerAddress: peerAddress, + Height: requestHeight, + }, + }, + } + + validators, err := m.getValidatorsAtHeight(currentHeight) + if err != nil { + m.logger.Debug().Msgf(typesCons.ErrPersistenceGetAllValidators.Error(), err) + } + + for _, val := range validators { + valAddress := cryptoPocket.AddressFromString(val.GetAddress()) + if err := m.stateSync.SendStateSyncMessage(stateSyncGetBlockMessage, valAddress, requestHeight); err != nil { + m.logger.Debug().Msgf(typesCons.SendingStateSyncMessage(valAddress, requestHeight), err) + } + } +} + +// requests metadata from all validators +func (m *consensusModule) sendGetMetadataStateSyncMessage(_ *messaging.DebugMessage) { + currentHeight := m.CurrentHeight() + requestHeight := currentHeight - 1 + peerAddress := m.GetNodeAddress() + + stateSyncMetaDataReqMessage := &typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_MetadataReq{ + MetadataReq: &typesCons.StateSyncMetadataRequest{ + PeerAddress: peerAddress, + }, + }, + } + + validators, err := m.getValidatorsAtHeight(currentHeight) + if err != nil { + m.logger.Debug().Msgf(typesCons.ErrPersistenceGetAllValidators.Error(), err) + } + + for _, val := range validators { + valAddress := cryptoPocket.AddressFromString(val.GetAddress()) + if err := m.stateSync.SendStateSyncMessage(stateSyncMetaDataReqMessage, valAddress, requestHeight); err != nil { + m.logger.Debug().Msgf(typesCons.SendingStateSyncMessage(valAddress, requestHeight), err) + } + } + +} diff --git a/consensus/doc/CHANGELOG.md b/consensus/doc/CHANGELOG.md index d990e6a7f..332b876b5 100644 --- a/consensus/doc/CHANGELOG.md +++ b/consensus/doc/CHANGELOG.md @@ -7,8 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## [0.0.0.26] - 2023-02-06 +## [0.0.0.27] - 2023-02-09 + +- Add `state_sync` submodule, with `state_sync` struct +- Implement state sync server to advertise blocks and metadata +- Create new `state_sync_handler.go` source file that handles `StateSyncMessage`s sent to the `Consensus` module +- Add two new tests in `state_sync_test.go`:`TestStateSyncServerGetMetaDataReq` and `TestStateSyncServerGetBlock` +- Update `TestHotstuff4Nodes1BlockHappyPath` test to also retrieve the committed block +## [0.0.0.26] - 2023-02-06 - Address legacy linter errors from `golangci-lint` ## [0.0.0.25] - 2023-02-04 diff --git a/consensus/e2e_tests/hotstuff_test.go b/consensus/e2e_tests/hotstuff_test.go index a0ff24604..3077223ec 100644 --- a/consensus/e2e_tests/hotstuff_test.go +++ b/consensus/e2e_tests/hotstuff_test.go @@ -8,8 +8,10 @@ import ( "github.com/benbjohnson/clock" "github.com/pokt-network/pocket/consensus" typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" "github.com/pokt-network/pocket/shared/modules" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" ) func TestHotstuff4Nodes1BlockHappyPath(t *testing.T) { @@ -188,6 +190,48 @@ func TestHotstuff4Nodes1BlockHappyPath(t *testing.T) { nodeState) require.Equal(t, typesCons.NodeId(0), nodeState.LeaderId, "Leader should be empty") } + + // Test state synchronisation's get block functionality + // At this stage, first round is finished, get block request for block height 1 must return non-nill block + serverNode := pocketNodes[1] + + // We choose node 2 as the requester node. + requesterNode := pocketNodes[2] + requesterNodePeerAddress := requesterNode.GetBus().GetConsensusModule().GetNodeAddress() + + stateSyncGetBlockReq := typesCons.GetBlockRequest{ + PeerAddress: requesterNodePeerAddress, + Height: 1, + } + + stateSyncGetBlockMessage := &typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_GetBlockReq{ + GetBlockReq: &stateSyncGetBlockReq, + }, + } + + anyProto, err := anypb.New(stateSyncGetBlockMessage) + require.NoError(t, err) + + // Send get block request to the server node + P2PSend(t, serverNode, anyProto) + + // Start waiting for the get block request on server node, + numExpectedMsgs := 1 + errMsg := "StateSync Get Block Request Message" + receivedMsg, err := WaitForNetworkStateSyncEvents(t, clockMock, eventsChannel, errMsg, numExpectedMsgs, 250, false) + require.NoError(t, err) + + msg, err := codec.GetCodec().FromAny(receivedMsg[0]) + require.NoError(t, err) + + stateSyncGetBlockResMessage, ok := msg.(*typesCons.StateSyncMessage) + require.True(t, ok) + + getBlockRes := stateSyncGetBlockResMessage.GetGetBlockRes() + require.NotEmpty(t, getBlockRes) + + require.Equal(t, uint64(1), getBlockRes.Block.GetBlockHeader().Height) } // TODO: Implement these tests and use them as a starting point for new ones. Consider using ChatGPT to help you out :) diff --git a/consensus/e2e_tests/state_sync_test.go b/consensus/e2e_tests/state_sync_test.go new file mode 100644 index 000000000..fb9af2d68 --- /dev/null +++ b/consensus/e2e_tests/state_sync_test.go @@ -0,0 +1,181 @@ +package e2e_tests + +import ( + "reflect" + "testing" + "time" + + "github.com/benbjohnson/clock" + typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" + "github.com/pokt-network/pocket/shared/modules" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" +) + +func TestStateSync_ServerGetMetaDataReq_Success(t *testing.T) { + // Test preparation + clockMock := clock.NewMock() + timeReminder(t, clockMock, time.Second) + + runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock) + buses := GenerateBuses(t, runtimeMgrs) + + // Create & start test pocket nodes + eventsChannel := make(modules.EventsChannel, 100) + pocketNodes := CreateTestConsensusPocketNodes(t, buses, eventsChannel) + StartAllTestPocketNodes(t, pocketNodes) + + testHeight := uint64(4) + + // Choose node 1 as the server node + // Set server node's height to test height. + serverNode := pocketNodes[1] + serverNodePeerId := serverNode.GetBus().GetConsensusModule().GetNodeAddress() + serverNodeConsensusModImpl := GetConsensusModImpl(serverNode) + serverNodeConsensusModImpl.MethodByName("SetHeight").Call([]reflect.Value{reflect.ValueOf(testHeight)}) + + // We choose node 2 as the requester node. + requesterNode := pocketNodes[2] + requesterNodePeerAddress := requesterNode.GetBus().GetConsensusModule().GetNodeAddress() + + // Test MetaData Req + stateSyncMetaDataReqMessage := &typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_MetadataReq{ + MetadataReq: &typesCons.StateSyncMetadataRequest{ + PeerAddress: requesterNodePeerAddress, + }, + }, + } + anyProto, err := anypb.New(stateSyncMetaDataReqMessage) + require.NoError(t, err) + + // Send metadata request to the server node + P2PSend(t, serverNode, anyProto) + + // Start waiting for the metadata request on server node, + errMsg := "StateSync Metadata Request" + receivedMsg, err := WaitForNetworkStateSyncEvents(t, clockMock, eventsChannel, errMsg, 1, 250, false) + require.NoError(t, err) + + msg, err := codec.GetCodec().FromAny(receivedMsg[0]) + require.NoError(t, err) + + stateSyncMetaDataResMessage, ok := msg.(*typesCons.StateSyncMessage) + require.True(t, ok) + + metaDataRes := stateSyncMetaDataResMessage.GetMetadataRes() + require.NotEmpty(t, metaDataRes) + + require.Equal(t, uint64(4), metaDataRes.MaxHeight) + require.Equal(t, uint64(1), metaDataRes.MinHeight) + require.Equal(t, serverNodePeerId, metaDataRes.PeerAddress) +} + +func TestStateSync_ServerGetBlock_Success(t *testing.T) { + // Test preparation + clockMock := clock.NewMock() + timeReminder(t, clockMock, time.Second) + + // Test configs + runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock) + buses := GenerateBuses(t, runtimeMgrs) + + // Create & start test pocket nodes + eventsChannel := make(modules.EventsChannel, 100) + pocketNodes := CreateTestConsensusPocketNodes(t, buses, eventsChannel) + StartAllTestPocketNodes(t, pocketNodes) + + testHeight := uint64(5) + + serverNode := pocketNodes[1] + serverNodeConsensusModImpl := GetConsensusModImpl(serverNode) + serverNodeConsensusModImpl.MethodByName("SetHeight").Call([]reflect.Value{reflect.ValueOf(testHeight)}) + + // Choose node 2 as the requester node + requesterNode := pocketNodes[2] + requesterNodePeerAddress := requesterNode.GetBus().GetConsensusModule().GetNodeAddress() + + // Passing Test + // Test GetBlock Req + stateSyncGetBlockMessage := &typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_GetBlockReq{ + GetBlockReq: &typesCons.GetBlockRequest{ + PeerAddress: requesterNodePeerAddress, + Height: 1, + }, + }, + } + + anyProto, err := anypb.New(stateSyncGetBlockMessage) + require.NoError(t, err) + + // Send get block request to the server node + P2PSend(t, serverNode, anyProto) + + // Start waiting for the get block request on server node, expect to return error + errMsg := "StateSync Get Block Request Message" + numExpectedMsgs := 1 + receivedMsg, err := WaitForNetworkStateSyncEvents(t, clockMock, eventsChannel, errMsg, numExpectedMsgs, 250, false) + require.NoError(t, err) + + msg, err := codec.GetCodec().FromAny(receivedMsg[0]) + require.NoError(t, err) + + stateSyncGetBlockResMessage, ok := msg.(*typesCons.StateSyncMessage) + require.True(t, ok) + + getBlockRes := stateSyncGetBlockResMessage.GetGetBlockRes() + require.NotEmpty(t, getBlockRes) + + require.Equal(t, uint64(1), getBlockRes.Block.GetBlockHeader().Height) +} + +func TestStateSync_ServerGetBlock_FailNonExistingBlock(t *testing.T) { + // Test preparation + clockMock := clock.NewMock() + timeReminder(t, clockMock, time.Second) + + // Test configs + runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock) + buses := GenerateBuses(t, runtimeMgrs) + + // Create & start test pocket nodes + eventsChannel := make(modules.EventsChannel, 100) + pocketNodes := CreateTestConsensusPocketNodes(t, buses, eventsChannel) + StartAllTestPocketNodes(t, pocketNodes) + + testHeight := uint64(5) + + serverNode := pocketNodes[1] + serverNodeConsensusModImpl := GetConsensusModImpl(serverNode) + serverNodeConsensusModImpl.MethodByName("SetHeight").Call([]reflect.Value{reflect.ValueOf(testHeight)}) + + // Choose node 2 as the requester node + requesterNode := pocketNodes[2] + requesterNodePeerAddress := requesterNode.GetBus().GetConsensusModule().GetNodeAddress() + + // Failing Test + // Get Block Req is current block height + 1 + requestHeight := testHeight + 1 + stateSyncGetBlockMessage := &typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_GetBlockReq{ + GetBlockReq: &typesCons.GetBlockRequest{ + PeerAddress: requesterNodePeerAddress, + Height: requestHeight, + }, + }, + } + + anyProto, err := anypb.New(stateSyncGetBlockMessage) + require.NoError(t, err) + + // Send get block request to the server node + P2PSend(t, serverNode, anyProto) + + numExpectedMsgs := 1 + // Start waiting for the get block request on server node, expect to return error + errMsg := "StateSync Get Block Request Message" + _, err = WaitForNetworkStateSyncEvents(t, clockMock, eventsChannel, errMsg, numExpectedMsgs, 250, false) + require.Error(t, err) +} diff --git a/consensus/e2e_tests/utils_test.go b/consensus/e2e_tests/utils_test.go index e39ea028b..e47f8f299 100644 --- a/consensus/e2e_tests/utils_test.go +++ b/consensus/e2e_tests/utils_test.go @@ -13,12 +13,15 @@ import ( "github.com/golang/mock/gomock" "github.com/pokt-network/pocket/consensus" typesCons "github.com/pokt-network/pocket/consensus/types" + mocksPer "github.com/pokt-network/pocket/persistence/types/mocks" "github.com/pokt-network/pocket/runtime" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/genesis" "github.com/pokt-network/pocket/runtime/test_artifacts" "github.com/pokt-network/pocket/shared" "github.com/pokt-network/pocket/shared/codec" + "github.com/pokt-network/pocket/shared/converters" + coreTypes "github.com/pokt-network/pocket/shared/core/types" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" @@ -58,6 +61,7 @@ func GenerateNodeRuntimeMgrs(_ *testing.T, validatorCount int, clockMgr clock.Cl Manual: false, DebugTimeBetweenStepsMsec: 0, }, + ServerModeEnabled: true, } runtimeMgrs[i] = runtime.NewManager(config, genesisState, runtime.WithClock(clockMgr)) } @@ -205,11 +209,12 @@ func P2PSend(_ *testing.T, node *shared.Node, any *anypb.Any) { // This is a helper for `waitForEventsInternal` that creates the `includeFilter` function based on // consensus specific parameters. // failOnExtraMessages: -// This flag is useful when running the consensus unit tests. It causes the test to wait up to the -// maximum delay specified in the source code and errors if additional unexpected messages are received. -// For example, if the test expects to receive 5 messages within 2 seconds: -// false: continue if 5 messages are received in 0.5 seconds -// true: wait for another 1.5 seconds after 5 messages are received in 0.5 seconds, and fail if any additional messages are received. +// +// This flag is useful when running the consensus unit tests. It causes the test to wait up to the +// maximum delay specified in the source code and errors if additional unexpected messages are received. +// For example, if the test expects to receive 5 messages within 2 seconds: +// false: continue if 5 messages are received in 0.5 seconds +// true: wait for another 1.5 seconds after 5 messages are received in 0.5 seconds, and fail if any additional messages are received. func WaitForNetworkConsensusEvents( t *testing.T, clck *clock.Mock, @@ -234,6 +239,30 @@ func WaitForNetworkConsensusEvents( return waitForEventsInternal(clck, eventsChannel, consensus.HotstuffMessageContentType, numExpectedMsgs, millis, includeFilter, errMsg, failOnExtraMessages) } +// IMPROVE: Consider unifying this function with WaitForNetworkConsensusEvents +// This is a helper for 'waitForEventsInternal' that creates the `includeFilter` function based on state sync message specific parameters. +func WaitForNetworkStateSyncEvents( + t *testing.T, + clck *clock.Mock, + eventsChannel modules.EventsChannel, + errMsg string, + numExpectedMsgs int, + millis time.Duration, + failOnExtraMessages bool, +) (messages []*anypb.Any, err error) { + includeFilter := func(anyMsg *anypb.Any) bool { + msg, err := codec.GetCodec().FromAny(anyMsg) + require.NoError(t, err) + + _, ok := msg.(*typesCons.StateSyncMessage) + require.True(t, ok) + + return true + } + + return waitForEventsInternal(clck, eventsChannel, consensus.StateSyncMessageContentType, numExpectedMsgs, millis, includeFilter, errMsg, failOnExtraMessages) +} + // RESEARCH(#462): Research ways to eliminate time-based non-determinism from the test framework // IMPROVE: This function can be extended to testing events outside of just the consensus module. func waitForEventsInternal( @@ -241,7 +270,7 @@ func waitForEventsInternal( eventsChannel modules.EventsChannel, eventContentType string, numExpectedMsgs int, - maxWaitTimeMillis time.Duration, // IMPROVE(#295): Remove time specific suffixes as outlined by go-staticcheck (ST1011) + maxWaitTime time.Duration, msgIncludeFilter func(m *anypb.Any) bool, errMsg string, failOnExtraMessages bool, @@ -250,7 +279,7 @@ func waitForEventsInternal( unusedEvents := make([]*messaging.PocketEnvelope, 0) // "Recycle" events back into the events channel if we're not using them // Limit the amount of time we're waiting for the messages to be published on the events channel - ctx, cancel := clck.WithTimeout(context.TODO(), time.Millisecond*maxWaitTimeMillis) + ctx, cancel := clck.WithTimeout(context.TODO(), time.Millisecond*maxWaitTime) defer cancel() // Since the tests use a mock clock, we need to manually advance the clock to trigger the timeout @@ -328,13 +357,43 @@ func basePersistenceMock(t *testing.T, _ modules.EventsChannel, bus modules.Bus) persistenceMock.EXPECT().Start().Return(nil).AnyTimes() persistenceMock.EXPECT().SetBus(gomock.Any()).Return().AnyTimes() persistenceMock.EXPECT().NewReadContext(gomock.Any()).Return(persistenceReadContextMock, nil).AnyTimes() + persistenceMock.EXPECT().ReleaseWriteContext().Return(nil).AnyTimes() + blockStoreMock := mocksPer.NewMockKVStore(ctrl) + + blockStoreMock.EXPECT().Get(gomock.Any()).DoAndReturn(func(height []byte) ([]byte, error) { + heightInt := converters.HeightFromBytes(height) + if bus.GetConsensusModule().CurrentHeight() < heightInt { + return nil, fmt.Errorf("requested height is higher than current height of the node's consensus module") + } + blockWithHeight := &coreTypes.Block{ + BlockHeader: &coreTypes.BlockHeader{ + Height: converters.HeightFromBytes(height), + }, + } + return codec.GetCodec().Marshal(blockWithHeight) + }).AnyTimes() + + persistenceMock.EXPECT().GetBlockStore().Return(blockStoreMock).AnyTimes() + + persistenceReadContextMock.EXPECT().GetMaximumBlockHeight().DoAndReturn(func() (uint64, error) { + height := bus.GetConsensusModule().CurrentHeight() + return height, nil + }).AnyTimes() + + persistenceReadContextMock.EXPECT().GetMinimumBlockHeight().DoAndReturn(func() (uint64, error) { + // mock minimum block height in persistence module to 1 if current height is equal or more than 1, else return 0 as the minimum height + if bus.GetConsensusModule().CurrentHeight() >= 1 { + return 1, nil + } + return 0, nil + }).AnyTimes() + // The persistence context should usually be accessed via the utility module within the context // of the consensus module. This one is only used when loading the initial consensus module // state; hence the `-1` expectation in the call above. persistenceContextMock.EXPECT().Close().Return(nil).AnyTimes() - persistenceReadContextMock.EXPECT().GetLatestBlockHeight().Return(uint64(0), nil).AnyTimes() persistenceReadContextMock.EXPECT().GetAllValidators(gomock.Any()).Return(bus.GetRuntimeMgr().GetGenesis().Validators, nil).AnyTimes() persistenceReadContextMock.EXPECT().Close().Return(nil).AnyTimes() diff --git a/consensus/helpers.go b/consensus/helpers.go index 7d76ab314..9986edf6a 100644 --- a/consensus/helpers.go +++ b/consensus/helpers.go @@ -26,7 +26,8 @@ const ( ByzantineThreshold = float64(2) / float64(3) - HotstuffMessageContentType = "consensus.HotstuffMessage" + HotstuffMessageContentType = "consensus.HotstuffMessage" + StateSyncMessageContentType = "consensus.StateSyncMessage" ) var HotstuffSteps = [...]typesCons.HotstuffStep{NewRound, Prepare, PreCommit, Commit, Decide} @@ -299,7 +300,6 @@ func (m *consensusModule) electNextLeader(message *typesCons.HotstuffMessage) er } /*** General Infrastructure Helpers ***/ - func (m *consensusModule) setLogPrefix(logPrefix string) { logger.Global.UpdateFields(map[string]any{ "kind": logPrefix, diff --git a/consensus/module.go b/consensus/module.go index c34e64269..4201a4a7e 100644 --- a/consensus/module.go +++ b/consensus/module.go @@ -7,6 +7,7 @@ import ( "github.com/pokt-network/pocket/consensus/leader_election" "github.com/pokt-network/pocket/consensus/pacemaker" + "github.com/pokt-network/pocket/consensus/state_sync" consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" typesCons "github.com/pokt-network/pocket/consensus/types" "github.com/pokt-network/pocket/logger" @@ -57,6 +58,8 @@ type consensusModule struct { leaderId *typesCons.NodeId nodeId typesCons.NodeId + nodeAddress string + // Module Dependencies // IMPROVE(#283): Investigate whether the current approach to how the `utilityContext` should be // managed or changed. Also consider exposing a function that exposes the context @@ -69,6 +72,8 @@ type consensusModule struct { logger modules.Logger logPrefix string + stateSync state_sync.StateSyncModule + hotstuffMempool map[typesCons.HotstuffStep]*hotstuffFIFOMempool } @@ -107,6 +112,23 @@ func (m *consensusModule) SetUtilityContext(utilityContext modules.UtilityContex m.utilityContext = utilityContext } +// Implementations of the ConsensusStateSync interface + +func (m *consensusModule) GetNodeIdFromNodeAddress(peerId string) (uint64, error) { + validators, err := m.getValidatorsAtHeight(m.CurrentHeight()) + if err != nil { + // REFACTOR(#434): As per issue #434, once the new id is sorted out, this return statement must be changed + return 0, err + } + + valAddrToIdMap := typesCons.NewActorMapper(validators).GetValAddrToIdMap() + return uint64(valAddrToIdMap[peerId]), nil +} + +func (m *consensusModule) GetNodeAddress() string { + return m.nodeAddress +} + // Implementations of the type PaceMakerAccessModule interface // SetHeight, SeetRound, SetStep are implemented for ConsensusDebugModule func (m *consensusModule) ClearLeaderMessagesPool() { @@ -128,10 +150,17 @@ func (*consensusModule) Create(bus modules.Bus) (modules.Module, error) { if err != nil { return nil, err } - pm := paceMakerMod.(pacemaker.Pacemaker) + + stateSyncMod, err := state_sync.CreateStateSync(bus) + if err != nil { + return nil, err + } + stateSync := stateSyncMod.(state_sync.StateSyncModule) + m := &consensusModule{ paceMaker: pm, + stateSync: stateSync, leaderElectionMod: leaderElectionMod.(leader_election.LeaderElectionModule), height: 0, @@ -181,6 +210,11 @@ func (*consensusModule) Create(bus modules.Bus) (modules.Module, error) { m.genesisState = genesisState m.nodeId = valAddrToIdMap[address] + m.nodeAddress = address + + if consensusCfg.ServerModeEnabled { + m.stateSync.EnableServerMode() + } m.initMessagesPool() @@ -206,6 +240,10 @@ func (m *consensusModule) Start() error { return err } + if err := m.stateSync.Start(); err != nil { + return err + } + if err := m.leaderElectionMod.Start(); err != nil { return err } @@ -246,7 +284,7 @@ func (*consensusModule) ValidateGenesis(gen *genesis.GenesisState) error { }) // Sort the validators by their address - vals2 := vals[:] //nolint:gocritic // Make a copy of the slice to retain order + vals2 := vals[:] // nolint:gocritic // Make a copy of the slice to retain order sort.Slice(vals, func(i, j int) bool { return vals[i].GetAddress() < vals[j].GetAddress() }) @@ -311,7 +349,7 @@ func (m *consensusModule) loadPersistedState() error { } defer persistenceContext.Close() - latestHeight, err := persistenceContext.GetLatestBlockHeight() + latestHeight, err := persistenceContext.GetMaximumBlockHeight() if err != nil || latestHeight == 0 { // TODO: Proper state sync not implemented yet return nil @@ -323,3 +361,7 @@ func (m *consensusModule) loadPersistedState() error { return nil } + +func (m *consensusModule) EnableServerMode() { + m.stateSync.EnableServerMode() +} diff --git a/consensus/pacemaker/pacemaker.go b/consensus/pacemaker/module.go similarity index 87% rename from consensus/pacemaker/pacemaker.go rename to consensus/pacemaker/module.go index 17826bed1..46d91c1bb 100644 --- a/consensus/pacemaker/pacemaker.go +++ b/consensus/pacemaker/module.go @@ -2,12 +2,12 @@ package pacemaker import ( "context" - "fmt" "log" "time" consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/logger" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/shared/codec" "github.com/pokt-network/pocket/shared/modules" @@ -51,7 +51,8 @@ type pacemaker struct { // Only used for development and debugging. debug pacemakerDebug - // REFACTOR: this should be removed, when we build a shared and proper logger + logger modules.Logger + // REFACTOR: logPrefix should be removed in exchange for setting a namespace directly with the logger logPrefix string } @@ -83,6 +84,7 @@ func (*pacemaker) Create(bus modules.Bus) (modules.Module, error) { } func (m *pacemaker) Start() error { + m.logger = logger.Global.CreateLoggerForModule(m.GetModuleName()) m.RestartTimer() return nil } @@ -118,14 +120,14 @@ func (m *pacemaker) ShouldHandleMessage(msg *typesCons.HotstuffMessage) (bool, e // Consensus message is from the past if msg.Height < currentHeight { - m.nodeLog(fmt.Sprintf("⚠️ [WARN][DISCARDING] ⚠️ Node at height %d > message height %d", currentHeight, msg.Height)) + m.logger.Info().Msgf("⚠️ [WARN][DISCARDING] ⚠️ Node at height %d > message height %d", currentHeight, msg.Height) return false, nil } // TODO: Need to restart state sync or be in state sync mode right now // Current node is out of sync if msg.Height > currentHeight { - m.nodeLog(fmt.Sprintf("⚠️ [WARN][DISCARDING] ⚠️ Node at height %d < message at height %d", currentHeight, msg.Height)) + m.logger.Info().Msgf("⚠️ [WARN][DISCARDING] ⚠️ Node at height %d > message height %d", currentHeight, msg.Height) return false, nil } @@ -139,7 +141,7 @@ func (m *pacemaker) ShouldHandleMessage(msg *typesCons.HotstuffMessage) (bool, e // Message is from the past if msg.Round < currentRound || (msg.Round == currentRound && msg.Step < currentStep) { - m.nodeLog(fmt.Sprintf("⚠️ [WARN][DISCARDING] ⚠️ Node at (height, step, round) (%d, %d, %d) > message at (%d, %d, %d)", currentHeight, currentStep, currentRound, msg.Height, msg.Step, msg.Round)) + m.logger.Info().Msgf("⚠️ [WARN][DISCARDING] ⚠️ Node at (height, step, round) (%d, %d, %d) > message at (%d, %d, %d)", currentHeight, currentStep, currentRound, msg.Height, msg.Step, msg.Round) return false, nil } @@ -150,7 +152,7 @@ func (m *pacemaker) ShouldHandleMessage(msg *typesCons.HotstuffMessage) (bool, e // pacemaker catch up! Node is synched to the right height, but on a previous step/round so we just jump to the latest state. if msg.Round > currentRound || (msg.Round == currentRound && msg.Step > currentStep) { - m.nodeLog(typesCons.PacemakerCatchup(currentHeight, uint64(currentStep), currentRound, msg.Height, uint64(msg.Step), msg.Round)) + m.logger.Info().Msg(typesCons.PacemakerCatchup(currentHeight, uint64(currentStep), currentRound, msg.Height, uint64(msg.Step), msg.Round)) consensusMod.SetStep(uint8(msg.Step)) consensusMod.SetRound(msg.Round) @@ -201,7 +203,7 @@ func (m *pacemaker) RestartTimer() { func (m *pacemaker) InterruptRound(reason string) { consensusMod := m.GetBus().GetConsensusModule() - m.nodeLog(typesCons.PacemakerInterrupt(reason, consensusMod.CurrentHeight(), typesCons.HotstuffStep(consensusMod.CurrentStep()), consensusMod.CurrentRound())) + m.logger.Info().Msg(typesCons.PacemakerInterrupt(reason, consensusMod.CurrentHeight(), typesCons.HotstuffStep(consensusMod.CurrentStep()), consensusMod.CurrentRound())) consensusMod.SetRound(consensusMod.CurrentRound() + 1) @@ -232,7 +234,7 @@ func (m *pacemaker) InterruptRound(reason string) { func (m *pacemaker) NewHeight() { consensusMod := m.GetBus().GetConsensusModule() - m.nodeLog(typesCons.PacemakerNewHeight(consensusMod.CurrentHeight() + 1)) + m.logger.Info().Msg(typesCons.PacemakerNewHeight(consensusMod.CurrentHeight() + 1)) consensusMod.SetHeight(consensusMod.CurrentHeight() + 1) consensusMod.ResetForNewHeight() @@ -295,8 +297,3 @@ func (m *pacemaker) getStepTimeout() time.Duration { baseTimeout := time.Duration(int64(time.Millisecond) * int64(m.pacemakerCfg.TimeoutMsec)) return baseTimeout } - -// TODO: Remove once we have a proper logging system. -func (m *pacemaker) nodeLog(s string) { - log.Printf("[%s][%d] %s\n", m.logPrefix, m.GetBus().GetConsensusModule().GetNodeId(), s) -} diff --git a/consensus/state_sync/helpers.go b/consensus/state_sync/helpers.go new file mode 100644 index 000000000..8255b967d --- /dev/null +++ b/consensus/state_sync/helpers.go @@ -0,0 +1,25 @@ +package state_sync + +import ( + typesCons "github.com/pokt-network/pocket/consensus/types" + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + "google.golang.org/protobuf/types/known/anypb" +) + +func (m *stateSync) SendStateSyncMessage(stateSyncMsg *typesCons.StateSyncMessage, peerId cryptoPocket.Address, blockHeight uint64) error { + anyMsg, err := anypb.New(stateSyncMsg) + if err != nil { + return err + } + m.logger.Info().Uint64("height", blockHeight).Msg(typesCons.SendingStateSyncMessage(peerId, blockHeight)) + return m.sendToPeer(anyMsg, peerId) +} + +// Helper function for sending state sync messages +func (m *stateSync) sendToPeer(msg *anypb.Any, peerId cryptoPocket.Address) error { + if err := m.GetBus().GetP2PModule().Send(peerId, msg); err != nil { + m.logger.Error().Msgf(typesCons.ErrSendMessage.Error(), err) + return err + } + return nil +} diff --git a/consensus/state_sync/interfaces.go b/consensus/state_sync/interfaces.go new file mode 100644 index 000000000..726175f80 --- /dev/null +++ b/consensus/state_sync/interfaces.go @@ -0,0 +1,104 @@ +package state_sync + +import ( + coreTypes "github.com/pokt-network/pocket/shared/core/types" + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" +) + +// REFACTOR: Remove interface definitions from this file to their respective source code files, +// keep interface definitions in the same file with the implementation as in server.go + +type SyncState interface { + // latest local height + LatestHeight() int64 + // latest network height (from the aggregation of Peer Sync Meta) + LatestNetworkHeight() int64 + // retrieve peer meta (actively updated through churn management) + GetPeers() []PeerSyncMeta + // returns ordered array of missing block heights + GetMissingBlockHeights() []int64 +} + +type BlockRequestMessage interface { + // the height the peer wants from the block store + GetHeight() int64 +} + +type BlockResponseMessage interface { + // the bytes of the requested block from the block store + GetBlockBytes() []byte +} + +// TODO: needs to be shared between P2P as the Churn Management Process updates this information +type PeerSyncMeta interface { + // the unique identifier associated with the peer + GetPeerID() string + // the maximum height the peer has in the block store + GetMaxHeight() int64 + // the minimum height the peer has in the block store + GetMinHeight() int64 +} + +// LEGACY interface definition +// TODO: this must be deleted once state sync module is ready. +type StateSyncModuleLEGACY interface { + // -- Constructor Setter Functions -- + + // `HandleStateSync` function: + // - Create a Utility Context + // - Block.ValidateBasic() + // - Consensus Module Replica Path + // - Prepare Block: utilityContext.SetProposalBlock(block) + // - Apply Block: utilityContext.ApplyBlock(block) + // - Validate Block: utilityContext.AppHash == Block.AppHash + // - Store Block: consensusModule.CommitBlock() + HandleStateSyncMessage(msg BlockResponseMessage) + + // `GetPeerSyncMeta` function: + // - Retrieve a list of active peers with their metadata (identified and retrieved through P2P's `Churn Management`) + GetPeerMetadata(GetPeerSyncMeta func() (peers []PeerSyncMeta, err error)) + + // `NetworkSend` function contract: + // - sends data to an address via P2P network + NetworkSend(NetworkSend func(data []byte, address cryptoPocket.Address) error) + + // -- Sync modes -- + + // In the StateSync protocol, the Node fields valid BlockRequests from its peers to help them CatchUp to be Synced. + // This sub-protocol is continuous throughout the lifecycle of StateSync. + RunServerMode() + + // In SyncedMode, the Node is caught up to the latest block and is listening & waiting for the latest block to be passed + // to maintain a synchronous state with the global SyncState. + // - UpdatePeerMetadata from P2P module + // - UpdateSyncState + // - Rely on new blocks to be propagated via the P2P network after Validators reach consensus + // - If `localSyncState.Height < globalNetworkSyncState.Height` -> RunSyncMode() // careful about race-conditions + RunSyncedMode() + + // Runs sync mode 'service' that continuously runs while `localSyncState.Height < globalNetworkSyncState.Height` + // - UpdatePeerMetadata from P2P module + // - Retrieve missing blocks from peers + // - Process retrieved blocks + // - UpdateSyncState + // - If `localSyncState.Height == globalNetworkSyncState.Height` -> RunSyncedMode() + RunSyncMode() + + // Returns the `highest priority aka lowest height` missing block heights up to `max` heights + GetMissingBlockHeights(state SyncState, max int) (blockHeights []int64, err error) + + // Random selection of eligilbe peers enables a fair distribution of blockRequests over time via law of large numbers + // An eligible peer is when `PeerMeta.MinHeight <= blockHeight <= PeerMeta.MaxHeight` + GetRandomEligiblePeersForHeight(blockHeight int64) (eligiblePeer PeerSyncMeta, err error) + + // Uses `NetworkSend` to send a `BlockRequestMessage` to a specific peer + SendBlockRequest(peerId string) error + + // Uses 'NetworkSend' to send a `BlockResponseMessage` to a specific peer + // This function is used in 'ServerMode()' + HandleBlockRequest(message BlockRequestMessage) error + + // Uses `HandleBlock` to process retrieved blocks from peers + // Must update sync state using `SetMissingBlockHeight` + ProcessBlock(block *coreTypes.Block) error +} diff --git a/consensus/state_sync/module.go b/consensus/state_sync/module.go index 2fd7eb116..74bcf353b 100644 --- a/consensus/state_sync/module.go +++ b/consensus/state_sync/module.go @@ -2,25 +2,28 @@ package state_sync import ( typesCons "github.com/pokt-network/pocket/consensus/types" - coreTypes "github.com/pokt-network/pocket/shared/core/types" + "github.com/pokt-network/pocket/logger" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/modules" ) -// This module is responsible for handling requests and business logic that advertises and shares -// local state metadata with other peers synching to the latest block. -type StateSyncServerModule interface { - modules.Module +const ( + DefaultLogPrefix = "NODE" + stateSyncModuleName = "stateSyncModule" +) - // Advertise (send) the local state sync metadata to the requesting peer - HandleStateSyncMetadataRequest(*typesCons.StateSyncMetadataRequest) error +type SyncMode string - // Send the block being requested by the peer - HandleGetBlockRequest(*typesCons.GetBlockRequest) error -} +const ( + Sync SyncMode = "sync" + Synched SyncMode = "synched" + Pacemaker SyncMode = "pacemaker" + Server SyncMode = "server" +) type StateSyncModule interface { modules.Module + StateSyncServerModule // Handle a metadata response from a peer so this node can update its local view of the state // sync metadata available from its peers @@ -28,99 +31,99 @@ type StateSyncModule interface { // Handle a block response from a peer so this node can update apply it to its local state // and catch up to the global world state - HandleStateSyncBlockResponse(*typesCons.StateSyncMetadataResponse) error + HandleGetBlockResponse(*typesCons.GetBlockResponse) error + + IsServerModEnabled() bool + EnableServerMode() + + SendStateSyncMessage(*typesCons.StateSyncMessage, cryptoPocket.Address, uint64) error +} + +var ( + _ modules.Module = &stateSync{} + _ StateSyncModule = &stateSync{} + _ StateSyncServerModule = &stateSync{} +) + +type stateSync struct { + bus modules.Bus + + currentMode SyncMode + serverMode bool + + logger modules.Logger + logPrefix string +} + +func CreateStateSync(bus modules.Bus) (modules.Module, error) { + var m stateSync + return m.Create(bus) +} + +func (*stateSync) Create(bus modules.Bus) (modules.Module, error) { + m := &stateSync{ + logPrefix: DefaultLogPrefix, + } + + if err := bus.RegisterModule(m); err != nil { + return nil, err + } + + // when node is starting, it is in sync mode, as it might need to bootstrap to the latest state + m.currentMode = Sync + m.serverMode = false + + return m, nil +} + +func (m *stateSync) Start() error { + m.logger = logger.Global.CreateLoggerForModule(m.GetModuleName()) + + return nil +} + +func (m *stateSync) Stop() error { + return nil +} + +func (m *stateSync) SetBus(pocketBus modules.Bus) { + m.bus = pocketBus +} + +func (m *stateSync) GetBus() modules.Bus { + if m.bus == nil { + logger.Global.Fatal().Msg("PocketBus is not initialized") + } + return m.bus +} + +func (m *stateSync) GetModuleName() string { + return stateSyncModuleName } -// ~~~~~ TODO(#362): The interface below is only meant to be used as a guideline and not implemented explicitly. It will updated & removed over time. ~~~~~ -type StateSyncModuleLEGACY interface { - // -- Constructor Setter Functions -- - - // `HandleStateSync` function: - // - Create a Utility Context - // - Block.ValidateBasic() - // - Consensus Module Replica Path - // - Prepare Block: utilityContext.SetProposalBlock(block) - // - Apply Block: utilityContext.ApplyBlock(block) - // - Validate Block: utilityContext.AppHash == Block.AppHash - // - Store Block: consensusModule.CommitBlock() - HandleStateSyncMessage(msg BlockResponseMessage) - - // `GetPeerSyncMeta` function: - // - Retrieve a list of active peers with their metadata (identified and retrieved through P2P's `Churn Management`) - GetPeerMetadata(GetPeerSyncMeta func() (peers []PeerSyncMeta, err error)) - - // `NetworkSend` function contract: - // - sends data to an address via P2P network - NetworkSend(NetworkSend func(data []byte, address cryptoPocket.Address) error) - - // -- Sync modes -- - - // In the StateSync protocol, the Node fields valid BlockRequests from its peers to help them CatchUp to be Synced. - // This sub-protocol is continuous throughout the lifecycle of StateSync. - RunServerMode() - - // In SyncedMode, the Node is caught up to the latest block and is listening & waiting for the latest block to be passed - // to maintain a synchronous state with the global SyncState. - // - UpdatePeerMetadata from P2P module - // - UpdateSyncState - // - Rely on new blocks to be propagated via the P2P network after Validators reach consensus - // - If `localSyncState.Height < globalNetworkSyncState.Height` -> RunSyncMode() // careful about race-conditions - RunSyncedMode() - - // Runs sync mode 'service' that continuously runs while `localSyncState.Height < globalNetworkSyncState.Height` - // - UpdatePeerMetadata from P2P module - // - Retrieve missing blocks from peers - // - Process retrieved blocks - // - UpdateSyncState - // - If `localSyncState.Height == globalNetworkSyncState.Height` -> RunSyncedMode() - RunSyncMode() - - // Returns the `highest priority aka lowest height` missing block heights up to `max` heights - GetMissingBlockHeights(state SyncState, max int) (blockHeights []int64, err error) - - // Random selection of eligilbe peers enables a fair distribution of blockRequests over time via law of large numbers - // An eligible peer is when `PeerMeta.MinHeight <= blockHeight <= PeerMeta.MaxHeight` - GetRandomEligiblePeersForHeight(blockHeight int64) (eligiblePeer PeerSyncMeta, err error) - - // Uses `NetworkSend` to send a `BlockRequestMessage` to a specific peer - SendBlockRequest(peerId string) error - - // Uses 'NetworkSend' to send a `BlockResponseMessage` to a specific peer - // This function is used in 'ServerMode()' - HandleBlockRequest(message BlockRequestMessage) error - - // Uses `HandleBlock` to process retrieved blocks from peers - // Must update sync state using `SetMissingBlockHeight` - ProcessBlock(block *coreTypes.Block) error +func (m *stateSync) IsServerModEnabled() bool { + return m.serverMode } -type SyncState interface { - // latest local height - LatestHeight() int64 - // latest network height (from the aggregation of Peer Sync Meta) - LatestNetworkHeight() int64 - // retrieve peer meta (actively updated through churn management) - GetPeers() []PeerSyncMeta - // returns ordered array of missing block heights - GetMissingBlockHeights() []int64 +func (m *stateSync) SetLogPrefix(logPrefix string) { + m.logPrefix = logPrefix } -type BlockRequestMessage interface { - // the height the peer wants from the block store - GetHeight() int64 +func (m *stateSync) EnableServerMode() { + m.currentMode = Server + m.serverMode = true } -type BlockResponseMessage interface { - // the bytes of the requested block from the block store - GetBlockBytes() []byte +// TODO(#352): Implement this function +// Placeholder function +func (m *stateSync) HandleGetBlockResponse(blockRes *typesCons.GetBlockResponse) error { + m.logger.Debug().Msgf("Received get block response: %s", blockRes.Block.String()) + return nil } -// TODO: needs to be shared between P2P as the Churn Management Process updates this information -type PeerSyncMeta interface { - // the unique identifier associated with the peer - GetPeerID() string - // the maximum height the peer has in the block store - GetMaxHeight() int64 - // the minimum height the peer has in the block store - GetMinHeight() int64 +// TODO(#352): Implement the business to handle these correctly +// Placeholder function +func (m *stateSync) HandleStateSyncMetadataResponse(metaDataRes *typesCons.StateSyncMetadataResponse) error { + m.logger.Debug().Msgf("Received get metadata response: %s", metaDataRes.String()) + return nil } diff --git a/consensus/state_sync/server.go b/consensus/state_sync/server.go new file mode 100644 index 000000000..1e7c26105 --- /dev/null +++ b/consensus/state_sync/server.go @@ -0,0 +1,109 @@ +package state_sync + +import ( + "fmt" + + typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" + "github.com/pokt-network/pocket/shared/converters" + coreTypes "github.com/pokt-network/pocket/shared/core/types" + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" +) + +// This module is responsible for handling requests and business logic that advertises and shares +// local state metadata with other peers synching to the latest block. +type StateSyncServerModule interface { + // Advertise (send) the local state sync metadata to the requesting peer + HandleStateSyncMetadataRequest(*typesCons.StateSyncMetadataRequest) error + + // Send the block being requested by the peer + HandleGetBlockRequest(*typesCons.GetBlockRequest) error +} + +func (m *stateSync) HandleStateSyncMetadataRequest(metadataReq *typesCons.StateSyncMetadataRequest) error { + consensusMod := m.GetBus().GetConsensusModule() + serverNodePeerId := m.GetBus().GetConsensusModule().GetNodeAddress() + + clientPeerAddress := metadataReq.PeerAddress + m.logger.Info().Msgf("%s received state sync metadata request from: %s", serverNodePeerId, clientPeerAddress) + + // last finalized block + persistenceContext, err := m.GetBus().GetPersistenceModule().NewReadContext(int64(consensusMod.CurrentHeight()) - 1) + if err != nil { + return nil + } + defer persistenceContext.Close() + + maxHeight, err := persistenceContext.GetMaximumBlockHeight() + if err != nil { + return err + } + + minHeight, err := persistenceContext.GetMinimumBlockHeight() + if err != nil { + return err + } + + stateSyncMessage := typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_MetadataRes{ + MetadataRes: &typesCons.StateSyncMetadataResponse{ + PeerAddress: serverNodePeerId, + MinHeight: minHeight, + MaxHeight: uint64(maxHeight), + }, + }, + } + + return m.SendStateSyncMessage(&stateSyncMessage, cryptoPocket.AddressFromString(clientPeerAddress), m.bus.GetConsensusModule().CurrentHeight()) +} + +func (m *stateSync) HandleGetBlockRequest(blockReq *typesCons.GetBlockRequest) error { + consensusMod := m.GetBus().GetConsensusModule() + serverNodePeerAddress := consensusMod.GetNodeAddress() + + clientPeerAddress := blockReq.PeerAddress + m.logger.Info().Msgf("%s received state sync Get Block Req from: %s", serverNodePeerAddress, clientPeerAddress) + + currentHeight := m.GetBus().GetConsensusModule().CurrentHeight() + + if currentHeight < blockReq.Height { + return fmt.Errorf("requested block height: %d is higher than node's block height: %d", blockReq.Height, consensusMod.CurrentHeight()) + } + + // get block from the persistence module + block, err := m.getBlockAtHeight(blockReq.Height) + if err != nil { + return err + } + + stateSyncMessage := typesCons.StateSyncMessage{ + Message: &typesCons.StateSyncMessage_GetBlockRes{ + GetBlockRes: &typesCons.GetBlockResponse{ + PeerAddress: serverNodePeerAddress, + Block: block, + }, + }, + } + + return m.SendStateSyncMessage(&stateSyncMessage, cryptoPocket.AddressFromString(clientPeerAddress), blockReq.Height) +} + +// Get a block from persistence module given block height +func (m *stateSync) getBlockAtHeight(blockHeight uint64) (*coreTypes.Block, error) { + blockStore := m.GetBus().GetPersistenceModule().GetBlockStore() + heightBytes := converters.HeightToBytes(blockHeight) + + blockBytes, err := blockStore.Get(heightBytes) + if err != nil { + m.logger.Error().Err(typesCons.ErrConsensusMempoolFull).Msg(typesCons.DisregardHotstuffMessage) + return nil, err + } + + var block coreTypes.Block + err = codec.GetCodec().Unmarshal(blockBytes, &block) + if err != nil { + return &coreTypes.Block{}, err + } + + return &block, nil +} diff --git a/consensus/state_sync_handler.go b/consensus/state_sync_handler.go new file mode 100644 index 000000000..e306df8b9 --- /dev/null +++ b/consensus/state_sync_handler.go @@ -0,0 +1,52 @@ +package consensus + +import ( + "fmt" + + typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" + "google.golang.org/protobuf/types/known/anypb" +) + +func (m *consensusModule) HandleStateSyncMessage(stateSyncMessageAny *anypb.Any) error { + m.m.Lock() + defer m.m.Unlock() + + switch stateSyncMessageAny.MessageName() { + case StateSyncMessageContentType: + msg, err := codec.GetCodec().FromAny(stateSyncMessageAny) + if err != nil { + return err + } + + stateSyncMessage, ok := msg.(*typesCons.StateSyncMessage) + if !ok { + return fmt.Errorf("failed to cast message to StateSyncMessage") + } + + return m.handleStateSyncMessage(stateSyncMessage) + default: + return typesCons.ErrUnknownStateSyncMessageType(stateSyncMessageAny.MessageName()) + } +} + +func (m *consensusModule) handleStateSyncMessage(stateSyncMessage *typesCons.StateSyncMessage) error { + switch stateSyncMessage.Message.(type) { + case *typesCons.StateSyncMessage_MetadataReq: + if !m.stateSync.IsServerModEnabled() { + return fmt.Errorf("server module is not enabled") + } + return m.stateSync.HandleStateSyncMetadataRequest(stateSyncMessage.GetMetadataReq()) + case *typesCons.StateSyncMessage_MetadataRes: + return m.stateSync.HandleStateSyncMetadataResponse(stateSyncMessage.GetMetadataRes()) + case *typesCons.StateSyncMessage_GetBlockReq: + if !m.stateSync.IsServerModEnabled() { + return fmt.Errorf("server module is not enabled") + } + return m.stateSync.HandleGetBlockRequest(stateSyncMessage.GetGetBlockReq()) + case *typesCons.StateSyncMessage_GetBlockRes: + return m.stateSync.HandleGetBlockResponse(stateSyncMessage.GetGetBlockRes()) + default: + return fmt.Errorf("unspecified state sync message type") + } +} diff --git a/consensus/types/errors.go b/consensus/types/errors.go index 7d7e88446..18db8ad71 100644 --- a/consensus/types/errors.go +++ b/consensus/types/errors.go @@ -9,6 +9,7 @@ import ( "github.com/pokt-network/pocket/logger" "github.com/pokt-network/pocket/shared/codec" + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "google.golang.org/protobuf/proto" ) @@ -82,6 +83,10 @@ func SendingMessage(msg *HotstuffMessage, nodeId NodeId) string { return fmt.Sprintf("✉️ Sending message ✉️ to %d at (height, step, round) (%d, %d, %d)", nodeId, msg.Height, msg.Step, msg.Round) } +func SendingStateSyncMessage(nodeId cryptoPocket.Address, height uint64) string { + return fmt.Sprintf("🔄 Sending State sync message ✉️ to node %s at height: (%d) 🔄", nodeId, height) +} + func BroadcastingMessage(msg *HotstuffMessage) string { return fmt.Sprintf("📣 Broadcasting message 📣 (height, step, round): (%d, %d, %d)", msg.GetHeight(), msg.GetStep(), msg.GetRound()) } @@ -165,6 +170,8 @@ const ( sendMessageError = "error sending message" broadcastMessageError = "error broadcasting message" createConsensusMessageError = "error creating consensus message" + createStateSyncMessageError = "error creating state sync message" + blockRetrievalError = "couldn't retrieve the block from persistence module" anteValidationError = "discarding hotstuff message because ante validation failed" nilLeaderIdError = "attempting to send a message to leader when LeaderId is nil" newPersistenceReadContextError = "error creating new persistence read context" @@ -202,6 +209,8 @@ var ( ErrSendMessage = errors.New(sendMessageError) ErrBroadcastMessage = errors.New(broadcastMessageError) ErrCreateConsensusMessage = errors.New(createConsensusMessageError) + ErrCreateStateSyncMessage = errors.New(createStateSyncMessageError) + ErrBlockRetrievalMessage = errors.New(blockRetrievalError) ErrHotstuffValidation = errors.New(anteValidationError) ErrNilLeaderId = errors.New(nilLeaderIdError) ErrNewPersistenceReadContext = errors.New(newPersistenceReadContextError) @@ -241,6 +250,10 @@ func ErrUnknownConsensusMessageType(msg any) error { return fmt.Errorf("unknown consensus message type: %v", msg) } +func ErrUnknownStateSyncMessageType(msg interface{}) error { + return fmt.Errorf("unknown state sync message type: %v", msg) +} + func ErrCreateProposeMessage(step HotstuffStep) error { return fmt.Errorf("could not create a %s Propose message", StepToString[step]) } diff --git a/consensus/types/proto/state_sync.proto b/consensus/types/proto/state_sync.proto index 64be8b452..a2eb0cdb1 100644 --- a/consensus/types/proto/state_sync.proto +++ b/consensus/types/proto/state_sync.proto @@ -14,25 +14,35 @@ import "core/types/proto/block.proto"; // explicit enough. message StateSyncMetadataRequest { - string peer_id = 1; // The peer id of the node that is requesting the metadata + // CONSOLIDATE(#347): Integrate with LibP2P + string peer_address = 1; // The peer id of the node that is requesting the metadata } message StateSyncMetadataResponse { - string peer_id = 1; // The `peer_id` needs to be populated by the P2P module of the receiving node so the sender cannot falsify its identity + string peer_address = 1; // The `peer_id` needs to be populated by the P2P module of the receiving node so the sender cannot falsify its identity uint64 min_height = 2; // The minimum height that a peer has in its BlockStore uint64 max_height = 3; // The maximum height that a peer has in its BlockStore } message GetBlockRequest { - string peer_id = 1; // The peer id of the node that is requesting the metadata + string peer_address = 1; // The peer id of the node that is requesting the metadata uint64 height = 2; // The height of the block being requested by the peer } message GetBlockResponse { - string peer_id = 1; // The `peer_id` needs to be populated by the P2P module of the receiving node so the sender cannot falsify its identity + string peer_address = 1; // The `peer_id` needs to be populated by the P2P module of the receiving node so the sender cannot falsify its identity core.Block block = 2; // The block being provided to the peer } +message StateSyncMessage { + oneof message { + StateSyncMetadataRequest metadata_req = 2; + StateSyncMetadataResponse metadata_res = 3; + GetBlockRequest get_block_req = 4; + GetBlockResponse get_block_res = 5; + } +} + // NOT USED: This gRPC interface is **not being used at the moment**. It is in place simply as a // guideline of what how the types in this file could be used if a direct synchronous communication // between nodes were implemented. Furthermore, since the message types are used for asynchronous diff --git a/persistence/block.go b/persistence/block.go index 52b211402..297cee1e5 100644 --- a/persistence/block.go +++ b/persistence/block.go @@ -1,13 +1,14 @@ package persistence import ( - "encoding/binary" "encoding/hex" "fmt" + "log" "github.com/pokt-network/pocket/persistence/kvstore" "github.com/pokt-network/pocket/persistence/types" "github.com/pokt-network/pocket/shared/codec" + "github.com/pokt-network/pocket/shared/converters" coreTypes "github.com/pokt-network/pocket/shared/core/types" ) @@ -26,11 +27,17 @@ func (p *persistenceModule) TransactionExists(transactionHash string) (bool, err } return true, err } +func (p *PostgresContext) GetMinimumBlockHeight() (latestHeight uint64, err error) { + ctx, tx := p.getCtxAndTx() + + err = tx.QueryRow(ctx, types.GetMinimumlockHeightQuery()).Scan(&latestHeight) + return +} -func (p *PostgresContext) GetLatestBlockHeight() (latestHeight uint64, err error) { +func (p *PostgresContext) GetMaximumBlockHeight() (latestHeight uint64, err error) { ctx, tx := p.getCtxAndTx() - err = tx.QueryRow(ctx, types.GetLatestBlockHeightQuery()).Scan(&latestHeight) + err = tx.QueryRow(ctx, types.GetMaximumBlockHeightQuery()).Scan(&latestHeight) return } @@ -99,11 +106,6 @@ func (p *PostgresContext) storeBlock(block *coreTypes.Block) error { if err != nil { return err } - return p.blockStore.Set(heightToBytes(p.Height), blockBz) -} - -func heightToBytes(height int64) []byte { - heightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(heightBytes, uint64(height)) - return heightBytes + log.Printf("Storing block %d in block store.\n", block.BlockHeader.Height) + return p.blockStore.Set(converters.HeightToBytes(uint64(p.Height)), blockBz) } diff --git a/persistence/debug.go b/persistence/debug.go index 41caa901f..1c75da797 100644 --- a/persistence/debug.go +++ b/persistence/debug.go @@ -7,6 +7,7 @@ import ( "github.com/celestiaorg/smt" "github.com/pokt-network/pocket/persistence/types" "github.com/pokt-network/pocket/shared/codec" + "github.com/pokt-network/pocket/shared/converters" coreTypes "github.com/pokt-network/pocket/shared/core/types" "github.com/pokt-network/pocket/shared/messaging" ) @@ -45,7 +46,7 @@ func (m *persistenceModule) HandleDebugMessage(debugMessage *messaging.DebugMess func (m *persistenceModule) showLatestBlockInStore(_ *messaging.DebugMessage) { // TODO: Add an iterator to the `kvstore` and use that instead height := m.GetBus().GetConsensusModule().CurrentHeight() - 1 - blockBytes, err := m.GetBlockStore().Get(heightToBytes(int64(height))) + blockBytes, err := m.GetBlockStore().Get(converters.HeightToBytes(height)) if err != nil { m.logger.Error().Err(err).Uint64("height", height).Msg("Error getting block from block store") return diff --git a/persistence/docs/CHANGELOG.md b/persistence/docs/CHANGELOG.md index b8a1b36e6..d96cf4c32 100644 --- a/persistence/docs/CHANGELOG.md +++ b/persistence/docs/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.33] - 2023-02-09 + +- Added mock generation to the `kvstore/kvstore.go`. + ## [0.0.0.32] - 2023-02-07 - Minor documentation cleanup diff --git a/persistence/kvstore/kvstore.go b/persistence/kvstore/kvstore.go index 58b333abd..96cf96df0 100644 --- a/persistence/kvstore/kvstore.go +++ b/persistence/kvstore/kvstore.go @@ -1,5 +1,7 @@ package kvstore +//go:generate mockgen -source=$GOFILE -destination=../types/mocks/block_store_mock.go github.com/pokt-network/pocket/persistence/types KVStore + import ( "errors" "log" diff --git a/persistence/module.go b/persistence/module.go index 343583a8d..887fa8594 100644 --- a/persistence/module.go +++ b/persistence/module.go @@ -226,7 +226,7 @@ func (m *persistenceModule) shouldHydrateGenesisDb() (bool, error) { } defer checkContext.Close() - blockHeight, err := checkContext.GetLatestBlockHeight() + blockHeight, err := checkContext.GetMaximumBlockHeight() if err != nil { return true, nil } diff --git a/persistence/test/state_test.go b/persistence/test/state_test.go index 9d73f866a..875bcd96b 100644 --- a/persistence/test/state_test.go +++ b/persistence/test/state_test.go @@ -1,7 +1,6 @@ package test import ( - "encoding/binary" "encoding/hex" "fmt" "reflect" @@ -10,6 +9,7 @@ import ( "github.com/pokt-network/pocket/persistence/indexer" "github.com/pokt-network/pocket/shared/codec" + "github.com/pokt-network/pocket/shared/converters" coreTypes "github.com/pokt-network/pocket/shared/core/types" "github.com/pokt-network/pocket/shared/modules" "github.com/stretchr/testify/require" @@ -54,7 +54,7 @@ func TestStateHash_DeterministicStateWhenUpdatingAppStake(t *testing.T) { for i := 0; i < len(stateHashes); i++ { // Get the context at the new height and retrieve one of the apps height := int64(i + 1) - heightBz := heightToBytes(height) + heightBz := converters.HeightToBytes(uint64(height)) expectedStateHash := stateHashes[i] db := NewTestPostgresContext(t, height) @@ -224,9 +224,3 @@ func verifyReplayableBlocks(t *testing.T, replayableBlocks []*TestReplayableBloc require.NoError(t, err) } } - -func heightToBytes(height int64) []byte { - heightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(heightBytes, uint64(height)) - return heightBytes -} diff --git a/persistence/types/block.go b/persistence/types/block.go index 5520ad33b..f96951d27 100644 --- a/persistence/types/block.go +++ b/persistence/types/block.go @@ -25,10 +25,14 @@ func GetBlockHashQuery(height int64) string { return fmt.Sprintf(`SELECT hash FROM %s WHERE height=%d`, BlockTableName, height) } -func GetLatestBlockHeightQuery() string { +func GetMaximumBlockHeightQuery() string { return fmt.Sprintf(`SELECT MAX(height) FROM %s`, BlockTableName) } +func GetMinimumlockHeightQuery() string { + return fmt.Sprintf(`SELECT MIN(height) FROM %s`, BlockTableName) +} + func ClearAllBlocksQuery() string { return fmt.Sprintf(`DELETE FROM %s`, BlockTableName) } diff --git a/persistence/types/mocks/mocks.go b/persistence/types/mocks/mocks.go new file mode 100644 index 000000000..cc067e0b2 --- /dev/null +++ b/persistence/types/mocks/mocks.go @@ -0,0 +1,3 @@ +package mock_kvstore + +// This file is in place to declare the package for dynamically generated mocks diff --git a/runtime/configs/proto/consensus_config.proto b/runtime/configs/proto/consensus_config.proto index 9830a1672..b2c29559b 100644 --- a/runtime/configs/proto/consensus_config.proto +++ b/runtime/configs/proto/consensus_config.proto @@ -7,7 +7,8 @@ option go_package = "github.com/pokt-network/pocket/runtime/configs"; message ConsensusConfig { string private_key = 1; uint64 max_mempool_bytes = 2; // TODO(olshansky): add unit tests for this - PacemakerConfig pacemaker_config = 3; + bool server_mode_enabled = 3; + PacemakerConfig pacemaker_config = 4; } message PacemakerConfig { diff --git a/runtime/docs/CHANGELOG.md b/runtime/docs/CHANGELOG.md index 8cf32b588..016460003 100644 --- a/runtime/docs/CHANGELOG.md +++ b/runtime/docs/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.16] - 2023-02-09 + +- Update runtime consensus config with bool server mode variable +- Update manager test + ## [0.0.0.15] - 2023-02-07 - Added GITHUB_WIKI tags where it was missing diff --git a/runtime/manager_test.go b/runtime/manager_test.go index 71c3577da..e57f4ca20 100644 --- a/runtime/manager_test.go +++ b/runtime/manager_test.go @@ -209,6 +209,7 @@ func TestNewManagerFromReaders(t *testing.T) { Manual: true, DebugTimeBetweenStepsMsec: 1000, }, + ServerModeEnabled: true, }, Utility: &configs.UtilityConfig{ MaxMempoolTransactionBytes: 1073741824, diff --git a/shared/CHANGELOG.md b/shared/CHANGELOG.md index 67dbd62b9..57d7e9b13 100644 --- a/shared/CHANGELOG.md +++ b/shared/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.24] - 2023-02-09 + + - Add `ConsensusStateSync` interface that is implemented by the consensus module + ## [0.0.0.23] - 2023-02-07 - Added GITHUB_WIKI tags where it was missing diff --git a/shared/converters/util.go b/shared/converters/util.go index 588607bfa..30a0bb062 100644 --- a/shared/converters/util.go +++ b/shared/converters/util.go @@ -1,6 +1,7 @@ package converters import ( + "encoding/binary" "fmt" "math/big" ) @@ -21,3 +22,13 @@ func StringToBigInt(s string) (*big.Int, error) { func BigIntToString(b *big.Int) string { return b.Text(DefaultDenomination) } + +func HeightFromBytes(heightBz []byte) uint64 { + return binary.LittleEndian.Uint64(heightBz) +} + +func HeightToBytes(height uint64) []byte { + heightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(heightBytes, height) + return heightBytes +} diff --git a/shared/messaging/proto/debug_message.proto b/shared/messaging/proto/debug_message.proto index a6668824f..9497c51ea 100644 --- a/shared/messaging/proto/debug_message.proto +++ b/shared/messaging/proto/debug_message.proto @@ -13,11 +13,18 @@ enum DebugMessageAction { DEBUG_CONSENSUS_PRINT_NODE_STATE = 2; DEBUG_CONSENSUS_TRIGGER_NEXT_VIEW = 3; DEBUG_CONSENSUS_TOGGLE_PACE_MAKER_MODE = 4; // toggle between manual and automatic + + // TODO: Replace `DEBUG_` with `DEBUG_PERSISTENCE_` below for clarity + DEBUG_CONSENSUS_SEND_METADATA_REQ = 5; + DEBUG_CONSENSUS_SEND_BLOCK_REQ = 6; + + DEBUG_SHOW_LATEST_BLOCK_IN_STORE = 7; + + DEBUG_PERSISTENCE_CLEAR_STATE = 8; + DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9; + - DEBUG_SHOW_LATEST_BLOCK_IN_STORE = 5; - DEBUG_PERSISTENCE_CLEAR_STATE = 6; - DEBUG_PERSISTENCE_RESET_TO_GENESIS = 7; } message DebugMessage { diff --git a/shared/modules/consensus_module.go b/shared/modules/consensus_module.go index a11c0245e..7f8444aef 100644 --- a/shared/modules/consensus_module.go +++ b/shared/modules/consensus_module.go @@ -19,17 +19,24 @@ const ( type ConsensusModule interface { Module KeyholderModule + + ConsensusStateSync ConsensusPacemaker // Consensus Engine Handlers HandleMessage(*anypb.Any) error // TODO(gokhan): move it into a debug module HandleDebugMessage(*messaging.DebugMessage) error + // State Sync messages Handler + HandleStateSyncMessage(*anypb.Any) error // Consensus State Accessors CurrentHeight() uint64 CurrentRound() uint64 CurrentStep() uint64 + + // State Sync functions + EnableServerMode() } // This interface represents functions exposed by the Consensus module for Pacemaker specific business logic. @@ -60,3 +67,11 @@ type ConsensusPacemaker interface { GetPrepareQC() (*anypb.Any, error) GetNodeId() uint64 } + +// This interface represents functions exposed by the Consensus module for StateSync specific business logic. +// These functions are intended to only be called by the StateSync module. +// INVESTIGATE: This interface enable a fast implementation of state sync but look into a way of removing it in the future +type ConsensusStateSync interface { + GetNodeIdFromNodeAddress(string) (uint64, error) + GetNodeAddress() string +} diff --git a/shared/modules/persistence_module.go b/shared/modules/persistence_module.go index 62369c5a5..c1b74730d 100644 --- a/shared/modules/persistence_module.go +++ b/shared/modules/persistence_module.go @@ -127,7 +127,8 @@ type PersistenceReadContext interface { // CONSOLIDATE: BlockHash / AppHash / StateHash // Block Queries - GetLatestBlockHeight() (uint64, error) // Returns the height of the latest block in the persistence layer + GetMaximumBlockHeight() (uint64, error) // Returns the height of the latest block in the persistence layer + GetMinimumBlockHeight() (uint64, error) // Returns the min block height in the persistence layer GetBlockHash(height int64) (string, error) // Returns the app hash corresponding to the height provided // Pool Queries diff --git a/shared/node.go b/shared/node.go index 2f599aef9..4af232ea9 100644 --- a/shared/node.go +++ b/shared/node.go @@ -126,6 +126,8 @@ func (node *Node) handleEvent(message *messaging.PocketEnvelope) error { logger.Global.Info().Msg("Received NodeStartedEvent") case consensus.HotstuffMessageContentType: return node.GetBus().GetConsensusModule().HandleMessage(message.Content) + case consensus.StateSyncMessageContentType: + return node.GetBus().GetConsensusModule().HandleStateSyncMessage(message.Content) case utility.TransactionGossipMessageContentType: return node.GetBus().GetUtilityModule().HandleMessage(message.Content) case messaging.DebugMessageEventType: @@ -146,7 +148,9 @@ func (node *Node) handleDebugMessage(message *messaging.PocketEnvelope) error { case messaging.DebugMessageAction_DEBUG_CONSENSUS_RESET_TO_GENESIS, messaging.DebugMessageAction_DEBUG_CONSENSUS_PRINT_NODE_STATE, messaging.DebugMessageAction_DEBUG_CONSENSUS_TRIGGER_NEXT_VIEW, - messaging.DebugMessageAction_DEBUG_CONSENSUS_TOGGLE_PACE_MAKER_MODE: + messaging.DebugMessageAction_DEBUG_CONSENSUS_TOGGLE_PACE_MAKER_MODE, + messaging.DebugMessageAction_DEBUG_CONSENSUS_SEND_BLOCK_REQ, + messaging.DebugMessageAction_DEBUG_CONSENSUS_SEND_METADATA_REQ: return node.GetBus().GetConsensusModule().HandleDebugMessage(debugMessage) // Persistence Debug case messaging.DebugMessageAction_DEBUG_SHOW_LATEST_BLOCK_IN_STORE: