Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions common/chainparametersnotifier/chainParametersNotifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package chainparametersnotifier

import (
"sync"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/config"
logger "github.com/multiversx/mx-chain-logger-go"
)

var log = logger.GetOrCreate("common/chainparameters")

type chainParametersNotifier struct {
mutData sync.RWMutex
wasInitialized bool
currentChainParameters config.ChainParametersByEpochConfig
mutHandler sync.RWMutex
handlers []common.ChainParametersSubscriptionHandler
}

// NewChainParametersNotifier creates a new instance of a chainParametersNotifier component
func NewChainParametersNotifier() *chainParametersNotifier {
return &chainParametersNotifier{
wasInitialized: false,
handlers: make([]common.ChainParametersSubscriptionHandler, 0),
}
}

// UpdateCurrentChainParameters should be called whenever new chain parameters become active on the network
func (cpn *chainParametersNotifier) UpdateCurrentChainParameters(params config.ChainParametersByEpochConfig) {
cpn.mutData.Lock()
shouldSkipParams := cpn.wasInitialized && cpn.currentChainParameters.EnableEpoch == params.EnableEpoch
if shouldSkipParams {
cpn.mutData.Unlock()

return
}
cpn.wasInitialized = true
cpn.currentChainParameters = params
cpn.mutData.Unlock()

cpn.mutHandler.RLock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L32-L46 ensures that the handlers are called only once for each epoch change regardless of the number of UpdateCurrentChainParameters calls. However, if the calls for different epochs are called in extremely fast manner in parallel, some calls on the handlers can be made in a different order. I do not think this will be an issue because the processing is done in sync.

handlersCopy := make([]common.ChainParametersSubscriptionHandler, len(cpn.handlers))
copy(handlersCopy, cpn.handlers)
cpn.mutHandler.RUnlock()

log.Debug("chainParametersNotifier.UpdateCurrentChainParameters",
"enable epoch", params.EnableEpoch,
"shard consensus group size", params.ShardConsensusGroupSize,
"shard min number of nodes", params.ShardMinNumNodes,
"meta consensus group size", params.MetachainConsensusGroupSize,
"meta min number of nodes", params.MetachainMinNumNodes,
"round duration", params.RoundDuration,
"hysteresis", params.Hysteresis,
"adaptivity", params.Adaptivity,
)

for _, handler := range handlersCopy {
handler.ChainParametersChanged(params)
}
}

// RegisterNotifyHandler will register the provided handler to be called whenever chain parameters have changed
func (cpn *chainParametersNotifier) RegisterNotifyHandler(handler common.ChainParametersSubscriptionHandler) {
if check.IfNil(handler) {
return
}

cpn.mutHandler.Lock()
cpn.handlers = append(cpn.handlers, handler)
cpn.mutHandler.Unlock()

cpn.mutData.RLock()
handler.ChainParametersChanged(cpn.currentChainParameters)
cpn.mutData.RUnlock()
}

// CurrentChainParameters returns the current chain parameters
func (cpn *chainParametersNotifier) CurrentChainParameters() config.ChainParametersByEpochConfig {
cpn.mutData.RLock()
defer cpn.mutData.RUnlock()

return cpn.currentChainParameters
}

// UnRegisterAll removes all registered handlers queue
func (cpn *chainParametersNotifier) UnRegisterAll() {
cpn.mutHandler.Lock()
cpn.handlers = make([]common.ChainParametersSubscriptionHandler, 0)
cpn.mutHandler.Unlock()
}

// IsInterfaceNil returns true if there is no value under the interface
func (cpn *chainParametersNotifier) IsInterfaceNil() bool {
return cpn == nil
}
126 changes: 126 additions & 0 deletions common/chainparametersnotifier/chainParametersNotifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package chainparametersnotifier

import (
"sync"
"testing"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/config"
"github.com/stretchr/testify/require"
)

func TestNewChainParametersNotifier(t *testing.T) {
t.Parallel()

notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))
}

func TestChainParametersNotifier_UpdateCurrentChainParameters(t *testing.T) {
t.Parallel()

notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))

chainParams := config.ChainParametersByEpochConfig{
EnableEpoch: 7,
Adaptivity: true,
Hysteresis: 0.7,
}
notifier.UpdateCurrentChainParameters(chainParams)

resultedChainParams := notifier.CurrentChainParameters()
require.NotNil(t, resultedChainParams)

// update with same epoch but other params - should not change (impossible scenario in production, but easier for tests)
chainParams.Hysteresis = 0.8
notifier.UpdateCurrentChainParameters(chainParams)
require.Equal(t, float32(0.7), notifier.CurrentChainParameters().Hysteresis)

chainParams.Hysteresis = 0.8
chainParams.EnableEpoch = 8
notifier.UpdateCurrentChainParameters(chainParams)
require.Equal(t, float32(0.8), notifier.CurrentChainParameters().Hysteresis)
}

func TestChainParametersNotifier_RegisterNotifyHandler(t *testing.T) {
t.Parallel()

notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))

// register a nil handler - should not panic
notifier.RegisterNotifyHandler(nil)

testNotifee := &dummyNotifee{}
notifier.RegisterNotifyHandler(testNotifee)

chainParams := config.ChainParametersByEpochConfig{
ShardMinNumNodes: 37,
}
notifier.UpdateCurrentChainParameters(chainParams)

require.Equal(t, chainParams, testNotifee.receivedChainParameters)
}

func TestChainParametersNotifier_UnRegisterAll(t *testing.T) {
t.Parallel()

notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))

testNotifee := &dummyNotifee{}
notifier.RegisterNotifyHandler(testNotifee)
notifier.UnRegisterAll()

chainParams := config.ChainParametersByEpochConfig{
ShardMinNumNodes: 37,
}
notifier.UpdateCurrentChainParameters(chainParams)

require.Empty(t, testNotifee.receivedChainParameters)
}

func TestChainParametersNotifier_ConcurrentOperations(t *testing.T) {
t.Parallel()

notifier := NewChainParametersNotifier()

numOperations := 500
wg := sync.WaitGroup{}
wg.Add(numOperations)
for i := 0; i < numOperations; i++ {
go func(idx int) {
switch idx {
case 0:
notifier.RegisterNotifyHandler(&dummyNotifee{})
case 1:
_ = notifier.CurrentChainParameters()
case 2:
notifier.UpdateCurrentChainParameters(config.ChainParametersByEpochConfig{})
case 3:
notifier.UnRegisterAll()
case 4:
_ = notifier.IsInterfaceNil()
}

wg.Done()
}(i % 5)
}

wg.Wait()
}

type dummyNotifee struct {
receivedChainParameters config.ChainParametersByEpochConfig
}

// ChainParametersChanged -
func (dn *dummyNotifee) ChainParametersChanged(chainParameters config.ChainParametersByEpochConfig) {
dn.receivedChainParameters = chainParameters
}

// IsInterfaceNil -
func (dn *dummyNotifee) IsInterfaceNil() bool {
return dn == nil
}
6 changes: 1 addition & 5 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,17 +719,13 @@ const PutInStorerMaxTime = time.Second
const DefaultUnstakedEpoch = math.MaxUint32

// InvalidMessageBlacklistDuration represents the time to keep a peer in the black list if it sends a message that
// does not follow the protocol: example not useing the same marshaler as the other peers
// does not follow the protocol: example not using the same marshaler as the other peers
const InvalidMessageBlacklistDuration = time.Second * 3600

// PublicKeyBlacklistDuration represents the time to keep a public key in the black list if it will degrade its
// rating to a minimum threshold due to improper messages
const PublicKeyBlacklistDuration = time.Second * 7200

// WrongP2PMessageBlacklistDuration represents the time to keep a peer id in the blacklist if it sends a message that
// do not follow this protocol
const WrongP2PMessageBlacklistDuration = time.Second * 7200

// InvalidSigningBlacklistDuration defines the time to keep a peer id in blacklist if it signs a message with invalid signature
const InvalidSigningBlacklistDuration = time.Second * 7200

Expand Down
32 changes: 32 additions & 0 deletions common/forking/genericEpochNotifier_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package forking

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-go/common/mock"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -152,3 +154,33 @@ func TestGenericEpochNotifier_CheckEpochInSyncShouldWork(t *testing.T) {
assert.Equal(t, uint32(2), atomic.LoadUint32(&numCalls))
assert.True(t, end.Sub(start) >= handlerWait)
}

func TestGenericEpochNotifier_ConcurrentOperations(t *testing.T) {
t.Parallel()

notifier := NewGenericEpochNotifier()

numOperations := 500
wg := sync.WaitGroup{}
wg.Add(numOperations)
for i := 0; i < numOperations; i++ {
go func(idx int) {
switch idx {
case 0:
notifier.RegisterNotifyHandler(&mock.EpochSubscriberHandlerStub{})
case 1:
_ = notifier.CurrentEpoch()
case 2:
notifier.CheckEpoch(&block.MetaBlock{Epoch: 5})
case 3:
notifier.UnRegisterAll()
case 4:
_ = notifier.IsInterfaceNil()
}

wg.Done()
}(i % 5)
}

wg.Wait()
}
7 changes: 7 additions & 0 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
crypto "github.com/multiversx/mx-chain-crypto-go"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/trie/statistics"
)

Expand Down Expand Up @@ -352,3 +353,9 @@ type ManagedPeersHolder interface {
IsMultiKeyMode() bool
IsInterfaceNil() bool
}

// ChainParametersSubscriptionHandler defines the behavior of a chain parameters subscription handler
type ChainParametersSubscriptionHandler interface {
ChainParametersChanged(chainParameters config.ChainParametersByEpochConfig)
IsInterfaceNil() bool
}
3 changes: 3 additions & 0 deletions consensus/spos/bls/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ func (sr *subroundStartRound) generateNextConsensusGroup(roundIndex int64) error

sr.SetConsensusGroup(nextConsensusGroup)

consensusGroupSizeForEpoch := sr.NodesCoordinator().ConsensusGroupSizeForShardAndEpoch(shardId, currentHeader.GetEpoch())
sr.SetConsensusGroupSize(consensusGroupSizeForEpoch)

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/spos/roundConsensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (rcns *roundConsensus) ConsensusGroupSize() int {
}

// SetConsensusGroupSize sets the consensus group size
func (rcns *roundConsensus) SetConsensusGroupSize(consensusGroudpSize int) {
rcns.consensusGroupSize = consensusGroudpSize
func (rcns *roundConsensus) SetConsensusGroupSize(consensusGroupSize int) {
rcns.consensusGroupSize = consensusGroupSize
}

// SelfPubKey returns selfPubKey ID
Expand Down
4 changes: 2 additions & 2 deletions epochStart/bootstrap/disabled/disabledAntiFloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _
return nil
}

// ApplyConsensusSize does nothing
func (a *antiFloodHandler) ApplyConsensusSize(_ int) {
// SetConsensusSizeNotifier does nothing
func (a *antiFloodHandler) SetConsensusSizeNotifier(_ process.ChainParametersSubscriber, _ uint32) {
}

// SetDebugger returns nil
Expand Down
Loading