Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/chainparametersnotifier/chainParametersNotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type chainParametersNotifier struct {
handlers []common.ChainParametersSubscriptionHandler
}

// New creates a new instance of a chainParametersNotifier component
func New() *chainParametersNotifier {
// NewChainParametersNotifier creates a new instance of a chainParametersNotifier component
func NewChainParametersNotifier() *chainParametersNotifier {
return &chainParametersNotifier{
wasInitialized: false,
handlers: make([]common.ChainParametersSubscriptionHandler, 0),
Expand Down
10 changes: 5 additions & 5 deletions common/chainparametersnotifier/chainParametersNotifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
func TestNew(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestNewChainParametersNotifier ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

t.Parallel()

notifier := New()
notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))
}

func TestChainParametersNotifier_UpdateCurrentChainParameters(t *testing.T) {
t.Parallel()

notifier := New()
notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))

chainParams := config.ChainParametersByEpochConfig{
Expand All @@ -46,7 +46,7 @@ func TestChainParametersNotifier_UpdateCurrentChainParameters(t *testing.T) {
func TestChainParametersNotifier_RegisterNotifyHandler(t *testing.T) {
t.Parallel()

notifier := New()
notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))

// register a nil handler - should not panic
Expand All @@ -66,7 +66,7 @@ func TestChainParametersNotifier_RegisterNotifyHandler(t *testing.T) {
func TestChainParametersNotifier_UnRegisterAll(t *testing.T) {
t.Parallel()

notifier := New()
notifier := NewChainParametersNotifier()
require.False(t, check.IfNil(notifier))

testNotifee := &dummyNotifee{}
Expand All @@ -84,7 +84,7 @@ func TestChainParametersNotifier_UnRegisterAll(t *testing.T) {
func TestChainParametersNotifier_ConcurrentOperations(t *testing.T) {
t.Parallel()

notifier := New()
notifier := NewChainParametersNotifier()

numOperations := 500
wg := sync.WaitGroup{}
Expand Down
2 changes: 1 addition & 1 deletion factory/core/coreComponents.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (ccf *coreComponentsFactory) Create() (*coreComponents, error) {
epochNotifier := forking.NewGenericEpochNotifier()
epochStartHandlerWithConfirm := notifier.NewEpochStartSubscriptionHandler()

chainParametersNotifier := chainparametersnotifier.New()
chainParametersNotifier := chainparametersnotifier.NewChainParametersNotifier()
argsChainParametersHandler := sharding.ArgsChainParametersHolder{
EpochStartEventNotifier: epochStartHandlerWithConfirm,
ChainParameters: ccf.config.GeneralSettings.ChainParametersByEpoch,
Expand Down
6 changes: 5 additions & 1 deletion process/mock/floodPreventerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ type FloodPreventerStub struct {

// IncreaseLoad -
func (fps *FloodPreventerStub) IncreaseLoad(pid core.PeerID, size uint64) error {
return fps.IncreaseLoadCalled(pid, size)
if fps.IncreaseLoadCalled != nil {
return fps.IncreaseLoadCalled(pid, size)
}

return nil
}

// ApplyConsensusSize -
Expand Down
3 changes: 3 additions & 0 deletions process/throttle/antiflood/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@ package antiflood
import "github.com/multiversx/mx-chain-go/process"

func (af *p2pAntiflood) Debugger() process.AntifloodDebugger {
af.mutDebugger.RLock()
defer af.mutDebugger.RUnlock()

return af.debugger
}
19 changes: 13 additions & 6 deletions process/throttle/antiflood/p2pAntiflood.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type p2pAntiflood struct {
peerValidatorMapper process.PeerValidatorMapper
mapTopicsFromAll map[string]struct{}
mutTopicCheck sync.RWMutex
shardID core.OptionalUint32
shardID uint32
mutShardID sync.RWMutex
}

// NewP2PAntiflood creates a new p2p anti flood protection mechanism built on top of a flood preventer implementation.
Expand Down Expand Up @@ -61,18 +62,21 @@ func NewP2PAntiflood(

// SetConsensusSizeNotifier sets the consensus size notifier
func (af *p2pAntiflood) SetConsensusSizeNotifier(chainParametersNotifier process.ChainParametersSubscriber, shardID uint32) {
af.shardID = core.OptionalUint32{
HasValue: true,
Value: shardID,
}
af.mutShardID.Lock()
af.shardID = shardID
af.mutShardID.Unlock()

chainParametersNotifier.RegisterNotifyHandler(af)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be called on the constructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the shard ID is unknown when creating the antiflood components, I cannot make this change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}

// ChainParametersChanged will be called when new chain parameters are confirmed on the network
func (af *p2pAntiflood) ChainParametersChanged(chainParameters config.ChainParametersByEpochConfig) {
af.mutShardID.RLock()
shardID := af.shardID
af.mutShardID.RUnlock()

size := chainParameters.ShardConsensusGroupSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrency issues on the usage of the af.shardID ?
Also why do we need the set on L76?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a mutex. the setter is needed for computing the consensus size in case of shard vs meta

if af.shardID.HasValue && af.shardID.Value == core.MetachainShardId {
if shardID == core.MetachainShardId {
size = chainParameters.MetachainConsensusGroupSize
}

Expand Down Expand Up @@ -274,6 +278,9 @@ func (af *p2pAntiflood) BlacklistPeer(peer core.PeerID, reason string, duration

// Close will call the close function on all sub components
func (af *p2pAntiflood) Close() error {
af.mutDebugger.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rlock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

defer af.mutDebugger.Unlock()

return af.debugger.Close()
}

Expand Down
52 changes: 51 additions & 1 deletion process/throttle/antiflood/p2pAntiflood_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package antiflood_test

import (
"errors"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/process/throttle/antiflood"
"github.com/multiversx/mx-chain-go/process/throttle/antiflood/disabled"
"github.com/multiversx/mx-chain-go/testscommon/commonmocks"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -325,7 +327,7 @@ func TestP2pAntiflood_SetConsensusSizeNotifier(t *testing.T) {
},
)

chainParamsSubscriber := chainparametersnotifier.New()
chainParamsSubscriber := chainparametersnotifier.NewChainParametersNotifier()
afm.SetConsensusSizeNotifier(chainParamsSubscriber, 5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use a variable/constant instead of literal 5?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


chainParamsSubscriber.UpdateCurrentChainParameters(config.ChainParametersByEpochConfig{
Expand Down Expand Up @@ -473,3 +475,51 @@ func TestP2pAntiflood_IsOriginatorEligibleForTopic(t *testing.T) {
err = afm.IsOriginatorEligibleForTopic(core.PeerID(validatorPID), "topic")
assert.Nil(t, err)
}

func TestP2pAntiflood_ConcurrentOperations(t *testing.T) {
afm, _ := antiflood.NewP2PAntiflood(
&mock.PeerBlackListHandlerStub{},
&mock.TopicAntiFloodStub{},
&mock.FloodPreventerStub{},
)

numOperations := 500
wg := sync.WaitGroup{}
wg.Add(numOperations)
for i := 0; i < numOperations; i++ {
go func(idx int) {
switch idx {
case 0:
afm.SetConsensusSizeNotifier(&commonmocks.ChainParametersNotifierStub{}, 1)
case 1:
afm.ChainParametersChanged(config.ChainParametersByEpochConfig{})
case 2:
_ = afm.Close()
case 3:
_ = afm.CanProcessMessage(&mock.P2PMessageMock{}, "peer")
case 4:
afm.BlacklistPeer("peer", "reason", time.Millisecond)
case 5:
_ = afm.CanProcessMessagesOnTopic("peer", "topic", 37, 39, []byte("sequence"))
case 6:
_ = afm.IsOriginatorEligibleForTopic("peer", "topic")
case 7:
afm.ResetForTopic("topic")
case 8:
_ = afm.SetDebugger(&disabled.AntifloodDebugger{})
case 9:
afm.SetMaxMessagesForTopic("topic", 37)
case 10:
afm.SetTopicsForAll("topic", "topic1")
case 11:
_ = afm.Debugger()
case 12:
_ = afm.SetPeerValidatorMapper(&mock.PeerShardResolverStub{})
}

wg.Done()
}(i % 13)
}

wg.Wait()
}
9 changes: 9 additions & 0 deletions testscommon/commonmocks/chainParametersNotifierStub.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package commonmocks

import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/config"
)

// ChainParametersNotifierStub -
type ChainParametersNotifierStub struct {
ChainParametersChangedCalled func(chainParameters config.ChainParametersByEpochConfig)
UpdateCurrentChainParametersCalled func(params config.ChainParametersByEpochConfig)
RegisterNotifyHandlerCalled func(handler common.ChainParametersSubscriptionHandler)
}

// ChainParametersChanged -
Expand All @@ -24,6 +26,13 @@ func (c *ChainParametersNotifierStub) UpdateCurrentChainParameters(params config
}
}

// RegisterNotifyHandler -
func (c *ChainParametersNotifierStub) RegisterNotifyHandler(handler common.ChainParametersSubscriptionHandler) {
if c.RegisterNotifyHandlerCalled != nil {
c.RegisterNotifyHandlerCalled(handler)
}
}

func (c *ChainParametersNotifierStub) IsInterfaceNil() bool {
return c == nil
}