Skip to content

Commit ab675e9

Browse files
Merge pull request #4927 from multiversx/chain-parameters-notifier
chain parameters notifier
2 parents df0b4fd + 961bb62 commit ab675e9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+813
-246
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package chainparametersnotifier
2+
3+
import (
4+
"sync"
5+
6+
"github.com/multiversx/mx-chain-core-go/core/check"
7+
"github.com/multiversx/mx-chain-go/common"
8+
"github.com/multiversx/mx-chain-go/config"
9+
logger "github.com/multiversx/mx-chain-logger-go"
10+
)
11+
12+
var log = logger.GetOrCreate("common/chainparameters")
13+
14+
type chainParametersNotifier struct {
15+
mutData sync.RWMutex
16+
wasInitialized bool
17+
currentChainParameters config.ChainParametersByEpochConfig
18+
mutHandler sync.RWMutex
19+
handlers []common.ChainParametersSubscriptionHandler
20+
}
21+
22+
// NewChainParametersNotifier creates a new instance of a chainParametersNotifier component
23+
func NewChainParametersNotifier() *chainParametersNotifier {
24+
return &chainParametersNotifier{
25+
wasInitialized: false,
26+
handlers: make([]common.ChainParametersSubscriptionHandler, 0),
27+
}
28+
}
29+
30+
// UpdateCurrentChainParameters should be called whenever new chain parameters become active on the network
31+
func (cpn *chainParametersNotifier) UpdateCurrentChainParameters(params config.ChainParametersByEpochConfig) {
32+
cpn.mutData.Lock()
33+
shouldSkipParams := cpn.wasInitialized && cpn.currentChainParameters.EnableEpoch == params.EnableEpoch
34+
if shouldSkipParams {
35+
cpn.mutData.Unlock()
36+
37+
return
38+
}
39+
cpn.wasInitialized = true
40+
cpn.currentChainParameters = params
41+
cpn.mutData.Unlock()
42+
43+
cpn.mutHandler.RLock()
44+
handlersCopy := make([]common.ChainParametersSubscriptionHandler, len(cpn.handlers))
45+
copy(handlersCopy, cpn.handlers)
46+
cpn.mutHandler.RUnlock()
47+
48+
log.Debug("chainParametersNotifier.UpdateCurrentChainParameters",
49+
"enable epoch", params.EnableEpoch,
50+
"shard consensus group size", params.ShardConsensusGroupSize,
51+
"shard min number of nodes", params.ShardMinNumNodes,
52+
"meta consensus group size", params.MetachainConsensusGroupSize,
53+
"meta min number of nodes", params.MetachainMinNumNodes,
54+
"round duration", params.RoundDuration,
55+
"hysteresis", params.Hysteresis,
56+
"adaptivity", params.Adaptivity,
57+
)
58+
59+
for _, handler := range handlersCopy {
60+
handler.ChainParametersChanged(params)
61+
}
62+
}
63+
64+
// RegisterNotifyHandler will register the provided handler to be called whenever chain parameters have changed
65+
func (cpn *chainParametersNotifier) RegisterNotifyHandler(handler common.ChainParametersSubscriptionHandler) {
66+
if check.IfNil(handler) {
67+
return
68+
}
69+
70+
cpn.mutHandler.Lock()
71+
cpn.handlers = append(cpn.handlers, handler)
72+
cpn.mutHandler.Unlock()
73+
74+
cpn.mutData.RLock()
75+
handler.ChainParametersChanged(cpn.currentChainParameters)
76+
cpn.mutData.RUnlock()
77+
}
78+
79+
// CurrentChainParameters returns the current chain parameters
80+
func (cpn *chainParametersNotifier) CurrentChainParameters() config.ChainParametersByEpochConfig {
81+
cpn.mutData.RLock()
82+
defer cpn.mutData.RUnlock()
83+
84+
return cpn.currentChainParameters
85+
}
86+
87+
// UnRegisterAll removes all registered handlers queue
88+
func (cpn *chainParametersNotifier) UnRegisterAll() {
89+
cpn.mutHandler.Lock()
90+
cpn.handlers = make([]common.ChainParametersSubscriptionHandler, 0)
91+
cpn.mutHandler.Unlock()
92+
}
93+
94+
// IsInterfaceNil returns true if there is no value under the interface
95+
func (cpn *chainParametersNotifier) IsInterfaceNil() bool {
96+
return cpn == nil
97+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package chainparametersnotifier
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/multiversx/mx-chain-core-go/core/check"
8+
"github.com/multiversx/mx-chain-go/config"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestNewChainParametersNotifier(t *testing.T) {
13+
t.Parallel()
14+
15+
notifier := NewChainParametersNotifier()
16+
require.False(t, check.IfNil(notifier))
17+
}
18+
19+
func TestChainParametersNotifier_UpdateCurrentChainParameters(t *testing.T) {
20+
t.Parallel()
21+
22+
notifier := NewChainParametersNotifier()
23+
require.False(t, check.IfNil(notifier))
24+
25+
chainParams := config.ChainParametersByEpochConfig{
26+
EnableEpoch: 7,
27+
Adaptivity: true,
28+
Hysteresis: 0.7,
29+
}
30+
notifier.UpdateCurrentChainParameters(chainParams)
31+
32+
resultedChainParams := notifier.CurrentChainParameters()
33+
require.NotNil(t, resultedChainParams)
34+
35+
// update with same epoch but other params - should not change (impossible scenario in production, but easier for tests)
36+
chainParams.Hysteresis = 0.8
37+
notifier.UpdateCurrentChainParameters(chainParams)
38+
require.Equal(t, float32(0.7), notifier.CurrentChainParameters().Hysteresis)
39+
40+
chainParams.Hysteresis = 0.8
41+
chainParams.EnableEpoch = 8
42+
notifier.UpdateCurrentChainParameters(chainParams)
43+
require.Equal(t, float32(0.8), notifier.CurrentChainParameters().Hysteresis)
44+
}
45+
46+
func TestChainParametersNotifier_RegisterNotifyHandler(t *testing.T) {
47+
t.Parallel()
48+
49+
notifier := NewChainParametersNotifier()
50+
require.False(t, check.IfNil(notifier))
51+
52+
// register a nil handler - should not panic
53+
notifier.RegisterNotifyHandler(nil)
54+
55+
testNotifee := &dummyNotifee{}
56+
notifier.RegisterNotifyHandler(testNotifee)
57+
58+
chainParams := config.ChainParametersByEpochConfig{
59+
ShardMinNumNodes: 37,
60+
}
61+
notifier.UpdateCurrentChainParameters(chainParams)
62+
63+
require.Equal(t, chainParams, testNotifee.receivedChainParameters)
64+
}
65+
66+
func TestChainParametersNotifier_UnRegisterAll(t *testing.T) {
67+
t.Parallel()
68+
69+
notifier := NewChainParametersNotifier()
70+
require.False(t, check.IfNil(notifier))
71+
72+
testNotifee := &dummyNotifee{}
73+
notifier.RegisterNotifyHandler(testNotifee)
74+
notifier.UnRegisterAll()
75+
76+
chainParams := config.ChainParametersByEpochConfig{
77+
ShardMinNumNodes: 37,
78+
}
79+
notifier.UpdateCurrentChainParameters(chainParams)
80+
81+
require.Empty(t, testNotifee.receivedChainParameters)
82+
}
83+
84+
func TestChainParametersNotifier_ConcurrentOperations(t *testing.T) {
85+
t.Parallel()
86+
87+
notifier := NewChainParametersNotifier()
88+
89+
numOperations := 500
90+
wg := sync.WaitGroup{}
91+
wg.Add(numOperations)
92+
for i := 0; i < numOperations; i++ {
93+
go func(idx int) {
94+
switch idx {
95+
case 0:
96+
notifier.RegisterNotifyHandler(&dummyNotifee{})
97+
case 1:
98+
_ = notifier.CurrentChainParameters()
99+
case 2:
100+
notifier.UpdateCurrentChainParameters(config.ChainParametersByEpochConfig{})
101+
case 3:
102+
notifier.UnRegisterAll()
103+
case 4:
104+
_ = notifier.IsInterfaceNil()
105+
}
106+
107+
wg.Done()
108+
}(i % 5)
109+
}
110+
111+
wg.Wait()
112+
}
113+
114+
type dummyNotifee struct {
115+
receivedChainParameters config.ChainParametersByEpochConfig
116+
}
117+
118+
// ChainParametersChanged -
119+
func (dn *dummyNotifee) ChainParametersChanged(chainParameters config.ChainParametersByEpochConfig) {
120+
dn.receivedChainParameters = chainParameters
121+
}
122+
123+
// IsInterfaceNil -
124+
func (dn *dummyNotifee) IsInterfaceNil() bool {
125+
return dn == nil
126+
}

common/constants.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -719,17 +719,13 @@ const PutInStorerMaxTime = time.Second
719719
const DefaultUnstakedEpoch = math.MaxUint32
720720

721721
// InvalidMessageBlacklistDuration represents the time to keep a peer in the black list if it sends a message that
722-
// does not follow the protocol: example not useing the same marshaler as the other peers
722+
// does not follow the protocol: example not using the same marshaler as the other peers
723723
const InvalidMessageBlacklistDuration = time.Second * 3600
724724

725725
// PublicKeyBlacklistDuration represents the time to keep a public key in the black list if it will degrade its
726726
// rating to a minimum threshold due to improper messages
727727
const PublicKeyBlacklistDuration = time.Second * 7200
728728

729-
// WrongP2PMessageBlacklistDuration represents the time to keep a peer id in the blacklist if it sends a message that
730-
// do not follow this protocol
731-
const WrongP2PMessageBlacklistDuration = time.Second * 7200
732-
733729
// InvalidSigningBlacklistDuration defines the time to keep a peer id in blacklist if it signs a message with invalid signature
734730
const InvalidSigningBlacklistDuration = time.Second * 7200
735731

common/forking/genericEpochNotifier_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package forking
22

33
import (
4+
"sync"
45
"sync/atomic"
56
"testing"
67
"time"
78

89
"github.com/multiversx/mx-chain-core-go/core/check"
10+
"github.com/multiversx/mx-chain-core-go/data/block"
911
"github.com/multiversx/mx-chain-go/common/mock"
1012
"github.com/multiversx/mx-chain-go/testscommon"
1113
"github.com/stretchr/testify/assert"
@@ -152,3 +154,33 @@ func TestGenericEpochNotifier_CheckEpochInSyncShouldWork(t *testing.T) {
152154
assert.Equal(t, uint32(2), atomic.LoadUint32(&numCalls))
153155
assert.True(t, end.Sub(start) >= handlerWait)
154156
}
157+
158+
func TestGenericEpochNotifier_ConcurrentOperations(t *testing.T) {
159+
t.Parallel()
160+
161+
notifier := NewGenericEpochNotifier()
162+
163+
numOperations := 500
164+
wg := sync.WaitGroup{}
165+
wg.Add(numOperations)
166+
for i := 0; i < numOperations; i++ {
167+
go func(idx int) {
168+
switch idx {
169+
case 0:
170+
notifier.RegisterNotifyHandler(&mock.EpochSubscriberHandlerStub{})
171+
case 1:
172+
_ = notifier.CurrentEpoch()
173+
case 2:
174+
notifier.CheckEpoch(&block.MetaBlock{Epoch: 5})
175+
case 3:
176+
notifier.UnRegisterAll()
177+
case 4:
178+
_ = notifier.IsInterfaceNil()
179+
}
180+
181+
wg.Done()
182+
}(i % 5)
183+
}
184+
185+
wg.Wait()
186+
}

common/interface.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/multiversx/mx-chain-core-go/data"
99
"github.com/multiversx/mx-chain-core-go/data/block"
1010
crypto "github.com/multiversx/mx-chain-crypto-go"
11+
"github.com/multiversx/mx-chain-go/config"
1112
"github.com/multiversx/mx-chain-go/trie/statistics"
1213
)
1314

@@ -352,3 +353,9 @@ type ManagedPeersHolder interface {
352353
IsMultiKeyMode() bool
353354
IsInterfaceNil() bool
354355
}
356+
357+
// ChainParametersSubscriptionHandler defines the behavior of a chain parameters subscription handler
358+
type ChainParametersSubscriptionHandler interface {
359+
ChainParametersChanged(chainParameters config.ChainParametersByEpochConfig)
360+
IsInterfaceNil() bool
361+
}

consensus/spos/bls/subroundStartRound.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,9 @@ func (sr *subroundStartRound) generateNextConsensusGroup(roundIndex int64) error
322322

323323
sr.SetConsensusGroup(nextConsensusGroup)
324324

325+
consensusGroupSizeForEpoch := sr.NodesCoordinator().ConsensusGroupSizeForShardAndEpoch(shardId, currentHeader.GetEpoch())
326+
sr.SetConsensusGroupSize(consensusGroupSizeForEpoch)
327+
325328
return nil
326329
}
327330

consensus/spos/roundConsensus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ func (rcns *roundConsensus) ConsensusGroupSize() int {
8888
}
8989

9090
// SetConsensusGroupSize sets the consensus group size
91-
func (rcns *roundConsensus) SetConsensusGroupSize(consensusGroudpSize int) {
92-
rcns.consensusGroupSize = consensusGroudpSize
91+
func (rcns *roundConsensus) SetConsensusGroupSize(consensusGroupSize int) {
92+
rcns.consensusGroupSize = consensusGroupSize
9393
}
9494

9595
// SelfPubKey returns selfPubKey ID

epochStart/bootstrap/disabled/disabledAntiFloodHandler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _
2929
return nil
3030
}
3131

32-
// ApplyConsensusSize does nothing
33-
func (a *antiFloodHandler) ApplyConsensusSize(_ int) {
32+
// SetConsensusSizeNotifier does nothing
33+
func (a *antiFloodHandler) SetConsensusSizeNotifier(_ process.ChainParametersSubscriber, _ uint32) {
3434
}
3535

3636
// SetDebugger returns nil

0 commit comments

Comments
 (0)