22 changes: 22 additions & 0 deletions common/interface.go
@@ -7,6 +7,7 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
crypto "github.com/multiversx/mx-chain-crypto-go"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/trie/statistics"
)
@@ -332,6 +333,27 @@ type EnableEpochsHandler interface {
IsInterfaceNil() bool
}

// ManagedPeersHolder defines the operations of an entity that holds managed identities for a node
type ManagedPeersHolder interface {
AddManagedPeer(privateKeyBytes []byte) error
GetPrivateKey(pkBytes []byte) (crypto.PrivateKey, error)
GetP2PIdentity(pkBytes []byte) ([]byte, core.PeerID, error)
GetMachineID(pkBytes []byte) (string, error)
GetNameAndIdentity(pkBytes []byte) (string, string, error)
IncrementRoundsWithoutReceivedMessages(pkBytes []byte)
ResetRoundsWithoutReceivedMessages(pkBytes []byte)
GetManagedKeysByCurrentNode() map[string]crypto.PrivateKey
IsKeyManagedByCurrentNode(pkBytes []byte) bool
IsKeyRegistered(pkBytes []byte) bool
IsPidManagedByCurrentNode(pid core.PeerID) bool
IsKeyValidator(pkBytes []byte) bool
SetValidatorState(pkBytes []byte, state bool)
GetNextPeerAuthenticationTime(pkBytes []byte) (time.Time, error)
SetNextPeerAuthenticationTime(pkBytes []byte, nextTime time.Time)
IsMultiKeyMode() bool
IsInterfaceNil() bool
}

// ChainParametersSubscriptionHandler defines the behavior of a chain parameters subscription handler
type ChainParametersSubscriptionHandler interface {
ChainParametersChanged(chainParameters config.ChainParametersByEpochConfig)
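For orientation, here is a minimal consumer sketch of the new ManagedPeersHolder interface (illustrative only, not part of this diff). The helper name and the assumption that the map returned by GetManagedKeysByCurrentNode is keyed by the raw public-key bytes stored as a string are mine, not the PR's:

package example

import (
	"fmt"

	"github.com/multiversx/mx-chain-go/common"
)

// printManagedValidators walks the keys managed by the current node and prints
// the ones currently flagged as validators, using only methods declared on the
// new interface. Illustrative sketch; error handling is intentionally minimal.
func printManagedValidators(holder common.ManagedPeersHolder) {
	for pkString := range holder.GetManagedKeysByCurrentNode() {
		pkBytes := []byte(pkString) // assumed: map keys are the public-key bytes as string
		if !holder.IsKeyValidator(pkBytes) {
			continue
		}
		name, identity, err := holder.GetNameAndIdentity(pkBytes)
		if err != nil {
			continue
		}
		fmt.Printf("managed validator key: name=%s identity=%s\n", name, identity)
	}
}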
3 changes: 3 additions & 0 deletions consensus/spos/bls/subroundStartRound.go
@@ -322,6 +322,9 @@ func (sr *subroundStartRound) generateNextConsensusGroup(roundIndex int64) error

sr.SetConsensusGroup(nextConsensusGroup)

consensusGroupSizeForEpoch := sr.NodesCoordinator().ConsensusGroupSizeForShardAndEpoch(shardId, currentHeader.GetEpoch())
sr.SetConsensusGroupSize(consensusGroupSizeForEpoch)

return nil
}

4 changes: 2 additions & 2 deletions consensus/spos/roundConsensus.go
@@ -88,8 +88,8 @@ func (rcns *roundConsensus) ConsensusGroupSize() int {
}

// SetConsensusGroupSize sets the consensus group size
func (rcns *roundConsensus) SetConsensusGroupSize(consensusGroudpSize int) {
rcns.consensusGroupSize = consensusGroudpSize
func (rcns *roundConsensus) SetConsensusGroupSize(consensusGroupSize int) {
rcns.consensusGroupSize = consensusGroupSize
}

// SelfPubKey returns selfPubKey ID
27 changes: 15 additions & 12 deletions factory/consensus/consensusComponents.go
@@ -23,6 +23,7 @@ import (
"github.com/multiversx/mx-chain-go/process/sync"
"github.com/multiversx/mx-chain-go/process/sync/storageBootstrap"
"github.com/multiversx/mx-chain-go/sharding"
nodesCoord "github.com/multiversx/mx-chain-go/sharding/nodesCoordinator"
"github.com/multiversx/mx-chain-go/state/syncer"
trieFactory "github.com/multiversx/mx-chain-go/trie/factory"
"github.com/multiversx/mx-chain-go/trie/statistics"
@@ -76,7 +77,6 @@ type consensusComponents struct {
worker factory.ConsensusWorker
peerBlacklistHandler consensus.PeerBlacklistHandler
consensusTopic string
consensusGroupSize int
}

// NewConsensusComponentsFactory creates an instance of consensusComponentsFactory
@@ -136,13 +136,6 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) {
}
cc := &consensusComponents{}

consensusGroupSize, err := getConsensusGroupSize(ccf.coreComponents.GenesisNodesSetup(), ccf.processComponents.ShardCoordinator())
[Contributor] 👍

if err != nil {
return nil, err
}

cc.consensusGroupSize = int(consensusGroupSize)

blockchain := ccf.dataComponents.Blockchain()
notInitializedGenesisBlock := len(blockchain.GetGenesisHeaderHash()) == 0 ||
check.IfNil(blockchain.GetGenesisHeader())
@@ -163,7 +156,12 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) {
cc.bootstrapper.StartSyncingBlocks()

epoch := ccf.getEpoch()
consensusState, err := ccf.createConsensusState(epoch, cc.consensusGroupSize)

consensusGroupSize, err := getConsensusGroupSize(ccf.coreComponents.GenesisNodesSetup(), ccf.processComponents.ShardCoordinator(), ccf.processComponents.NodesCoordinator(), epoch)
if err != nil {
return nil, err
}
consensusState, err := ccf.createConsensusState(epoch, consensusGroupSize)
if err != nil {
return nil, err
}
@@ -741,12 +739,17 @@ func (ccf *consensusComponentsFactory) checkArgs() error {
return nil
}

func getConsensusGroupSize(nodesConfig sharding.GenesisNodesSetupHandler, shardCoordinator sharding.Coordinator) (uint32, error) {
func getConsensusGroupSize(nodesConfig sharding.GenesisNodesSetupHandler, shardCoordinator sharding.Coordinator, nodesCoordinator nodesCoord.NodesCoordinator, epoch uint32) (int, error) {
consensusGroupSize := nodesCoordinator.ConsensusGroupSizeForShardAndEpoch(shardCoordinator.SelfId(), epoch)
if consensusGroupSize > 0 {
return consensusGroupSize, nil
}

if shardCoordinator.SelfId() == core.MetachainShardId {
return nodesConfig.GetMetaConsensusGroupSize(), nil
return int(nodesConfig.GetMetaConsensusGroupSize()), nil
}
if shardCoordinator.SelfId() < shardCoordinator.NumberOfShards() {
return nodesConfig.GetShardConsensusGroupSize(), nil
return int(nodesConfig.GetShardConsensusGroupSize()), nil
}

return 0, sharding.ErrShardIdOutOfRange
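The intent of the reworked getConsensusGroupSize is that the per-epoch value reported by the nodes coordinator takes precedence, with the genesis setup sizes used only as a fallback when the coordinator reports 0. A self-contained sketch of that resolution order follows (plain inputs stand in for the real interfaces; the function name is illustrative, not the PR's API):

package example

import (
	"errors"

	"github.com/multiversx/mx-chain-core-go/core"
)

// resolveConsensusGroupSize mirrors, in simplified form, the lookup order used
// by getConsensusGroupSize after this change: coordinatorSize stands in for
// NodesCoordinator.ConsensusGroupSizeForShardAndEpoch and wins when positive;
// otherwise the genesis setup sizes are used, depending on the shard.
func resolveConsensusGroupSize(
	coordinatorSize int,
	selfShardID uint32,
	numShards uint32,
	metaGenesisSize uint32,
	shardGenesisSize uint32,
) (int, error) {
	if coordinatorSize > 0 {
		return coordinatorSize, nil
	}
	if selfShardID == core.MetachainShardId {
		return int(metaGenesisSize), nil
	}
	if selfShardID < numShards {
		return int(shardGenesisSize), nil
	}
	return 0, errors.New("shard id out of range")
}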
12 changes: 0 additions & 12 deletions factory/consensus/consensusComponentsHandler.go
@@ -101,18 +101,6 @@ func (mcc *managedConsensusComponents) BroadcastMessenger() consensus.BroadcastM
return mcc.consensusComponents.broadcastMessenger
}

// ConsensusGroupSize returns the consensus group size
func (mcc *managedConsensusComponents) ConsensusGroupSize() (int, error) {
mcc.mutConsensusComponents.RLock()
defer mcc.mutConsensusComponents.RUnlock()

if mcc.consensusComponents == nil {
return 0, errors.ErrNilConsensusComponentsHolder
}

return mcc.consensusComponents.consensusGroupSize, nil
}

// CheckSubcomponents verifies all subcomponents
func (mcc *managedConsensusComponents) CheckSubcomponents() error {
mcc.mutConsensusComponents.RLock()
1 change: 0 additions & 1 deletion factory/interface.go
@@ -411,7 +411,6 @@ type ConsensusComponentsHolder interface {
Chronology() consensus.ChronologyHandler
ConsensusWorker() ConsensusWorker
BroadcastMessenger() consensus.BroadcastMessenger
ConsensusGroupSize() (int, error)
Bootstrapper() process.Bootstrapper
IsInterfaceNil() bool
}
1 change: 0 additions & 1 deletion integrationTests/testConsensusNode.go
@@ -341,7 +341,6 @@ func (tcn *TestConsensusNode) initNode(args ArgsTestConsensusNode) {
node.WithStateComponents(stateComponents),
node.WithNetworkComponents(networkComponents),
node.WithRoundDuration(args.RoundTime),
node.WithConsensusGroupSize(args.ConsensusSize),
node.WithConsensusType(args.ConsensusType),
node.WithGenesisTime(time.Unix(args.StartTime, 0)),
node.WithValidatorSignatureSize(signatureSize),
6 changes: 0 additions & 6 deletions node/node.go
@@ -62,7 +62,6 @@ type filter interface {
type Node struct {
initialNodesPubkeys map[uint32][]string
roundDuration uint64
consensusGroupSize int
[Contributor] Minor conflicts here when we bring rc/v1.5.0 into rc/v1.6.0, because I've added unit tests for the getter & setter of the consensus group size.

[Contributor Author] I'll happily delete those unit tests when merging 😄

genesisTime time.Time
peerDenialEvaluator p2p.PeerDenialEvaluator
esdtStorageHandler vmcommon.ESDTNFTStorageHandler
@@ -148,11 +147,6 @@ func (n *Node) CreateShardedStores() error {
return nil
}

// GetConsensusGroupSize returns the configured consensus size
func (n *Node) GetConsensusGroupSize() int {
return n.consensusGroupSize
}

// GetBalance gets the balance for a specific address
func (n *Node) GetBalance(address string, options api.AccountQueryOptions) (*big.Int, api.BlockInfo, error) {
userAccount, blockInfo, err := n.loadUserAccountHandlerByAddress(address, options)
6 changes: 0 additions & 6 deletions node/nodeHelper.go
@@ -66,11 +66,6 @@ func CreateNode(

genesisTime := time.Unix(coreComponents.GenesisNodesSetup().GetStartTime(), 0)

consensusGroupSize, err := consensusComponents.ConsensusGroupSize()
if err != nil {
return nil, err
}

var nd *Node
nd, err = NewNode(
WithStatusCoreComponents(statusCoreComponents),
@@ -86,7 +81,6 @@
WithNetworkComponents(networkComponents),
WithInitialNodesPubKeys(coreComponents.GenesisNodesSetup().InitialNodesPubKeys()),
WithRoundDuration(coreComponents.GenesisNodesSetup().GetRoundDuration()),
WithConsensusGroupSize(consensusGroupSize),
WithGenesisTime(genesisTime),
WithConsensusType(config.Consensus.Type),
WithBootstrapRoundIndex(bootstrapRoundIndex),
12 changes: 0 additions & 12 deletions node/options.go
@@ -210,18 +210,6 @@ func WithRoundDuration(roundDuration uint64) Option {
}
}

// WithConsensusGroupSize sets up the consensus group size option for the Node
func WithConsensusGroupSize(consensusGroupSize int) Option {
return func(n *Node) error {
if consensusGroupSize < 1 {
return ErrNegativeOrZeroConsensusGroupSize
}
log.Info("consensus group", "size", consensusGroupSize)
n.consensusGroupSize = consensusGroupSize
return nil
}
}

// WithGenesisTime sets up the genesis time option for the Node
func WithGenesisTime(genesisTime time.Time) Option {
return func(n *Node) error {
26 changes: 0 additions & 26 deletions node/options_test.go
@@ -71,32 +71,6 @@ func TestWithRoundDuration_ShouldWork(t *testing.T) {
assert.Nil(t, err)
}

func TestWithConsensusGroupSize_NegativeGroupSizeShouldErr(t *testing.T) {
t.Parallel()

node, _ := NewNode()

opt := WithConsensusGroupSize(-1)
err := opt(node)

assert.Equal(t, 0, node.consensusGroupSize)
assert.Equal(t, ErrNegativeOrZeroConsensusGroupSize, err)
}

func TestWithConsensusGroupSize_ShouldWork(t *testing.T) {
t.Parallel()

node, _ := NewNode()

groupSize := 567

opt := WithConsensusGroupSize(groupSize)
err := opt(node)

assert.True(t, node.consensusGroupSize == groupSize)
assert.Nil(t, err)
}

func TestWithGenesisTime(t *testing.T) {
t.Parallel()

3 changes: 2 additions & 1 deletion process/throttle/antiflood/p2pAntiflood_test.go
@@ -17,6 +17,7 @@ import (
"github.com/multiversx/mx-chain-go/process/throttle/antiflood"
"github.com/multiversx/mx-chain-go/process/throttle/antiflood/disabled"
"github.com/multiversx/mx-chain-go/testscommon/commonmocks"
"github.com/multiversx/mx-chain-go/testscommon/p2pmocks"
"github.com/stretchr/testify/assert"
)

@@ -496,7 +497,7 @@ func TestP2pAntiflood_ConcurrentOperations(t *testing.T) {
case 2:
_ = afm.Close()
case 3:
_ = afm.CanProcessMessage(&mock.P2PMessageMock{}, "peer")
_ = afm.CanProcessMessage(&p2pmocks.P2PMessageMock{}, "peer")
case 4:
afm.BlacklistPeer("peer", "reason", time.Millisecond)
case 5:
94 changes: 94 additions & 0 deletions sharding/nodesCoordinator/indexHashedNodesCoordinator_test.go
@@ -2519,3 +2519,97 @@ func TestIndexHashedNodesCoordinator_GetShardValidatorInfoData(t *testing.T) {
require.Equal(t, svi, shardValidatorInfo)
})
}

func TestNodesCoordinator_CustomConsensusGroupSize(t *testing.T) {
arguments := createArguments()
eligibleMap := createDummyNodesMap(3, 2, "eligible")
waitingMap := createDummyNodesMap(0, 2, "waiting")
arguments.EligibleNodes = eligibleMap
arguments.WaitingNodes = waitingMap
arguments.ValidatorInfoCacher = dataPool.NewCurrentEpochValidatorInfoPool()
arguments.ChainParametersHandler = &shardingmock.ChainParametersHandlerStub{
ChainParametersForEpochCalled: func(epoch uint32) (config.ChainParametersByEpochConfig, error) {
if epoch < 3 {
return config.ChainParametersByEpochConfig{
ShardConsensusGroupSize: 2,
ShardMinNumNodes: 2,
MetachainConsensusGroupSize: 3,
MetachainMinNumNodes: 3,
}, nil
}

if epoch < 6 {
return config.ChainParametersByEpochConfig{
ShardConsensusGroupSize: 3,
ShardMinNumNodes: 3,
MetachainConsensusGroupSize: 3,
MetachainMinNumNodes: 3,
}, nil
}

if epoch < 9 {
[Contributor] 👍 Nice approach to avoid calls like if (epoch < 9) && (epoch >= 6) {...

return config.ChainParametersByEpochConfig{
ShardConsensusGroupSize: 3,
ShardMinNumNodes: 3,
MetachainConsensusGroupSize: 2,
MetachainMinNumNodes: 2,
}, nil
}

return config.ChainParametersByEpochConfig{
ShardConsensusGroupSize: 3,
ShardMinNumNodes: 3,
MetachainConsensusGroupSize: 3,
MetachainMinNumNodes: 3,
}, nil
},
}

ihnc, _ := NewIndexHashedNodesCoordinator(arguments)
require.NotNil(t, ihnc)

header := &block.MetaBlock{
PrevRandSeed: []byte("rand seed"),
EpochStart: block.EpochStart{LastFinalizedHeaders: []block.EpochStartShardData{{}}},
}

	// change to epoch 1 - should have 2 eligible per shard, 3 per meta

[Contributor] I would also add tests for epochs 2, 3, 4, 6, 8, 9, 10, and 11 to test the correct change conditions.

[Contributor Author] Refactored the unit test; it now checks from epoch 0 to epoch 100.

epoch := uint32(1)
header.Epoch = epoch
ihnc.nodesConfig[epoch] = ihnc.nodesConfig[0]
body := createBlockBodyFromNodesCoordinator(ihnc, epoch, ihnc.validatorInfoCacher)
ihnc.EpochStartPrepare(header, body)
ihnc.EpochStartAction(header)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[0], 2)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[common.MetachainShardId], 3)

// change to epoch 5 - should have 3 eligible per shard, 3 per meta
epoch = 5
header.Epoch = epoch
ihnc.nodesConfig[epoch] = ihnc.nodesConfig[0]
body = createBlockBodyFromNodesCoordinator(ihnc, epoch, ihnc.validatorInfoCacher)
ihnc.EpochStartPrepare(header, body)
ihnc.EpochStartAction(header)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[0], 3)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[common.MetachainShardId], 3)

// change to epoch 7 - should have 3 eligible per shard, 2 per meta
epoch = 7
header.Epoch = epoch
ihnc.nodesConfig[epoch] = ihnc.nodesConfig[5]
body = createBlockBodyFromNodesCoordinator(ihnc, epoch, ihnc.validatorInfoCacher)
ihnc.EpochStartPrepare(header, body)
ihnc.EpochStartAction(header)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[0], 3)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[common.MetachainShardId], 2)

// change to epoch 12 - should have 3 eligible per shard, 3 per meta
epoch = 12
header.Epoch = epoch
ihnc.nodesConfig[epoch] = ihnc.nodesConfig[5]
body = createBlockBodyFromNodesCoordinator(ihnc, epoch, ihnc.validatorInfoCacher)
ihnc.EpochStartPrepare(header, body)
ihnc.EpochStartAction(header)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[0], 3)
require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[common.MetachainShardId], 3)
}
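Following the review exchange above about covering more epochs, a loop-based variant of the per-epoch checks could look like the sketch below (illustrative only, not the refactored test from the PR; the expected sizes follow the thresholds of the ChainParametersHandlerStub, and seeding every epoch from nodesConfig[0] is an assumption):

	// Sketch: collapse the per-epoch blocks into one loop over epochs 1..100.
	expectedSizes := func(epoch uint32) (int, int) { // returns (shard, meta) eligible sizes
		switch {
		case epoch < 3:
			return 2, 3
		case epoch < 6:
			return 3, 3
		case epoch < 9:
			return 3, 2
		default:
			return 3, 3
		}
	}

	for epoch := uint32(1); epoch <= 100; epoch++ {
		shardSize, metaSize := expectedSizes(epoch)
		header.Epoch = epoch
		ihnc.nodesConfig[epoch] = ihnc.nodesConfig[0] // assumed seeding, as done for epochs 1 and 5 above
		body := createBlockBodyFromNodesCoordinator(ihnc, epoch, ihnc.validatorInfoCacher)
		ihnc.EpochStartPrepare(header, body)
		ihnc.EpochStartAction(header)
		require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[0], shardSize)
		require.Len(t, ihnc.nodesConfig[epoch].eligibleMap[common.MetachainShardId], metaSize)
	}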