diff --git a/Makefile b/Makefile index a9e0f2738..b12da31ef 100644 --- a/Makefile +++ b/Makefile @@ -347,9 +347,11 @@ benchmark_p2p_addrbook: # HACK - Like TECHDEBT, but much worse. This needs to be prioritized # REFACTOR - Similar to TECHDEBT, but will require a substantial rewrite and change across the codebase # CONSIDERATION - A comment that involves extra work but was thoughts / considered as part of some implementation +# CONSOLIDATE - We likely have similar implementations/types of the same thing, and we should consolidate them. +# DEPRECATE - Code that should be removed in the future # DISCUSS_IN_THIS_COMMIT - SHOULD NEVER BE COMMITTED TO MASTER. It is a way for the reviewer of a PR to start / reply to a discussion. # TODO_IN_THIS_COMMIT - SHOULD NEVER BE COMMITTED TO MASTER. It is a way to start the review process while non-critical changes are still in progress -TODO_KEYWORDS = -e "TODO" -e "TECHDEBT" -e "IMPROVE" -e "DISCUSS" -e "INCOMPLETE" -e "INVESTIGATE" -e "CLEANUP" -e "HACK" -e "REFACTOR" -e "CONSIDERATION" -e "TODO_IN_THIS_COMMIT" -e "DISCUSS_IN_THIS_COMMIT" +TODO_KEYWORDS = -e "TODO" -e "TECHDEBT" -e "IMPROVE" -e "DISCUSS" -e "INCOMPLETE" -e "INVESTIGATE" -e "CLEANUP" -e "HACK" -e "REFACTOR" -e "CONSIDERATION" -e "TODO_IN_THIS_COMMIT" -e "DISCUSS_IN_THIS_COMMIT" -e "CONSOLIDATE" -e "DEPRECATE" # How do I use TODOs? # 1. 
: ; diff --git a/consensus/CHANGELOG.md b/consensus/CHANGELOG.md index a41773b18..f51a52410 100644 --- a/consensus/CHANGELOG.md +++ b/consensus/CHANGELOG.md @@ -7,11 +7,43 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## [0.0.0.3] - 2022-09-28 +## [0.0.0.4] - 2022-09-28 + - `consensusModule` stores block directly to prevent shared structure in the `utilityModule` +## [0.0.0.3] - 2022-09-26 + +Consensus logic + +- Pass in a list of messages to `findHighQC` instead of a hotstuff step +- Made `CreateProposeMessage` and `CreateVotemessage` accept explicit values, identifying some bugs along the way +- Made sure to call `applyBlock` when using `highQC` from previous round +- Moved business logic for `prepareAndApplyBlock` into `hotstuff_leader.go` +- Removed `MaxBlockBytes` and storing the consensus genesis type locally as is + +Consensus cleanup + +- Using appropriate getters for protocol types in the hotstuff lifecycle +- Replaced `proto.Marshal` with `codec.GetCodec().Marshal` +- Reorganized and cleaned up the code in `consensus/block.go` +- Consolidated & removed a few `TODO`s throughout the consensus module +- Added TECHDEBT and TODOs that will be require for a real block lifecycle +- Fixed typo in `hotstuff_types.proto` +- Moved the hotstuff handler interface to `consensus/hotstuff_handler.go` + +Consensus testing + +- Improved mock module initialization in `consensus/consensus_tests/utils_test.go` + +General + +- Added a diagram for `AppHash` related `ContextInitialization` +- Added `Makefile` keywords for `TODO` + ## [0.0.0.2] - 2022-08-25 + **Encapsulate structures previously in shared [#163](github.com/pokt-network/pocket/issues/163)** + - Ensured proto structures implement shared interfaces - `ConsensusConfig` uses shared interfaces in order to accept `MockConsensusConfig` in test_artifacts - `ConsensusGenesisState` uses shared interfaces in order to accept `MockConsensusGenesisState` in 
test_artifacts @@ -19,9 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.0.0.1] - 2021-03-31 -HotPocket 1st Iteration (https://github.com/pokt-network/pocket/pull/48) - -# Added +### Added new libraries: HotPocket 1st iteration - Initial implementation of Basic Hotstuff - Initial implementation Hotstuff Pacemaker @@ -32,9 +62,7 @@ HotPocket 1st Iteration (https://github.com/pokt-network/pocket/pull/48) ## [0.0.0.0] - 2021-03-31 -VRF & Cryptographic Sortition Libraries (https://github.com/pokt-network/pocket/pull/37/files) - -### Added +### Added new libraries: VRF & Cryptographic Sortition Libraries - Tests with `make test_vrf` and `make test_sortition` - Benchmarking via `make benchmark_sortition` diff --git a/consensus/block.go b/consensus/block.go index 536c3cc74..a70c6cecc 100644 --- a/consensus/block.go +++ b/consensus/block.go @@ -1,107 +1,13 @@ package consensus import ( - "encoding/hex" + "log" "unsafe" - "github.com/pokt-network/pocket/shared/codec" - typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" ) -// TODO(olshansky): Sync with Andrew on the type of validation we need here. 
-func (m *ConsensusModule) validateBlock(block *typesCons.Block) error { - if block == nil { - return typesCons.ErrNilBlock - } - return nil -} - -// This is a helper function intended to be called by a leader/validator during a view change -func (m *ConsensusModule) prepareBlockAsLeader() (*typesCons.Block, error) { - if m.isReplica() { - return nil, typesCons.ErrReplicaPrepareBlock - } - - if err := m.refreshUtilityContext(); err != nil { - return nil, err - } - - txs, err := m.utilityContext.GetProposalTransactions(m.privateKey.Address(), maxTxBytes, lastByzValidators) - if err != nil { - return nil, err - } - - appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), m.privateKey.Address(), txs, lastByzValidators) - if err != nil { - return nil, err - } - - blockHeader := &typesCons.BlockHeader{ - Height: int64(m.Height), - Hash: hex.EncodeToString(appHash), - NumTxs: uint32(len(txs)), - LastBlockHash: m.appHash, - ProposerAddress: m.privateKey.Address().Bytes(), - QuorumCertificate: []byte("HACK: Temporary placeholder"), - } - - block := &typesCons.Block{ - BlockHeader: blockHeader, - Transactions: txs, - } - - return block, nil -} - -// This is a helper function intended to be called by a replica/voter during a view change -func (m *ConsensusModule) applyBlockAsReplica(block *typesCons.Block) error { - if m.isLeader() { - return typesCons.ErrLeaderApplyBLock - } - - // TODO(olshansky): Add unit tests to verify this. - if unsafe.Sizeof(*block) > uintptr(m.MaxBlockBytes) { - return typesCons.ErrInvalidBlockSize(uint64(unsafe.Sizeof(*block)), m.MaxBlockBytes) - } - - if err := m.refreshUtilityContext(); err != nil { - return err - } - - appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), block.BlockHeader.ProposerAddress, block.Transactions, lastByzValidators) - if err != nil { - return err - } - - // DISCUSS(drewsky): Is `ApplyBlock` going to return blockHash or appHash? 
- if block.BlockHeader.Hash != hex.EncodeToString(appHash) { - return typesCons.ErrInvalidAppHash(block.BlockHeader.Hash, hex.EncodeToString(appHash)) - } - - return nil -} - -// Creates a new Utility context and clears/nullifies any previous contexts if they exist -func (m *ConsensusModule) refreshUtilityContext() error { - // This is a catch-all to release the previous utility context if it wasn't cleaned up - // in the proper lifecycle (e.g. catch up, error, network partition, etc...). Ideally, this - // should not be called. - if m.utilityContext != nil { - m.nodeLog(typesCons.NilUtilityContextWarning) - m.utilityContext.ReleaseContext() - m.utilityContext = nil - } - - utilityContext, err := m.GetBus().GetUtilityModule().NewContext(int64(m.Height)) - if err != nil { - return err - } - - m.utilityContext = utilityContext - return nil -} - func (m *ConsensusModule) commitBlock(block *typesCons.Block) error { m.nodeLog(typesCons.CommittingBlock(m.Height, len(block.Transactions))) @@ -130,7 +36,7 @@ func (m *ConsensusModule) commitBlock(block *typesCons.Block) error { m.utilityContext.ReleaseContext() m.utilityContext = nil - m.appHash = block.BlockHeader.Hash + m.lastAppHash = block.BlockHeader.Hash return nil } @@ -149,3 +55,49 @@ func (m *ConsensusModule) storeBlock(block *typesCons.Block, blockProtoBytes []b } return nil } + +// TODO: Add unit tests specific to block validation +func (m *ConsensusModule) validateBlockBasic(block *typesCons.Block) error { + if block == nil && m.Step != NewRound { + return typesCons.ErrNilBlock + } + + if block != nil && m.Step == NewRound { + return typesCons.ErrBlockExists + } + + if block != nil && unsafe.Sizeof(*block) > uintptr(m.consGenesis.MaxBlockBytes) { + return typesCons.ErrInvalidBlockSize(uint64(unsafe.Sizeof(*block)), m.consGenesis.MaxBlockBytes) + } + + // If the current block being processed (i.e. 
voted on) by consensus is non nil, we need to make + // sure that the data (height, round, step, txs, etc) is the same before we start validating the signatures + if m.Block != nil { + // DISCUSS: The only difference between blocks from one step to another is the QC, so we need + // to determine where/how to validate this + if protoHash(m.Block) != protoHash(block) { + log.Println("[TECHDEBT][ERROR] The block being processed is not the same as that received by the consensus module ") + } + } + + return nil +} + +// Creates a new Utility context and clears/nullifies any previous contexts if they exist +func (m *ConsensusModule) refreshUtilityContext() error { + // Catch-all structure to release the previous utility context if it wasn't properly cleaned up. + // Ideally, this should not be called. + if m.utilityContext != nil { + m.nodeLog(typesCons.NilUtilityContextWarning) + m.utilityContext.ReleaseContext() + m.utilityContext = nil + } + + utilityContext, err := m.GetBus().GetUtilityModule().NewContext(int64(m.Height)) + if err != nil { + return err + } + + m.utilityContext = utilityContext + return nil +} diff --git a/consensus/consensus_tests/pacemaker_test.go b/consensus/consensus_tests/pacemaker_test.go index 4134f17cc..80bf64fc4 100644 --- a/consensus/consensus_tests/pacemaker_test.go +++ b/consensus/consensus_tests/pacemaker_test.go @@ -157,22 +157,29 @@ func TestPacemakerCatchupSameStepDifferentRounds(t *testing.T) { Transactions: emptyTxs, } - leaderConsensusMod := GetConsensusModImplementation(leader) + leaderConsensusMod := GetConsensusModElem(leader) leaderConsensusMod.FieldByName("Block").Set(reflect.ValueOf(block)) // Set all nodes to the same STEP and HEIGHT BUT different ROUNDS for _, pocketNode := range pocketNodes { - consensusModImpl := GetConsensusModImplementation(pocketNode) - consensusModImpl.FieldByName("Height").SetUint(testHeight) - consensusModImpl.FieldByName("Step").SetInt(testStep) - 
consensusModImpl.FieldByName("LeaderId").Set(reflect.Zero(reflect.TypeOf(&leaderId))) // This is re-elected during paceMaker catchup + // utilityContext is only set on new rounds, which is skipped in this test + utilityContext, err := pocketNode.GetBus().GetUtilityModule().NewContext(int64(testHeight)) + require.NoError(t, err) + + consensusModElem := GetConsensusModElem(pocketNode) + consensusModElem.FieldByName("Height").SetUint(testHeight) + consensusModElem.FieldByName("Step").SetInt(testStep) + consensusModElem.FieldByName("LeaderId").Set(reflect.Zero(reflect.TypeOf(&leaderId))) // This is re-elected during paceMaker catchup + + consensusModImpl := GetConsensusModImpl(pocketNode) + consensusModImpl.MethodByName("SetUtilityContext").Call([]reflect.Value{reflect.ValueOf(utilityContext)}) } // Set the leader to be in the highest round. - GetConsensusModImplementation(pocketNodes[1]).FieldByName("Round").SetUint(uint64(leaderRound - 2)) - GetConsensusModImplementation(pocketNodes[2]).FieldByName("Round").SetUint(uint64(leaderRound - 3)) - GetConsensusModImplementation(pocketNodes[leaderId]).FieldByName("Round").SetUint(uint64(leaderRound)) - GetConsensusModImplementation(pocketNodes[4]).FieldByName("Round").SetUint(uint64(leaderRound - 4)) + GetConsensusModElem(pocketNodes[1]).FieldByName("Round").SetUint(uint64(leaderRound - 2)) + GetConsensusModElem(pocketNodes[2]).FieldByName("Round").SetUint(uint64(leaderRound - 3)) + GetConsensusModElem(pocketNodes[leaderId]).FieldByName("Round").SetUint(uint64(leaderRound)) + GetConsensusModElem(pocketNodes[4]).FieldByName("Round").SetUint(uint64(leaderRound - 4)) prepareProposal := &typesCons.HotstuffMessage{ Type: consensus.Propose, diff --git a/consensus/consensus_tests/utils_test.go b/consensus/consensus_tests/utils_test.go index 4903a9abd..3bda5de07 100644 --- a/consensus/consensus_tests/utils_test.go +++ b/consensus/consensus_tests/utils_test.go @@ -14,6 +14,8 @@ import ( "testing" "time" + 
"github.com/pokt-network/pocket/shared/codec" + "github.com/benbjohnson/clock" "github.com/golang/mock/gomock" "github.com/pokt-network/pocket/consensus" @@ -25,7 +27,6 @@ import ( modulesMock "github.com/pokt-network/pocket/shared/modules/mocks" "github.com/pokt-network/pocket/shared/test_artifacts" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) @@ -180,13 +181,17 @@ func StartAllTestPocketNodes(t *testing.T, pocketNodes IdToNodeMapping) { // define the interfaces used for debug/development. The latter will probably scale more but will // require more effort and pollute the source code with debugging information. func GetConsensusNodeState(node *shared.Node) typesCons.ConsensusNodeState { - return reflect.ValueOf(node.GetBus().GetConsensusModule()).MethodByName("GetNodeState").Call([]reflect.Value{})[0].Interface().(typesCons.ConsensusNodeState) + return GetConsensusModImpl(node).MethodByName("GetNodeState").Call([]reflect.Value{})[0].Interface().(typesCons.ConsensusNodeState) } -func GetConsensusModImplementation(node *shared.Node) reflect.Value { +func GetConsensusModElem(node *shared.Node) reflect.Value { return reflect.ValueOf(node.GetBus().GetConsensusModule()).Elem() } +func GetConsensusModImpl(node *shared.Node) reflect.Value { + return reflect.ValueOf(node.GetBus().GetConsensusModule()) +} + /*** Debug/Development Message Helpers ***/ func TriggerNextView(t *testing.T, node *shared.Node) { @@ -230,10 +235,12 @@ func WaitForNetworkConsensusMessages( ) (messages []*anypb.Any, err error) { includeFilter := func(m *anypb.Any) bool { - var hotstuffMessage typesCons.HotstuffMessage - err := anypb.UnmarshalTo(m, &hotstuffMessage, proto.UnmarshalOptions{}) + msg, err := codec.GetCodec().FromAny(m) require.NoError(t, err) + hotstuffMessage, ok := 
msg.(*typesCons.HotstuffMessage) + require.True(t, ok) + return hotstuffMessage.Type == hotstuffMsgType && hotstuffMessage.Step == step } @@ -348,8 +355,7 @@ func baseP2PMock(t *testing.T, testChannel modules.EventsChannel) *modulesMock.M func baseUtilityMock(t *testing.T, _ modules.EventsChannel) *modulesMock.MockUtilityModule { ctrl := gomock.NewController(t) utilityMock := modulesMock.NewMockUtilityModule(ctrl) - utilityContextMock := modulesMock.NewMockUtilityContext(ctrl) - persistenceContextMock := modulesMock.NewMockPersistenceRWContext(ctrl) + utilityContextMock := baseUtilityContextMock(t) utilityMock.EXPECT().Start().Return(nil).AnyTimes() utilityMock.EXPECT().SetBus(gomock.Any()).Do(func(modules.Bus) {}).AnyTimes() @@ -358,9 +364,14 @@ func baseUtilityMock(t *testing.T, _ modules.EventsChannel) *modulesMock.MockUti Return(utilityContextMock, nil). MaxTimes(4) - utilityContextMock.EXPECT().GetPersistenceContext().Return(persistenceContextMock).AnyTimes() - utilityContextMock.EXPECT().CommitPersistenceContext().Return(nil).AnyTimes() - utilityContextMock.EXPECT().ReleaseContext().Return().AnyTimes() + return utilityMock +} + +func baseUtilityContextMock(t *testing.T) *modulesMock.MockUtilityContext { + ctrl := gomock.NewController(t) + utilityContextMock := modulesMock.NewMockUtilityContext(ctrl) + persistenceContextMock := modulesMock.NewMockPersistenceRWContext(ctrl) + utilityContextMock.EXPECT(). GetProposalTransactions(gomock.Any(), maxTxBytes, gomock.AssignableToTypeOf(emptyByzValidators)). Return(make([][]byte, 0), nil). @@ -369,12 +380,15 @@ func baseUtilityMock(t *testing.T, _ modules.EventsChannel) *modulesMock.MockUti ApplyBlock(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(appHash, nil). 
AnyTimes() + utilityContextMock.EXPECT().CommitPersistenceContext().Return(nil).AnyTimes() + utilityContextMock.EXPECT().ReleaseContext().Return().AnyTimes() + utilityContextMock.EXPECT().GetPersistenceContext().Return(persistenceContextMock).AnyTimes() - persistenceContextMock.EXPECT().Commit().Return(nil).AnyTimes() - persistenceContextMock.EXPECT().StoreBlock(gomock.Any()).AnyTimes().Return(nil) + persistenceContextMock.EXPECT().StoreTransaction(gomock.Any()).Return(nil).AnyTimes() + persistenceContextMock.EXPECT().StoreBlock(gomock.Any()).Return(nil).AnyTimes() persistenceContextMock.EXPECT().InsertBlock(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) - return utilityMock + return utilityContextMock } func baseTelemetryMock(t *testing.T, _ modules.EventsChannel) *modulesMock.MockTelemetryModule { @@ -385,26 +399,24 @@ func baseTelemetryMock(t *testing.T, _ modules.EventsChannel) *modulesMock.MockT telemetryMock.EXPECT().Start().Do(func() {}).AnyTimes() telemetryMock.EXPECT().SetBus(gomock.Any()).Do(func(modules.Bus) {}).AnyTimes() - telemetryMock.EXPECT().GetTimeSeriesAgent().Return(timeSeriesAgentMock).AnyTimes() - timeSeriesAgentMock.EXPECT().CounterRegister(gomock.Any(), gomock.Any()).MaxTimes(1) - timeSeriesAgentMock.EXPECT().CounterIncrement(gomock.Any()).AnyTimes() - telemetryMock.EXPECT().GetEventMetricsAgent().Return(eventMetricsAgentMock).AnyTimes() - eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() return telemetryMock } func baseTelemetryTimeSeriesAgentMock(t *testing.T) *modulesMock.MockTimeSeriesAgent { ctrl := gomock.NewController(t) - timeseriesAgentMock := modulesMock.NewMockTimeSeriesAgent(ctrl) - return timeseriesAgentMock + timeSeriesAgentMock := modulesMock.NewMockTimeSeriesAgent(ctrl) + timeSeriesAgentMock.EXPECT().CounterRegister(gomock.Any(), gomock.Any()).MaxTimes(1) + 
timeSeriesAgentMock.EXPECT().CounterIncrement(gomock.Any()).AnyTimes() + return timeSeriesAgentMock } func baseTelemetryEventMetricsAgentMock(t *testing.T) *modulesMock.MockEventMetricsAgent { ctrl := gomock.NewController(t) eventMetricsAgentMock := modulesMock.NewMockEventMetricsAgent(ctrl) + eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() return eventMetricsAgentMock } diff --git a/consensus/debugging.go b/consensus/debugging.go index 8d433db4d..a73dad40e 100644 --- a/consensus/debugging.go +++ b/consensus/debugging.go @@ -3,9 +3,8 @@ package consensus import ( "log" - "github.com/pokt-network/pocket/shared/debug" - typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/debug" ) func (m *ConsensusModule) HandleDebugMessage(debugMessage *debug.DebugMessage) error { @@ -32,7 +31,7 @@ func (m *ConsensusModule) GetNodeState() typesCons.ConsensusNodeState { leaderId = *m.LeaderId } return typesCons.ConsensusNodeState{ - NodeId: m.NodeId, + NodeId: m.nodeId, Height: m.Height, Round: uint8(m.Round), Step: uint8(m.Step), diff --git a/consensus/helpers.go b/consensus/helpers.go index efd6dac16..364a9d26e 100644 --- a/consensus/helpers.go +++ b/consensus/helpers.go @@ -1,21 +1,21 @@ package consensus +// TODO: Split this file into multiple helpers (e.g. signatures.go, hotstuff_helpers.go, etc...) 
import ( "encoding/base64" "log" + "github.com/pokt-network/pocket/shared/codec" "github.com/pokt-network/pocket/shared/debug" "google.golang.org/protobuf/proto" typesCons "github.com/pokt-network/pocket/consensus/types" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" - "google.golang.org/protobuf/types/known/anypb" ) // These constants and variables are wrappers around the autogenerated protobuf types and were // added to simply make the code in the `consensus` module more readable. - const ( NewRound = typesCons.HotstuffStep_HOTSTUFF_STEP_NEWROUND Prepare = typesCons.HotstuffStep_HOTSTUFF_STEP_PREPARE @@ -23,37 +23,36 @@ const ( Commit = typesCons.HotstuffStep_HOTSTUFF_STEP_COMMIT Decide = typesCons.HotstuffStep_HOTSTUFF_STEP_DECIDE + Propose = typesCons.HotstuffMessageType_HOTSTUFF_MESSAGE_PROPOSE + Vote = typesCons.HotstuffMessageType_HOTSTUFF_MESSAGE_VOTE + ByzantineThreshold = float64(2) / float64(3) - HotstuffMessage = "consensus.HotstuffMessage" - UtilityMessage = "consensus.UtilityMessage" - Propose = typesCons.HotstuffMessageType_HOTSTUFF_MESAGE_PROPOSE - Vote = typesCons.HotstuffMessageType_HOTSTUFF_MESSAGE_VOTE + + HotstuffMessage = "consensus.HotstuffMessage" + UtilityMessage = "consensus.UtilityMessage" ) var ( HotstuffSteps = [...]typesCons.HotstuffStep{NewRound, Prepare, PreCommit, Commit, Decide} - - maxTxBytes = 90000 // TODO(olshansky): Move this to config.json. - lastByzValidators = make([][]byte, 0) // TODO(olshansky): Retrieve this from persistence ) // ** Hotstuff Helpers ** // +// IMPROVE: Avoid having the `ConsensusModule` be a receiver of this; making it more functional. +// TODO: Add unit tests for all quorumCert creation & validation logic... 
func (m *ConsensusModule) getQuorumCertificate(height uint64, step typesCons.HotstuffStep, round uint64) (*typesCons.QuorumCertificate, error) { var pss []*typesCons.PartialSignature - for _, msg := range m.MessagePool[step] { - // TODO(olshansky): Add tests for this + for _, msg := range m.messagePool[step] { if msg.GetPartialSignature() == nil { m.nodeLog(typesCons.WarnMissingPartialSig(msg)) continue } - // TODO(olshansky): Add tests for this - if msg.Height != height || msg.Step != step || msg.Round != round { + if msg.GetHeight() != height || msg.GetStep() != step || msg.GetRound() != round { m.nodeLog(typesCons.WarnUnexpectedMessageInPool(msg, height, step, round)) continue } - ps := msg.GetPartialSignature() + ps := msg.GetPartialSignature() if ps.Signature == nil || len(ps.Address) == 0 { m.nodeLog(typesCons.WarnIncompletePartialSig(ps, msg)) continue @@ -71,16 +70,16 @@ func (m *ConsensusModule) getQuorumCertificate(height uint64, step typesCons.Hot } return &typesCons.QuorumCertificate{ - Height: m.Height, + Height: height, Step: step, - Round: m.Round, + Round: round, Block: m.Block, ThresholdSignature: thresholdSig, }, nil } -func (m *ConsensusModule) findHighQC(step typesCons.HotstuffStep) (qc *typesCons.QuorumCertificate) { - for _, m := range m.MessagePool[step] { +func (m *ConsensusModule) findHighQC(msgs []*typesCons.HotstuffMessage) (qc *typesCons.QuorumCertificate) { + for _, m := range msgs { if m.GetQuorumCertificate() == nil { continue } @@ -91,21 +90,20 @@ func (m *ConsensusModule) findHighQC(step typesCons.HotstuffStep) (qc *typesCons return } -func getThresholdSignature( - partialSigs []*typesCons.PartialSignature) (*typesCons.ThresholdSignature, error) { +func getThresholdSignature(partialSigs []*typesCons.PartialSignature) (*typesCons.ThresholdSignature, error) { thresholdSig := new(typesCons.ThresholdSignature) thresholdSig.Signatures = make([]*typesCons.PartialSignature, len(partialSigs)) copy(thresholdSig.Signatures, partialSigs) 
return thresholdSig, nil } -func isSignatureValid(m *typesCons.HotstuffMessage, pubKeyString string, signature []byte) bool { +func isSignatureValid(msg *typesCons.HotstuffMessage, pubKeyString string, signature []byte) bool { pubKey, err := cryptoPocket.NewPublicKey(pubKeyString) if err != nil { log.Println("[WARN] Error getting PublicKey from bytes:", err) return false } - bytesToVerify, err := getSignableBytes(m) + bytesToVerify, err := getSignableBytes(msg) if err != nil { log.Println("[WARN] Error getting bytes to verify:", err) return false @@ -114,7 +112,7 @@ func isSignatureValid(m *typesCons.HotstuffMessage, pubKeyString string, signatu } func (m *ConsensusModule) didReceiveEnoughMessageForStep(step typesCons.HotstuffStep) error { - return m.isOptimisticThresholdMet(len(m.MessagePool[step])) + return m.isOptimisticThresholdMet(len(m.messagePool[step])) } func (m *ConsensusModule) isOptimisticThresholdMet(n int) error { @@ -128,12 +126,12 @@ func (m *ConsensusModule) isOptimisticThresholdMet(n int) error { func (m *ConsensusModule) resetForNewHeight() { m.Round = 0 m.Block = nil - m.HighPrepareQC = nil - m.LockedQC = nil + m.highPrepareQC = nil + m.lockedQC = nil } func protoHash(m proto.Message) string { - b, err := proto.Marshal(m) + b, err := codec.GetCodec().Marshal(m) if err != nil { log.Fatalf("Could not marshal proto message: %v", err) } @@ -150,13 +148,12 @@ func (m *ConsensusModule) sendToNode(msg *typesCons.HotstuffMessage) { } m.nodeLog(typesCons.SendingMessage(msg, *m.LeaderId)) - anyConsensusMessage, err := anypb.New(msg) + anyConsensusMessage, err := codec.GetCodec().ToAny(msg) if err != nil { m.nodeLogError(typesCons.ErrCreateConsensusMessage.Error(), err) return } - - if err := m.GetBus().GetP2PModule().Send(cryptoPocket.AddressFromString(m.IdToValAddrMap[*m.LeaderId]), anyConsensusMessage, debug.PocketTopic_CONSENSUS_MESSAGE_TOPIC); err != nil { + if err := 
m.GetBus().GetP2PModule().Send(cryptoPocket.AddressFromString(m.idToValAddrMap[*m.LeaderId]), anyConsensusMessage, debug.PocketTopic_CONSENSUS_MESSAGE_TOPIC); err != nil { m.nodeLogError(typesCons.ErrSendMessage.Error(), err) return } @@ -164,12 +161,11 @@ func (m *ConsensusModule) sendToNode(msg *typesCons.HotstuffMessage) { func (m *ConsensusModule) broadcastToNodes(msg *typesCons.HotstuffMessage) { m.nodeLog(typesCons.BroadcastingMessage(msg)) - anyConsensusMessage, err := anypb.New(msg) + anyConsensusMessage, err := codec.GetCodec().ToAny(msg) if err != nil { m.nodeLogError(typesCons.ErrCreateConsensusMessage.Error(), err) return } - if err := m.GetBus().GetP2PModule().Broadcast(anyConsensusMessage, debug.PocketTopic_CONSENSUS_MESSAGE_TOPIC); err != nil { m.nodeLogError(typesCons.ErrBroadcastMessage.Error(), err) return @@ -178,9 +174,10 @@ func (m *ConsensusModule) broadcastToNodes(msg *typesCons.HotstuffMessage) { /*** Persistence Helpers ***/ +// TECHDEBT: Integrate this with the `persistence` module or a real mempool. 
func (m *ConsensusModule) clearMessagesPool() { for _, step := range HotstuffSteps { - m.MessagePool[step] = make([]*typesCons.HotstuffMessage, 0) + m.messagePool[step] = make([]*typesCons.HotstuffMessage, 0) } } @@ -191,7 +188,7 @@ func (m *ConsensusModule) isLeaderUnknown() bool { } func (m *ConsensusModule) isLeader() bool { - return m.LeaderId != nil && *m.LeaderId == m.NodeId + return m.LeaderId != nil && *m.LeaderId == m.nodeId } func (m *ConsensusModule) isReplica() bool { @@ -203,33 +200,37 @@ func (m *ConsensusModule) clearLeader() { m.LeaderId = nil } -func (m *ConsensusModule) electNextLeader(message *typesCons.HotstuffMessage) { +func (m *ConsensusModule) electNextLeader(message *typesCons.HotstuffMessage) error { leaderId, err := m.leaderElectionMod.ElectNextLeader(message) if err != nil || leaderId == 0 { m.nodeLogError(typesCons.ErrLeaderElection(message).Error(), err) m.clearLeader() - return + return err } m.LeaderId = &leaderId if m.isLeader() { m.setLogPrefix("LEADER") - m.nodeLog(typesCons.ElectedSelfAsNewLeader(m.IdToValAddrMap[*m.LeaderId], *m.LeaderId, m.Height, m.Round)) + m.nodeLog(typesCons.ElectedSelfAsNewLeader(m.idToValAddrMap[*m.LeaderId], *m.LeaderId, m.Height, m.Round)) } else { m.setLogPrefix("REPLICA") - m.nodeLog(typesCons.ElectedNewLeader(m.IdToValAddrMap[*m.LeaderId], *m.LeaderId, m.Height, m.Round)) + m.nodeLog(typesCons.ElectedNewLeader(m.idToValAddrMap[*m.LeaderId], *m.LeaderId, m.Height, m.Round)) } + + return nil } /*** General Infrastructure Helpers ***/ +// TODO(#164): Remove this once we have a proper logging system. func (m *ConsensusModule) nodeLog(s string) { - log.Printf("[%s][%d] %s\n", m.logPrefix, m.NodeId, s) + log.Printf("[%s][%d] %s\n", m.logPrefix, m.nodeId, s) } +// TODO(#164): Remove this once we have a proper logging system. 
func (m *ConsensusModule) nodeLogError(s string, err error) { - log.Printf("[ERROR][%s][%d] %s: %v\n", m.logPrefix, m.NodeId, s, err) + log.Printf("[ERROR][%s][%d] %s: %v\n", m.logPrefix, m.nodeId, s, err) } func (m *ConsensusModule) setLogPrefix(logPrefix string) { diff --git a/consensus/hotstuff_handler.go b/consensus/hotstuff_handler.go new file mode 100644 index 000000000..274a1b45b --- /dev/null +++ b/consensus/hotstuff_handler.go @@ -0,0 +1,55 @@ +package consensus + +import ( + typesCons "github.com/pokt-network/pocket/consensus/types" +) + +// DISCUSS: Should these functions return an error? +type HotstuffMessageHandler interface { + HandleNewRoundMessage(*ConsensusModule, *typesCons.HotstuffMessage) + HandlePrepareMessage(*ConsensusModule, *typesCons.HotstuffMessage) + HandlePrecommitMessage(*ConsensusModule, *typesCons.HotstuffMessage) + HandleCommitMessage(*ConsensusModule, *typesCons.HotstuffMessage) + HandleDecideMessage(*ConsensusModule, *typesCons.HotstuffMessage) +} + +func (m *ConsensusModule) handleHotstuffMessage(msg *typesCons.HotstuffMessage) error { + m.nodeLog(typesCons.DebugHandlingHotstuffMessage(msg)) + + step := msg.GetStep() + + // Pacemaker - Liveness & safety checks + if err := m.paceMaker.ValidateMessage(msg); err != nil { + if m.shouldLogHotstuffDiscardMessage(step) { + m.nodeLog(typesCons.WarnDiscardHotstuffMessage(msg, err.Error())) + } + return err + } + + if m.shouldElectNextLeader() { + if err := m.electNextLeader(msg); err != nil { + return err + } + } + + // Hotstuff - Handle message + if m.isReplica() { + replicaHandlers[step](m, msg) + } + // Note that the leader also acts as a replica, but this logic is implemented in the underlying code. 
+ leaderHandlers[step](m, msg) + + return nil +} + +func (m *ConsensusModule) shouldElectNextLeader() bool { + // Execute leader election if there is no leader and we are in a new round + return m.Step == NewRound && m.LeaderId == nil +} + +func (m *ConsensusModule) shouldLogHotstuffDiscardMessage(step typesCons.HotstuffStep) bool { + // If a replica is not a leader for this round, but has already determined a leader, + // and continues to receive NewRound messages, we avoid logging the "message discard" + // because it creates unnecessary spam. + return !(m.LeaderId != nil && !m.isLeader() && step == NewRound) +} diff --git a/consensus/hotstuff_leader.go b/consensus/hotstuff_leader.go index d691ebc0c..54f41e0dd 100644 --- a/consensus/hotstuff_leader.go +++ b/consensus/hotstuff_leader.go @@ -1,12 +1,15 @@ package consensus import ( + "encoding/hex" "unsafe" consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" typesCons "github.com/pokt-network/pocket/consensus/types" ) +type HotstuffLeaderMessageHandler struct{} + var ( LeaderMessageHandler HotstuffMessageHandler = &HotstuffLeaderMessageHandler{} leaderHandlers = map[typesCons.HotstuffStep]func(*ConsensusModule, *typesCons.HotstuffMessage){ @@ -18,32 +21,39 @@ var ( } ) -type HotstuffLeaderMessageHandler struct{} - /*** Prepare Step ***/ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation + + // DISCUSS: Do we need to pause for `MinBlockFreqMSec` here to let more transactions or should we stick with optimistic responsiveness? 
+ if err := m.didReceiveEnoughMessageForStep(NewRound); err != nil { m.nodeLog(typesCons.OptimisticVoteCountWaiting(NewRound, err.Error())) return } - - // TODO(olshansky): Do we need to pause for `MinBlockFreqMSec` here to let more transactions come in? m.nodeLog(typesCons.OptimisticVoteCountPassed(NewRound)) + // Clear the previous utility context, if it exists, and create a new one + if err := m.refreshUtilityContext(); err != nil { + m.nodeLogError("Could not refresh utility context", err) + return + } + // Likely to be `nil` if blockchain is progressing well. - highPrepareQC := m.findHighQC(NewRound) + // TECHDEBT: How do we properly validate `highPrepareQC` here? + highPrepareQC := m.findHighQC(m.messagePool[NewRound]) - // TODO(olshansky): Add more unit tests for these checks... - if highPrepareQC == nil || highPrepareQC.Height < m.Height || highPrepareQC.Round < m.Round { - block, err := m.prepareBlockAsLeader() + // TODO: Add more unit tests for these checks... + if m.shouldPrepareNewBlock(highPrepareQC) { + // Leader prepares a new block if `highPrepareQC` is not applicable + block, err := m.prepareAndApplyBlock() if err != nil { m.nodeLogError(typesCons.ErrPrepareBlock.Error(), err) m.paceMaker.InterruptRound() @@ -51,15 +61,20 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *ConsensusM } m.Block = block } else { - // TODO(discuss): Do we need to validate highPrepareQC here? + // DISCUSS: Do we need to call `validateProposal` here? 
+ // Leader acts like a replica if `highPrepareQC` is not `nil` + if err := m.applyBlock(highPrepareQC.Block); err != nil { + m.nodeLogError(typesCons.ErrApplyBlock.Error(), err) + m.paceMaker.InterruptRound() + return + } m.Block = highPrepareQC.Block } m.Step = Prepare - m.MessagePool[NewRound] = nil - m.paceMaker.RestartTimer() + m.messagePool[NewRound] = nil - prepareProposeMessage, err := CreateProposeMessage(m, Prepare, highPrepareQC) + prepareProposeMessage, err := CreateProposeMessage(m.Height, m.Round, Prepare, m.Block, highPrepareQC) if err != nil { m.nodeLogError(typesCons.ErrCreateProposeMessage(Prepare).Error(), err) m.paceMaker.InterruptRound() @@ -68,10 +83,10 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *ConsensusM m.broadcastToNodes(prepareProposeMessage) // Leader also acts like a replica - prepareVoteMessage, err := CreateVoteMessage(m, Prepare, m.Block) + prepareVoteMessage, err := CreateVoteMessage(m.Height, m.Round, Prepare, m.Block, m.privateKey) if err != nil { m.nodeLogError(typesCons.ErrCreateVoteMessage(Prepare).Error(), err) - return // TODO(olshansky): Should we interrupt the round here? 
+ return } m.sendToNode(prepareVoteMessage) } @@ -79,13 +94,14 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *ConsensusM /*** PreCommit Step ***/ func (handler *HotstuffLeaderMessageHandler) HandlePrepareMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation + if err := m.didReceiveEnoughMessageForStep(Prepare); err != nil { m.nodeLog(typesCons.OptimisticVoteCountWaiting(Prepare, err.Error())) return @@ -99,23 +115,22 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrepareMessage(m *ConsensusMo } m.Step = PreCommit - m.HighPrepareQC = prepareQC - m.MessagePool[Prepare] = nil - m.paceMaker.RestartTimer() + m.highPrepareQC = prepareQC + m.messagePool[Prepare] = nil - precommitProposeMessages, err := CreateProposeMessage(m, PreCommit, prepareQC) + preCommitProposeMessage, err := CreateProposeMessage(m.Height, m.Round, PreCommit, m.Block, prepareQC) if err != nil { m.nodeLogError(typesCons.ErrCreateProposeMessage(PreCommit).Error(), err) m.paceMaker.InterruptRound() return } - m.broadcastToNodes(precommitProposeMessages) + m.broadcastToNodes(preCommitProposeMessage) // Leader also acts like a replica - precommitVoteMessage, err := CreateVoteMessage(m, PreCommit, m.Block) + precommitVoteMessage, err := CreateVoteMessage(m.Height, m.Round, PreCommit, m.Block, m.privateKey) if err != nil { m.nodeLogError(typesCons.ErrCreateVoteMessage(PreCommit).Error(), err) - return // TODO(olshansky): Should we interrupt the round here? 
+ return } m.sendToNode(precommitVoteMessage) } @@ -123,13 +138,14 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrepareMessage(m *ConsensusMo /*** Commit Step ***/ func (handler *HotstuffLeaderMessageHandler) HandlePrecommitMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation + if err := m.didReceiveEnoughMessageForStep(PreCommit); err != nil { m.nodeLog(typesCons.OptimisticVoteCountWaiting(PreCommit, err.Error())) return @@ -143,11 +159,10 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrecommitMessage(m *Consensus } m.Step = Commit - m.LockedQC = preCommitQC - m.MessagePool[PreCommit] = nil - m.paceMaker.RestartTimer() + m.lockedQC = preCommitQC + m.messagePool[PreCommit] = nil - commitProposeMessage, err := CreateProposeMessage(m, Commit, preCommitQC) + commitProposeMessage, err := CreateProposeMessage(m.Height, m.Round, Commit, m.Block, preCommitQC) if err != nil { m.nodeLogError(typesCons.ErrCreateProposeMessage(Commit).Error(), err) m.paceMaker.InterruptRound() @@ -156,10 +171,10 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrecommitMessage(m *Consensus m.broadcastToNodes(commitProposeMessage) // Leader also acts like a replica - commitVoteMessage, err := CreateVoteMessage(m, Commit, m.Block) + commitVoteMessage, err := CreateVoteMessage(m.Height, m.Round, Commit, m.Block, m.privateKey) if err != nil { m.nodeLogError(typesCons.ErrCreateVoteMessage(Commit).Error(), err) - return // TODO(olshansky): Should we interrupt the round here? 
+ return } m.sendToNode(commitVoteMessage) } @@ -167,13 +182,14 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrecommitMessage(m *Consensus /*** Decide Step ***/ func (handler *HotstuffLeaderMessageHandler) HandleCommitMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation + if err := m.didReceiveEnoughMessageForStep(Commit); err != nil { m.nodeLog(typesCons.OptimisticVoteCountWaiting(Commit, err.Error())) return @@ -187,10 +203,9 @@ func (handler *HotstuffLeaderMessageHandler) HandleCommitMessage(m *ConsensusMod } m.Step = Decide - m.MessagePool[Commit] = nil - m.paceMaker.RestartTimer() + m.messagePool[Commit] = nil - decideProposeMessage, err := CreateProposeMessage(m, Decide, commitQC) + decideProposeMessage, err := CreateProposeMessage(m.Height, m.Round, Decide, m.Block, commitQC) if err != nil { m.nodeLogError(typesCons.ErrCreateProposeMessage(Decide).Error(), err) m.paceMaker.InterruptRound() @@ -204,7 +219,7 @@ func (handler *HotstuffLeaderMessageHandler) HandleCommitMessage(m *ConsensusMod return } - // There is no "replica behavior" to imitate here + // There is no "replica behavior" to imitate here because the leader already committed the block proposal. m.paceMaker.NewHeight() m.GetBus(). 
@@ -216,6 +231,7 @@ func (handler *HotstuffLeaderMessageHandler) HandleCommitMessage(m *ConsensusMod } func (handler *HotstuffLeaderMessageHandler) HandleDecideMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { @@ -226,10 +242,19 @@ func (handler *HotstuffLeaderMessageHandler) HandleDecideMessage(m *ConsensusMod // anteHandle is the general handler called for every before every specific HotstuffLeaderMessageHandler handler func (handler *HotstuffLeaderMessageHandler) anteHandle(m *ConsensusModule, msg *typesCons.HotstuffMessage) error { - if err := handler.validateBasic(m, msg); err != nil { + // Basic block metadata validation + + if err := m.validateBlockBasic(msg.GetBlock()); err != nil { + return err + } + + // Discard messages with invalid partial signatures before storing it in the leader's consensus mempool + if err := m.validatePartialSignature(msg); err != nil { return err } - m.aggregateMessage(msg) + + // TECHDEBT: Until we integrate with the real mempool, this is a makeshift solution + m.tempIndexHotstuffMessage(msg) return nil } @@ -246,22 +271,13 @@ func (handler *HotstuffLeaderMessageHandler) emitTelemetryEvent(m *ConsensusModu ) } -// ValidateBasic general validation checks that apply to every HotstuffLeaderMessage -func (handler *HotstuffLeaderMessageHandler) validateBasic(m *ConsensusModule, msg *typesCons.HotstuffMessage) error { - // Discard messages with invalid partial signatures before storing it in the leader's consensus mempool - if err := m.validatePartialSignature(msg); err != nil { - return err - } - return nil -} - func (m *ConsensusModule) validatePartialSignature(msg *typesCons.HotstuffMessage) error { - if msg.Step == NewRound { + if msg.GetStep() == NewRound { m.nodeLog(typesCons.ErrUnnecessaryPartialSigForNewRound.Error()) return nil } - if msg.Type == Propose { + if msg.GetType() == Propose { 
m.nodeLog(typesCons.ErrUnnecessaryPartialSigForLeaderProposal.Error()) return nil } @@ -269,34 +285,100 @@ func (m *ConsensusModule) validatePartialSignature(msg *typesCons.HotstuffMessag if msg.GetPartialSignature() == nil { return typesCons.ErrNilPartialSig } + partialSig := msg.GetPartialSignature() - if msg.GetPartialSignature().Signature == nil || len(msg.GetPartialSignature().Address) == 0 { + if partialSig.Signature == nil || len(partialSig.GetAddress()) == 0 { return typesCons.ErrNilPartialSigOrSourceNotSpecified } - address := msg.GetPartialSignature().Address + address := partialSig.GetAddress() validator, ok := m.validatorMap[address] if !ok { - return typesCons.ErrMissingValidator(address, m.ValAddrToIdMap[address]) + return typesCons.ErrMissingValidator(address, m.valAddrToIdMap[address]) } pubKey := validator.GetPublicKey() - if isSignatureValid(msg, pubKey, msg.GetPartialSignature().Signature) { + if isSignatureValid(msg, pubKey, partialSig.GetSignature()) { return nil } return typesCons.ErrValidatingPartialSig( - address, m.ValAddrToIdMap[address], msg, pubKey) + address, m.valAddrToIdMap[address], msg, pubKey) } -func (m *ConsensusModule) aggregateMessage(msg *typesCons.HotstuffMessage) { - // TODO(olshansky): Add proper tests for this when we figure out where the mempool should live. - // NOTE: This is just a placeholder at the moment. It doesn't actually work because SizeOf returns - // the size of the map pointer, and does not recursively determine the size of all the underlying elements. - if m.consCfg.GetMaxMempoolBytes() < uint64(unsafe.Sizeof(m.MessagePool)) { +// TODO: This is just a placeholder at the moment for indexing hotstuff messages ONLY. +// It doesn't actually work because SizeOf returns the size of the map pointer, +// and does not recursively determine the size of all the underlying elements +// Add proper tests and implementation once the mempool is implemented. 
+func (m *ConsensusModule) tempIndexHotstuffMessage(msg *typesCons.HotstuffMessage) { + if m.consCfg.GetMaxMempoolBytes() < uint64(unsafe.Sizeof(m.messagePool)) { m.nodeLogError(typesCons.DisregardHotstuffMessage, typesCons.ErrConsensusMempoolFull) return } // Only the leader needs to aggregate consensus related messages. - m.MessagePool[msg.Step] = append(m.MessagePool[msg.Step], msg) + step := msg.GetStep() + m.messagePool[step] = append(m.messagePool[step], msg) +} + +// This is a helper function intended to be called by a leader/validator during a view change +// to prepare a new block that is applied to the new underlying context. +func (m *ConsensusModule) prepareAndApplyBlock() (*typesCons.Block, error) { + if m.isReplica() { + return nil, typesCons.ErrReplicaPrepareBlock + } + + // TECHDEBT: Retrieve this from consensus consensus config + maxTxBytes := 90000 + + // TECHDEBT: Retrieve this from persistence + lastByzValidators := make([][]byte, 0) + + // Reap the mempool for transactions to be applied in this block + txs, err := m.utilityContext.GetProposalTransactions(m.privateKey.Address(), maxTxBytes, lastByzValidators) + if err != nil { + return nil, err + } + + // OPTIMIZE: Determine if we can avoid the `ApplyBlock` call here + // Apply all the transactions in the block + appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), m.privateKey.Address(), txs, lastByzValidators) + if err != nil { + return nil, err + } + + // Construct the block + blockHeader := &typesCons.BlockHeader{ + Height: int64(m.Height), + Hash: hex.EncodeToString(appHash), + NumTxs: uint32(len(txs)), + LastBlockHash: m.lastAppHash, + ProposerAddress: m.privateKey.Address().Bytes(), + QuorumCertificate: []byte("HACK: Temporary placeholder"), + } + block := &typesCons.Block{ + BlockHeader: blockHeader, + Transactions: txs, + } + + return block, nil +} + +// Return true if this node, the leader, should prepare a new block +func (m *ConsensusModule) 
shouldPrepareNewBlock(highPrepareQC *typesCons.QuorumCertificate) bool { + if highPrepareQC == nil { + m.nodeLog("Preparing a new block - no highPrepareQC found") + return true + } else if m.isHighPrepareQCFromPast(highPrepareQC) { + m.nodeLog("Preparing a new block - highPrepareQC is from the past") + return true + } else if highPrepareQC.Block == nil { + m.nodeLog("[WARN] Preparing a new block - highPrepareQC SHOULD be used but block is nil") + return true + } + return false +} + +// The `highPrepareQC` is from the past so we can safely ignore it +func (m *ConsensusModule) isHighPrepareQCFromPast(highPrepareQC *typesCons.QuorumCertificate) bool { + return highPrepareQC.Height < m.Height || highPrepareQC.Round < m.Round } diff --git a/consensus/hotstuff_replica.go b/consensus/hotstuff_replica.go index 25041c766..e55e85585 100644 --- a/consensus/hotstuff_replica.go +++ b/consensus/hotstuff_replica.go @@ -1,10 +1,11 @@ package consensus import ( + "encoding/hex" "fmt" - "log" consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" + "github.com/pokt-network/pocket/consensus/types" typesCons "github.com/pokt-network/pocket/consensus/types" ) @@ -24,46 +25,54 @@ var ( /*** NewRound Step ***/ func (handler *HotstuffReplicaMessageHandler) HandleNewRoundMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation - m.paceMaker.RestartTimer() + + // Clear the previous utility context, if it exists, and create a new one + if err := m.refreshUtilityContext(); err != nil { + m.nodeLogError("Could not refresh utility context", err) + return + } + m.Step = Prepare } /*** Prepare Step ***/ func (handler *HotstuffReplicaMessageHandler) HandlePrepareMessage(m 
*ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation + if err := m.validateProposal(msg); err != nil { m.nodeLogError(fmt.Sprintf("Invalid proposal in %s message", Prepare), err) m.paceMaker.InterruptRound() return } - if err := m.applyBlockAsReplica(msg.Block); err != nil { + block := msg.GetBlock() + if err := m.applyBlock(block); err != nil { m.nodeLogError(typesCons.ErrApplyBlock.Error(), err) m.paceMaker.InterruptRound() return } + m.Block = block m.Step = PreCommit - m.paceMaker.RestartTimer() - prepareVoteMessage, err := CreateVoteMessage(m, Prepare, msg.Block) + prepareVoteMessage, err := CreateVoteMessage(m.Height, m.Round, Prepare, m.Block, m.privateKey) if err != nil { m.nodeLogError(typesCons.ErrCreateVoteMessage(Prepare).Error(), err) - return // TODO(olshansky): Should we interrupt the round here? 
+ return // Not interrupting the round because liveness could continue with one failed vote } m.sendToNode(prepareVoteMessage) } @@ -71,27 +80,28 @@ func (handler *HotstuffReplicaMessageHandler) HandlePrepareMessage(m *ConsensusM /*** PreCommit Step ***/ func (handler *HotstuffReplicaMessageHandler) HandlePrecommitMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation - if err := m.validateQuorumCertificate(msg.GetQuorumCertificate()); err != nil { + + quorumCert := msg.GetQuorumCertificate() + if err := m.validateQuorumCertificate(quorumCert); err != nil { m.nodeLogError(typesCons.ErrQCInvalid(PreCommit).Error(), err) m.paceMaker.InterruptRound() return } m.Step = Commit - m.HighPrepareQC = msg.GetQuorumCertificate() // TODO(discuss): Why are we never using this for validation? - m.paceMaker.RestartTimer() + m.highPrepareQC = quorumCert // INVESTIGATE: Why are we never using this for validation? - preCommitVoteMessage, err := CreateVoteMessage(m, PreCommit, msg.Block) + preCommitVoteMessage, err := CreateVoteMessage(m.Height, m.Round, PreCommit, m.Block, m.privateKey) if err != nil { m.nodeLogError(typesCons.ErrCreateVoteMessage(PreCommit).Error(), err) - return // TODO(olshansky): Should we interrupt the round here? 
+ return // Not interrupting the round because liveness could continue with one failed vote } m.sendToNode(preCommitVoteMessage) } @@ -99,27 +109,28 @@ func (handler *HotstuffReplicaMessageHandler) HandlePrecommitMessage(m *Consensu /*** Commit Step ***/ func (handler *HotstuffReplicaMessageHandler) HandleCommitMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation - if err := m.validateQuorumCertificate(msg.GetQuorumCertificate()); err != nil { + + quorumCert := msg.GetQuorumCertificate() + if err := m.validateQuorumCertificate(quorumCert); err != nil { m.nodeLogError(typesCons.ErrQCInvalid(Commit).Error(), err) m.paceMaker.InterruptRound() return } m.Step = Decide - m.LockedQC = msg.GetQuorumCertificate() // TODO(discuss): How do the replica recover if it's locked? Replica `formally` agrees on the QC while the rest of the network `verbally` agrees on the QC. - m.paceMaker.RestartTimer() + m.lockedQC = quorumCert // DISCUSS: How does the replica recover if it's locked? Replica `formally` agrees on the QC while the rest of the network `verbally` agrees on the QC. - commitVoteMessage, err := CreateVoteMessage(m, Commit, msg.Block) + commitVoteMessage, err := CreateVoteMessage(m.Height, m.Round, Commit, m.Block, m.privateKey) if err != nil { m.nodeLogError(typesCons.ErrCreateVoteMessage(Commit).Error(), err) - return // TODO(olshansky): Should we interrupt the round here? 
+ return // Not interrupting the round because liveness could continue with one failed vote } m.sendToNode(commitVoteMessage) } @@ -127,20 +138,22 @@ func (handler *HotstuffReplicaMessageHandler) HandleCommitMessage(m *ConsensusMo /*** Decide Step ***/ func (handler *HotstuffReplicaMessageHandler) HandleDecideMessage(m *ConsensusModule, msg *typesCons.HotstuffMessage) { + defer m.paceMaker.RestartTimer() handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return } - // TODO(olshansky): add step specific validation - if err := m.validateQuorumCertificate(msg.GetQuorumCertificate()); err != nil { + + quorumCert := msg.GetQuorumCertificate() + if err := m.validateQuorumCertificate(quorumCert); err != nil { m.nodeLogError(typesCons.ErrQCInvalid(Decide).Error(), err) m.paceMaker.InterruptRound() return } - if err := m.commitBlock(msg.Block); err != nil { + if err := m.commitBlock(m.Block); err != nil { m.nodeLogError("Could not commit block", err) m.paceMaker.InterruptRound() return @@ -151,7 +164,11 @@ func (handler *HotstuffReplicaMessageHandler) HandleDecideMessage(m *ConsensusMo // anteHandle is the handler called on every replica message before specific handler func (handler *HotstuffReplicaMessageHandler) anteHandle(m *ConsensusModule, msg *typesCons.HotstuffMessage) error { - log.Println("TODO: Hotstuff replica ante handle not implemented yet") + // Basic block metadata validation + if err := m.validateBlockBasic(msg.GetBlock()); err != nil { + return err + } + return nil } @@ -169,24 +186,22 @@ func (handler *HotstuffReplicaMessageHandler) emitTelemetryEvent(m *ConsensusMod } func (m *ConsensusModule) validateProposal(msg *typesCons.HotstuffMessage) error { - if !(msg.Type == Propose && msg.Step == Prepare) { + // Check if node should be accepting proposals + if !(msg.GetType() == Propose && msg.GetStep() == Prepare) { return typesCons.ErrProposalNotValidInPrepare } - 
if err := m.validateBlock(msg.Block); err != nil { - return err - } - - // TODO(discuss): A nil QC implies a successfull CommitQC or TimeoutQC, which have been omitted intentionally since - // they are not needed for consensus validity. However, if a QC is specified, it must be valid. - if msg.GetQuorumCertificate() != nil { - if err := m.validateQuorumCertificate(msg.GetQuorumCertificate()); err != nil { + quorumCert := msg.GetQuorumCertificate() + // A nil QC implies a successful CommitQC or TimeoutQC, which have been omitted intentionally + // since they are not needed for consensus validity. However, if a QC is specified, it must be valid. + if quorumCert != nil { + if err := m.validateQuorumCertificate(quorumCert); err != nil { return err } } - lockedQC := m.LockedQC - justifyQC := msg.GetQuorumCertificate() + lockedQC := m.lockedQC + justifyQC := quorumCert // Safety: not locked if lockedQC == nil { @@ -195,20 +210,40 @@ func (m *ConsensusModule) validateProposal(msg *typesCons.HotstuffMessage) error } // Safety: check the hash of the locked QC - // TODO(olshansky): Extend implementation to adopt `ExtendsFrom` as described in the Hotstuff whitepaper. - if protoHash(lockedQC.Block) == protoHash(justifyQC.Block) { // && lockedQC.Block.ExtendsFrom(justifyQC.Block) + // The equivalent of `lockedQC.Block.ExtendsFrom(justifyQC.Block)` in the hotstuff whitepaper is done in `applyBlock` below. + if protoHash(lockedQC.GetBlock()) == protoHash(justifyQC.Block) { m.nodeLog(typesCons.ProposalBlockExtends) return nil } - // Liveness: node is locked on a QC from the past. [TODO]: Do we want to set `m.LockedQC = nil` here or something else? - if justifyQC.Height > lockedQC.Height || (justifyQC.Height == lockedQC.Height && justifyQC.Round > lockedQC.Round) { - return typesCons.ErrNodeIsLockedOnPastQC + // Liveness: is node locked on a QC from the past? + // DISCUSS: Where should additional logic be added to unlock the node? 
+ if isLocked, err := isNodeLockedOnPastQC(justifyQC, lockedQC); isLocked { + return err } return typesCons.ErrUnhandledProposalCase } +// This helper applies the block metadata to the utility & persistence layers +func (m *ConsensusModule) applyBlock(block *typesCons.Block) error { + // TECHDEBT: Retrieve this from persistence + lastByzValidators := make([][]byte, 0) + + // Apply all the transactions in the block and get the appHash + appHash, err := m.utilityContext.ApplyBlock(int64(m.Height), block.BlockHeader.ProposerAddress, block.Transactions, lastByzValidators) + if err != nil { + return err + } + + // CONSOLIDATE: Terminology of `blockHash`, `appHash` and `stateHash` + if block.BlockHeader.Hash != hex.EncodeToString(appHash) { + return typesCons.ErrInvalidAppHash(block.BlockHeader.Hash, hex.EncodeToString(appHash)) + } + + return nil +} + func (m *ConsensusModule) validateQuorumCertificate(qc *typesCons.QuorumCertificate) error { if qc == nil { return typesCons.ErrNilQC @@ -224,21 +259,22 @@ func (m *ConsensusModule) validateQuorumCertificate(qc *typesCons.QuorumCertific msgToJustify := qcToHotstuffMessage(qc) numValid := 0 + + // TODO(#109): Aggregate signatures once BLS or DKG is implemented for _, partialSig := range qc.ThresholdSignature.Signatures { validator, ok := m.validatorMap[partialSig.Address] if !ok { - m.nodeLogError(typesCons.ErrMissingValidator(partialSig.Address, m.ValAddrToIdMap[partialSig.Address]).Error(), nil) + m.nodeLogError(typesCons.ErrMissingValidator(partialSig.Address, m.valAddrToIdMap[partialSig.Address]).Error(), nil) continue } // TODO(olshansky): Every call to `IsSignatureValid` does a serialization and should be optimized. We can // just serialize `Message` once and verify each signature without re-serializing every time. 
if !isSignatureValid(msgToJustify, validator.GetPublicKey(), partialSig.Signature) { - m.nodeLog(typesCons.WarnInvalidPartialSigInQC(partialSig.Address, m.ValAddrToIdMap[partialSig.Address])) + m.nodeLog(typesCons.WarnInvalidPartialSigInQC(partialSig.Address, m.valAddrToIdMap[partialSig.Address])) continue } numValid++ } - if err := m.isOptimisticThresholdMet(numValid); err != nil { return err } @@ -246,6 +282,23 @@ func (m *ConsensusModule) validateQuorumCertificate(qc *typesCons.QuorumCertific return nil } +func isNodeLockedOnPastQC(justifyQC, lockedQC *types.QuorumCertificate) (bool, error) { + if isLockedOnPastHeight(justifyQC, lockedQC) { + return true, types.ErrNodeLockedPastHeight + } else if isLockedOnCurrHeightAndPastRound(justifyQC, lockedQC) { + return true, types.ErrNodeLockedPastHeight + } + return false, nil +} + +func isLockedOnPastHeight(justifyQC, lockedQC *types.QuorumCertificate) bool { + return justifyQC.Height > lockedQC.Height +} + +func isLockedOnCurrHeightAndPastRound(justifyQC, lockedQC *types.QuorumCertificate) bool { + return justifyQC.Height == lockedQC.Height && justifyQC.Round > lockedQC.Round +} + func qcToHotstuffMessage(qc *typesCons.QuorumCertificate) *typesCons.HotstuffMessage { return &typesCons.HotstuffMessage{ Height: qc.Height, diff --git a/consensus/messages.go b/consensus/messages.go index 4fd09cea5..b2a06013e 100644 --- a/consensus/messages.go +++ b/consensus/messages.go @@ -4,35 +4,38 @@ import ( "log" typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" "github.com/pokt-network/pocket/shared/crypto" - "google.golang.org/protobuf/proto" ) func CreateProposeMessage( - m *ConsensusModule, - step typesCons.HotstuffStep, // step can be taken from `m` but is specified explicitly via interface to avoid ambiguity + height uint64, + round uint64, + step typesCons.HotstuffStep, + block *typesCons.Block, qc 
*typesCons.QuorumCertificate, ) (*typesCons.HotstuffMessage, error) { - if m.Block == nil { - return nil, typesCons.ErrNilBlockProposal + if block == nil { + return nil, typesCons.ErrNilBlockVote } msg := &typesCons.HotstuffMessage{ Type: Propose, - Height: m.Height, + Height: height, Step: step, - Round: m.Round, - Block: m.Block, + Round: round, + Block: block, Justification: nil, // QC is set below if it is non-nil } - // TODO(olshansky): Add unit tests for this + // TODO: Add unit tests for this if qc == nil && step != Prepare { return nil, typesCons.ErrNilQCProposal } - // TODO(olshansky): Add unit tests for this - if qc != nil { // QC may optionally be nil for NEWROUND steps when everything is progressing smoothly + // TODO: Add unit tests for this + // QC may be nil during NEWROUND if following happy hotstuff path + if qc != nil { msg.Justification = &typesCons.HotstuffMessage_QuorumCertificate{ QuorumCertificate: qc, } @@ -42,9 +45,11 @@ func CreateProposeMessage( } func CreateVoteMessage( - m *ConsensusModule, - step typesCons.HotstuffStep, // step can be taken from `m` but is specified explicitly via interface to avoid ambiguity + height uint64, + round uint64, + step typesCons.HotstuffStep, block *typesCons.Block, + privKey crypto.PrivateKey, // used to sign the vote ) (*typesCons.HotstuffMessage, error) { if block == nil { return nil, typesCons.ErrNilBlockVote @@ -52,44 +57,50 @@ func CreateVoteMessage( msg := &typesCons.HotstuffMessage{ Type: Vote, - Height: m.Height, + Height: height, Step: step, - Round: m.Round, + Round: round, Block: block, Justification: nil, // signature is computed below } msg.Justification = &typesCons.HotstuffMessage_PartialSignature{ PartialSignature: &typesCons.PartialSignature{ - Signature: getMessageSignature(msg, m.privateKey), - Address: m.privateKey.PublicKey().Address().String(), + Signature: getMessageSignature(msg, privKey), + Address: privKey.PublicKey().Address().String(), }, } return msg, nil } -// Returns a 
"partial" signature of the hotstuff message from one of the validators -func getMessageSignature(m *typesCons.HotstuffMessage, privKey crypto.PrivateKey) []byte { - bytesToSign, err := getSignableBytes(m) +// Returns "partial" signature of the hotstuff message from one of the validators. +// If there is an error signing the bytes, nil is returned instead. +func getMessageSignature(msg *typesCons.HotstuffMessage, privKey crypto.PrivateKey) []byte { + bytesToSign, err := getSignableBytes(msg) if err != nil { + log.Printf("[WARN] error getting bytes to sign: %v\n", err) return nil } + signature, err := privKey.Sign(bytesToSign) if err != nil { - log.Fatalf("Error signing message: %v", err) + log.Printf("[WARN] error signing message: %v\n", err) return nil } + return signature } -// Signature should only be over a subset of the fields in a HotstuffMessage -func getSignableBytes(m *typesCons.HotstuffMessage) ([]byte, error) { +// Signature only over subset of fields in HotstuffMessage +// For reference, see section 4.3 of the the hotstuff whitepaper, partial signatures are +// computed over `tsignr(hm.type, m.viewNumber , m.nodei)`. 
https://arxiv.org/pdf/1803.05069.pdf +func getSignableBytes(msg *typesCons.HotstuffMessage) ([]byte, error) { msgToSign := &typesCons.HotstuffMessage{ - Height: m.Height, - Step: m.Step, - Round: m.Round, - Block: m.Block, + Height: msg.GetHeight(), + Step: msg.GetStep(), + Round: msg.GetRound(), + Block: msg.GetBlock(), } - return proto.Marshal(msgToSign) + return codec.GetCodec().Marshal(msgToSign) } diff --git a/consensus/module.go b/consensus/module.go index c6e240d3d..917cd88d4 100644 --- a/consensus/module.go +++ b/consensus/module.go @@ -9,9 +9,9 @@ import ( "github.com/pokt-network/pocket/consensus/leader_election" typesCons "github.com/pokt-network/pocket/consensus/types" + "github.com/pokt-network/pocket/shared/codec" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/test_artifacts" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" @@ -19,7 +19,7 @@ import ( ) const ( - DefaultLogPrefix = "NODE" // Just a default that'll be replaced during consensus operations. + DefaultLogPrefix = "NODE" // TODO(#164): Make implicit when logging is standardized ConsensusModuleName = "consensus" ) @@ -28,12 +28,13 @@ var _ modules.PacemakerConfig = &typesCons.PacemakerConfig{} var _ modules.ConsensusConfig = &typesCons.ConsensusConfig{} var _ modules.ConsensusModule = &ConsensusModule{} -// TODO(olshansky): Any reason to make all of these attributes local only (i.e. not exposed outside the struct)? -// TODO(olshansky): Look for a way to not externalize the `ConsensusModule` struct +// TODO(#256): Do not export the `ConsensusModule` struct or the fields inside of it. 
type ConsensusModule struct { bus modules.Bus privateKey cryptoPocket.Ed25519PrivateKey - consCfg modules.ConsensusConfig + + consCfg *typesCons.ConsensusConfig + consGenesis *typesCons.ConsensusGenesisState // m is a mutex used to control synchronization when multiple goroutines are accessing the struct and its fields / properties. // @@ -46,31 +47,32 @@ type ConsensusModule struct { Height uint64 Round uint64 Step typesCons.HotstuffStep - Block *typesCons.Block // The current block being voted on prior to committing to finality + Block *typesCons.Block // The current block being proposed / voted on; it has not been committed to finality - HighPrepareQC *typesCons.QuorumCertificate // Highest QC for which replica voted PRECOMMIT - LockedQC *typesCons.QuorumCertificate // Highest QC for which replica voted COMMIT + highPrepareQC *typesCons.QuorumCertificate // Highest QC for which replica voted PRECOMMIT + lockedQC *typesCons.QuorumCertificate // Highest QC for which replica voted COMMIT // Leader Election LeaderId *typesCons.NodeId - NodeId typesCons.NodeId - ValAddrToIdMap typesCons.ValAddrToIdMap // TODO(design): This needs to be updated every time the ValMap is modified - IdToValAddrMap typesCons.IdToValAddrMap // TODO(design): This needs to be updated every time the ValMap is modified + nodeId typesCons.NodeId + valAddrToIdMap typesCons.ValAddrToIdMap // TODO: This needs to be updated every time the ValMap is modified + idToValAddrMap typesCons.IdToValAddrMap // TODO: This needs to be updated every time the ValMap is modified // Consensus State - appHash string + lastAppHash string // TODO: Always retrieve this variable from the persistence module and simplify this struct validatorMap typesCons.ValidatorMap // Module Dependencies + // TODO(#283): Improve how `utilityContext` is managed utilityContext modules.UtilityContext paceMaker Pacemaker leaderElectionMod leader_election.LeaderElectionModule - logPrefix string // TODO(design): Remove later when we build a 
shared/proper/injected logger - MessagePool map[typesCons.HotstuffStep][]*typesCons.HotstuffMessage // TODO(design): Move this over to the persistence module or elsewhere? - // TODO(andrew): Explain (or remove) why have an explicit `MaxBlockBytes` if we are already storing a reference to `consCfg` above? - // TODO(andrew): This needs to be updated every time the utility module changes this value. It can be accessed via the "application specific bus" (mimicking the intermodule interface in ABCI) - MaxBlockBytes uint64 + // DEPRECATE: Remove later when we build a shared/proper/injected logger + logPrefix string + + // TECHDEBT: Move this over to use the txIndexer + messagePool map[typesCons.HotstuffStep][]*typesCons.HotstuffMessage } func Create(configPath, genesisPath string, useRandomPK bool) (modules.ConsensusModule, error) { @@ -112,32 +114,32 @@ func Create(configPath, genesisPath string, useRandomPK bool) (modules.Consensus m := &ConsensusModule{ bus: nil, - privateKey: privateKey.(cryptoPocket.Ed25519PrivateKey), - consCfg: cfg, + privateKey: privateKey.(cryptoPocket.Ed25519PrivateKey), + consCfg: cfg, + consGenesis: genesis, Height: 0, Round: 0, Step: NewRound, Block: nil, - HighPrepareQC: nil, - LockedQC: nil, + highPrepareQC: nil, + lockedQC: nil, - NodeId: valIdMap[address], + nodeId: valIdMap[address], LeaderId: nil, - ValAddrToIdMap: valIdMap, - IdToValAddrMap: idValMap, + valAddrToIdMap: valIdMap, + idToValAddrMap: idValMap, - appHash: "", + lastAppHash: "", validatorMap: valMap, utilityContext: nil, paceMaker: paceMaker, leaderElectionMod: leaderElectionMod, - logPrefix: DefaultLogPrefix, - MessagePool: make(map[typesCons.HotstuffStep][]*typesCons.HotstuffMessage), - MaxBlockBytes: genesis.GetMaxBlockBytes(), + logPrefix: DefaultLogPrefix, + messagePool: make(map[typesCons.HotstuffStep][]*typesCons.HotstuffMessage), } // TODO(olshansky): Look for a way to avoid doing this. 
@@ -224,70 +226,22 @@ func (m *ConsensusModule) SetBus(pocketBus modules.Bus) { m.leaderElectionMod.SetBus(pocketBus) } -func (m *ConsensusModule) loadPersistedState() error { - persistenceContext, err := m.GetBus().GetPersistenceModule().NewReadContext(-1) // Unknown height - if err != nil { - return nil - } - defer persistenceContext.Close() - - latestHeight, err := persistenceContext.GetLatestBlockHeight() - if err != nil || latestHeight == 0 { - m.nodeLog("TODO: State sync not implement") - return nil - } - - appHash, err := persistenceContext.GetBlockHash(int64(latestHeight)) - if err != nil { - return fmt.Errorf("error getting block hash for height %d even though it's in the database: %s", latestHeight, err) - } - - // TODO: Populate the rest of the state from the persistence module: validator set, quorum cert, last block hash, etc... - m.Height = uint64(latestHeight) + 1 // +1 because the height of the consensus module is where it is actively participating in consensus - m.appHash = string(appHash) - - m.nodeLog(fmt.Sprintf("Starting node at height %d", latestHeight)) - return nil -} - -// TODO(discuss): Low priority design: think of a way to make `hotstuff_*` files be a sub-package under consensus. -// This is currently not possible because functions tied to the `ConsensusModule` -// struct (implementing the ConsensusModule module), which spans multiple files. -/* -TODO(discuss): The reason we do not assign both the leader and the replica handlers -to the leader (which should also act as a replica when it is a leader) is because it -can create a weird inconsistent state (e.g. both the replica and leader try to restart -the Pacemaker timeout). 
This requires additional "replica-like" logic in the leader -handler which has both pros and cons: - Pros: - * The leader can short-circuit and optimize replica related logic - * Avoids additional code flowing through the P2P pipeline - * Allows for micro-optimizations - Cons: - * The leader's "replica related logic" requires an additional code path - * Code is less "generalizable" and therefore potentially more error prone -*/ - -// TODO(olshansky): Should we just make these singletons or embed them directly in the ConsensusModule? -type HotstuffMessageHandler interface { - HandleNewRoundMessage(*ConsensusModule, *typesCons.HotstuffMessage) - HandlePrepareMessage(*ConsensusModule, *typesCons.HotstuffMessage) - HandlePrecommitMessage(*ConsensusModule, *typesCons.HotstuffMessage) - HandleCommitMessage(*ConsensusModule, *typesCons.HotstuffMessage) - HandleDecideMessage(*ConsensusModule, *typesCons.HotstuffMessage) -} - func (m *ConsensusModule) HandleMessage(message *anypb.Any) error { m.m.Lock() defer m.m.Unlock() switch message.MessageName() { case HotstuffMessage: - var hotstuffMessage typesCons.HotstuffMessage - err := anypb.UnmarshalTo(message, &hotstuffMessage, proto.UnmarshalOptions{}) + msg, err := codec.GetCodec().FromAny(message) if err != nil { return err } - m.handleHotstuffMessage(&hotstuffMessage) + hotstuffMessage, ok := msg.(*typesCons.HotstuffMessage) + if !ok { + return fmt.Errorf("failed to cast message to HotstuffMessage") + } + if err := m.handleHotstuffMessage(hotstuffMessage); err != nil { + return err + } case UtilityMessage: panic("[WARN] UtilityMessage handling is not implemented by consensus yet...") default: @@ -297,36 +251,8 @@ func (m *ConsensusModule) HandleMessage(message *anypb.Any) error { return nil } -func (m *ConsensusModule) handleHotstuffMessage(msg *typesCons.HotstuffMessage) { - m.nodeLog(typesCons.DebugHandlingHotstuffMessage(msg)) - - // Liveness & safety checks - if err := m.paceMaker.ValidateMessage(msg); err != nil { - // 
If a replica is not a leader for this round, but has already determined a leader, - // and continues to receive NewRound messages, we avoid logging the "message discard" - // because it creates unnecessary spam. - if !(m.LeaderId != nil && !m.isLeader() && msg.Step == NewRound) { - m.nodeLog(typesCons.WarnDiscardHotstuffMessage(msg, err.Error())) - } - return - } - - // Need to execute leader election if there is no leader and we are in a new round. - if m.Step == NewRound && m.isLeaderUnknown() { - m.electNextLeader(msg) - } - - if m.isReplica() { - replicaHandlers[msg.Step](m, msg) - return - } - - // Note that the leader also acts as a replica, but this logic is implemented in the underlying code. - leaderHandlers[msg.Step](m, msg) -} - func (m *ConsensusModule) AppHash() string { - return m.appHash + return m.lastAppHash } func (m *ConsensusModule) CurrentHeight() uint64 { @@ -336,3 +262,34 @@ func (m *ConsensusModule) CurrentHeight() uint64 { func (m *ConsensusModule) ValidatorMap() modules.ValidatorMap { return typesCons.ValidatorMapToModulesValidatorMap(m.validatorMap) } + +// TODO(#256): Currently only used for testing purposes +func (m *ConsensusModule) SetUtilityContext(utilityContext modules.UtilityContext) { + m.utilityContext = utilityContext +} + +// TODO: Populate the entire state from the persistence module: validator set, quorum cert, last block hash, etc... 
+func (m *ConsensusModule) loadPersistedState() error { + persistenceContext, err := m.GetBus().GetPersistenceModule().NewReadContext(-1) // Unknown height + if err != nil { + return nil + } + defer persistenceContext.Close() + + latestHeight, err := persistenceContext.GetLatestBlockHeight() + if err != nil || latestHeight == 0 { + m.nodeLog("TODO: State sync not implemented yet") + return nil + } + + appHash, err := persistenceContext.GetBlockHash(int64(latestHeight)) + if err != nil { + return fmt.Errorf("error getting block hash for height %d even though it's in the database: %s", latestHeight, err) + } + + m.Height = uint64(latestHeight) + 1 // +1 because the height of the consensus module is where it is actively participating in consensus + m.lastAppHash = string(appHash) + + m.nodeLog(fmt.Sprintf("Starting node at height %d", latestHeight)) + return nil +} diff --git a/consensus/pacemaker.go b/consensus/pacemaker.go index 21abdbdab..ba4202897 100644 --- a/consensus/pacemaker.go +++ b/consensus/pacemaker.go @@ -183,7 +183,7 @@ func (p *paceMaker) InterruptRound() { p.consensusMod.nodeLog(typesCons.PacemakerInterrupt(p.consensusMod.CurrentHeight(), p.consensusMod.Step, p.consensusMod.Round)) p.consensusMod.Round++ - p.startNextView(p.consensusMod.HighPrepareQC, false) + p.startNextView(p.consensusMod.highPrepareQC, false) } func (p *paceMaker) NewHeight() { diff --git a/consensus/types/errors.go b/consensus/types/errors.go index e3f4d3987..5368ee1bf 100644 --- a/consensus/types/errors.go +++ b/consensus/types/errors.go @@ -6,6 +6,7 @@ import ( "fmt" "log" + "github.com/pokt-network/pocket/shared/codec" "google.golang.org/protobuf/proto" ) @@ -71,11 +72,11 @@ func ElectedSelfAsNewLeader(address string, nodeId NodeId, height, round uint64) } func SendingMessage(msg *HotstuffMessage, nodeId NodeId) string { - return fmt.Sprintf("Sending %s message to %d", StepToString[msg.Step], nodeId) + return fmt.Sprintf("Sending %s message to %d", 
StepToString[msg.GetStep()], nodeId) } func BroadcastingMessage(msg *HotstuffMessage) string { - return fmt.Sprintf("Broadcasting message for %s step", StepToString[msg.Step]) + return fmt.Sprintf("Broadcasting message for %s step", StepToString[msg.GetStep()]) } func WarnInvalidPartialSigInQC(address string, nodeId NodeId) string { @@ -83,7 +84,7 @@ func WarnInvalidPartialSigInQC(address string, nodeId NodeId) string { } func WarnMissingPartialSig(msg *HotstuffMessage) string { - return fmt.Sprintf("[WARN] No partial signature found for step %s which should not happen...", StepToString[msg.Step]) + return fmt.Sprintf("[WARN] No partial signature found for step %s which should not happen...", StepToString[msg.GetStep()]) } func WarnDiscardHotstuffMessage(_ *HotstuffMessage, reason string) string { @@ -95,7 +96,7 @@ func WarnUnexpectedMessageInPool(_ *HotstuffMessage, height uint64, step Hotstuf } func WarnIncompletePartialSig(ps *PartialSignature, msg *HotstuffMessage) string { - return fmt.Sprintf("[WARN] Partial signature is incomplete for step %s which should not happen...", StepToString[msg.Step]) + return fmt.Sprintf("[WARN] Partial signature is incomplete for step %s which should not happen...", StepToString[msg.GetStep()]) } func DebugTogglePacemakerManualMode(mode string) string { @@ -108,12 +109,13 @@ func DebugNodeState(state ConsensusNodeState) string { func DebugHandlingHotstuffMessage(msg *HotstuffMessage) string { // TODO(olshansky): Add source and destination NodeId of message here - return fmt.Sprintf("[DEBUG] Handling message w/ Height: %d; Type: %s; Round: %d.", msg.Height, StepToString[msg.Step], msg.Round) + return fmt.Sprintf("[DEBUG] Handling message w/ Height: %d; Type: %s; Round: %d.", msg.Height, StepToString[msg.GetStep()], msg.Round) } // Errors const ( nilBLockError = "block is nil" + blockExistsError = "block exists but should be nil" nilBLockProposalError = "block should never be nil when creating a proposal message" nilBLockVoteError 
= "block should never be nil when creating a vote message for a proposal" proposalNotValidInPrepareError = "proposal is not valid in the PREPARE step" @@ -122,7 +124,8 @@ const ( nilBlockInQCError = "QC must contain a non nil block" nilThresholdSigInQCError = "QC must contains a non nil threshold signature" notEnoughSignaturesError = "did not receive enough partial signature" - nodeIsLockedOnPastQCError = "node is locked on a QC from the past" + nodeIsLockedOnPastHeightQCError = "node is locked on a QC from a past height" + nodeIsLockedOnPastRoundQCError = "node is locked on a QC from a past round" unhandledProposalCaseError = "unhandled proposal validation check" unnecessaryPartialSigForNewRoundError = "newRound messages do not need a partial signature" unnecessaryPartialSigForLeaderProposalError = "leader proposals do not need a partial signature" @@ -142,7 +145,6 @@ const ( prepareBlockError = "could not prepare block" commitBlockError = "could not commit block" replicaPrepareBlockError = "node should not call `prepareBlock` if it is not a leader" - leaderErrApplyBlock = "node should not call `applyBlock` if it is leader" blockSizeTooLargeError = "block size is too large" sendMessageError = "error sending message" broadcastMessageError = "error broadcasting message" @@ -153,6 +155,7 @@ const ( var ( ErrNilBlock = errors.New(nilBLockError) + ErrBlockExists = errors.New(blockExistsError) ErrNilBlockProposal = errors.New(nilBLockProposalError) ErrNilBlockVote = errors.New(nilBLockVoteError) ErrProposalNotValidInPrepare = errors.New(proposalNotValidInPrepareError) @@ -161,7 +164,8 @@ var ( ErrNilBlockInQC = errors.New(nilBlockInQCError) ErrNilThresholdSigInQC = errors.New(nilThresholdSigInQCError) ErrNotEnoughSignatures = errors.New(notEnoughSignaturesError) - ErrNodeIsLockedOnPastQC = errors.New(nodeIsLockedOnPastQCError) + ErrNodeLockedPastHeight = errors.New(nodeIsLockedOnPastHeightQCError) + ErrNodeLockedPastRound = errors.New(nodeIsLockedOnPastRoundQCError) 
ErrUnhandledProposalCase = errors.New(unhandledProposalCaseError) ErrUnnecessaryPartialSigForNewRound = errors.New(unnecessaryPartialSigForNewRoundError) ErrUnnecessaryPartialSigForLeaderProposal = errors.New(unnecessaryPartialSigForLeaderProposalError) @@ -177,7 +181,6 @@ var ( ErrPrepareBlock = errors.New(prepareBlockError) ErrCommitBlock = errors.New(commitBlockError) ErrReplicaPrepareBlock = errors.New(replicaPrepareBlockError) - ErrLeaderApplyBLock = errors.New(leaderErrApplyBlock) ErrSendMessage = errors.New(sendMessageError) ErrBroadcastMessage = errors.New(broadcastMessageError) ErrCreateConsensusMessage = errors.New(createConsensusMessageError) @@ -203,7 +206,7 @@ func ErrMissingValidator(address string, nodeId NodeId) error { func ErrValidatingPartialSig(senderAddr string, senderNodeId NodeId, msg *HotstuffMessage, pubKey string) error { return fmt.Errorf("%s: Sender: %s (%d); Height: %d; Step: %s; Round: %d; SigHash: %s; BlockHash: %s; PubKey: %s", - invalidPartialSignatureError, senderAddr, senderNodeId, msg.Height, StepToString[msg.Step], msg.Round, string(msg.GetPartialSignature().Signature), protoHash(msg.Block), pubKey) + invalidPartialSignatureError, senderAddr, senderNodeId, msg.Height, StepToString[msg.GetStep()], msg.Round, string(msg.GetPartialSignature().Signature), protoHash(msg.Block), pubKey) } func ErrPacemakerUnexpectedMessageHeight(err error, heightCurrent, heightMessage uint64) error { @@ -211,7 +214,7 @@ func ErrPacemakerUnexpectedMessageHeight(err error, heightCurrent, heightMessage } func ErrPacemakerUnexpectedMessageStepRound(err error, step HotstuffStep, round uint64, msg *HotstuffMessage) error { - return fmt.Errorf("%s: Current (step, round): (%s, %d); Message (step, round): (%s, %d)", err, StepToString[step], round, StepToString[msg.Step], msg.Round) + return fmt.Errorf("%s: Current (step, round): (%s, %d); Message (step, round): (%s, %d)", err, StepToString[step], round, StepToString[msg.GetStep()], msg.Round) } func 
ErrUnknownConsensusMessageType(msg interface{}) error { @@ -235,7 +238,7 @@ func ErrLeaderElection(msg *HotstuffMessage) error { } func protoHash(m proto.Message) string { - b, err := proto.Marshal(m) + b, err := codec.GetCodec().Marshal(m) if err != nil { log.Fatalf("Could not marshal proto message: %v", err) } diff --git a/consensus/types/proto/block.proto b/consensus/types/proto/block.proto index 6024bcea4..b18d8dfa7 100644 --- a/consensus/types/proto/block.proto +++ b/consensus/types/proto/block.proto @@ -5,7 +5,7 @@ option go_package = "github.com/pokt-network/pocket/consensus/types"; import "google/protobuf/timestamp.proto"; -// TODO (Team) Discuss all tendermint legacy +// TECHDEBT: Re-evaluate some tendermint legacy fields message BlockHeader { int64 height = 1; string hash = 2; diff --git a/consensus/types/proto/hotstuff_types.proto b/consensus/types/proto/hotstuff_types.proto index b1f7c9b4d..c289694c8 100644 --- a/consensus/types/proto/hotstuff_types.proto +++ b/consensus/types/proto/hotstuff_types.proto @@ -18,7 +18,7 @@ enum HotstuffStep { enum HotstuffMessageType { HOTSTUFF_MESSAGE_UNKNOWN = 0; - HOTSTUFF_MESAGE_PROPOSE = 1; + HOTSTUFF_MESSAGE_PROPOSE = 1; HOTSTUFF_MESSAGE_VOTE = 2; } diff --git a/consensus/types/types.go b/consensus/types/types.go index 36896cfaf..86b9a0624 100644 --- a/consensus/types/types.go +++ b/consensus/types/types.go @@ -1,8 +1,10 @@ package types +// TODO: Split this file into multiple types files. 
import ( - "github.com/pokt-network/pocket/shared/modules" "sort" + + "github.com/pokt-network/pocket/shared/modules" ) type NodeId uint64 diff --git a/shared/codec/codec.go b/shared/codec/codec.go index ac7765673..a41fe2e0b 100644 --- a/shared/codec/codec.go +++ b/shared/codec/codec.go @@ -5,6 +5,8 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +// TODO: Use generics in place of `proto.Message` in the interface below +// so every caller does not need to do in place casting. type Codec interface { // TODO (Team) move to shared. Possibly rename Marshal(proto.Message) ([]byte, error) Unmarshal([]byte, proto.Message) error @@ -14,6 +16,8 @@ type Codec interface { // TODO (Team) move to shared. Possibly rename var _ Codec = &ProtoCodec{} +// TODO: Need to define a type like `type ProtoAny anypb.Any` so that we are not +// mixing protobufs and a centralized codec structure throughout the codebase. type ProtoCodec struct{} func (p *ProtoCodec) Marshal(message proto.Message) ([]byte, error) { diff --git a/shared/docs/flows/AppHash.md b/shared/docs/flows/AppHash.md new file mode 100644 index 000000000..c17f3cd41 --- /dev/null +++ b/shared/docs/flows/AppHash.md @@ -0,0 +1,40 @@ +# AppHash + +## Context Initialization + +```mermaid +sequenceDiagram + %% autonumber + participant N as Node + participant C as Consensus + participant U as Utility + participant P as Persistence + participant PP as Persistence (PostgresDB) + participant PM as Persistence (MerkleTree) + participant P2P as P2P + + %% Should this be P2P? 
+ N-->>C: HandleMessage(anypb.Any) critical NewRound Message + C->>+U: NewContext(height) + U->>P: NewRWContext(height) + P->>U: PersistenceRWContext + U->>U: Store persistenceContext + U->>-C: UtilityContext + C->>C: Store utilityContext + Note over C, PM: See 'Block Application' + end + Note over N, P2P: Hotstuff lifecycle + N-->>C: HandleMessage(anypb.Any) + critical Decide Message + Note over C, PM: See 'Block Commit' + end +``` + +## Block Application + +TODO(olshansky): Add a sequenceDiagram here. + +## Block Commit + +TODO(olshansky): Add a sequenceDiagram here. diff --git a/shared/docs/flows/README.md b/shared/docs/flows/README.md new file mode 100644 index 000000000..cb4e5d663 --- /dev/null +++ b/shared/docs/flows/README.md @@ -0,0 +1,3 @@ +# Flows + +The purpose of [shared/flows](./) is to document cross-module communication for end-to-end behaviour that has cross-module dependencies and depends on the interfaces exposed in [shared/modules](../modules). diff --git a/shared/modules/consensus_module.go b/shared/modules/consensus_module.go index c6c6213ed..62a87c368 100644 --- a/shared/modules/consensus_module.go +++ b/shared/modules/consensus_module.go @@ -7,7 +7,8 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) -type ValidatorMap map[string]Actor // TODO (Drewsky) deprecate Validator map or populate from persistence module +// TODO(olshansky): deprecate ValidatorMap or populate from persistence module +type ValidatorMap map[string]Actor // NOTE: Consensus is the core of the replicated state machine and is driven by various asynchronous events. // Consider adding a mutex lock to your implementation that is acquired at the beginning of each entrypoint/function implemented in this interface. 
@@ -15,11 +16,11 @@ type ValidatorMap map[string]Actor // TODO (Drewsky) deprecate Validator map or type ConsensusModule interface { Module - // Consensus Engine + // Consensus Engine Handlers HandleMessage(*anypb.Any) error HandleDebugMessage(*debug.DebugMessage) error - // Consensus State + // Consensus State Accessors CurrentHeight() uint64 AppHash() string // DISCUSS: Why not call this a BlockHash or StateHash? Should it be a []byte or string? ValidatorMap() ValidatorMap // TODO: This needs to be dynamically updated during various operations and network changes. diff --git a/shared/test_artifacts/generator.go b/shared/test_artifacts/generator.go index 6d6923d32..a48fbc7a3 100644 --- a/shared/test_artifacts/generator.go +++ b/shared/test_artifacts/generator.go @@ -2,11 +2,12 @@ package test_artifacts import ( "fmt" + "math/big" + "strconv" + typesPersistence "github.com/pokt-network/pocket/persistence/types" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/utility/types" - "math/big" - "strconv" "github.com/pokt-network/pocket/shared/crypto" "google.golang.org/protobuf/types/known/timestamppb"