Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consensus/spos/bls/proxy/subroundsHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func getDefaultArgumentsSubroundHandler() (*SubroundsHandlerArgs, *spos.Consensu
consensusCore.SetEnableEpochsHandler(epochsEnable)
consensusCore.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{})
consensusCore.SetEpochNotifier(epochNotifier)
consensusCore.SetInvalidSignersCache(&consensus.InvalidSignersCacheMock{})
handlerArgs.ConsensusCoreHandler = consensusCore

return handlerArgs, consensusCore
Expand Down
26 changes: 18 additions & 8 deletions consensus/spos/bls/v2/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ func (sr *subroundEndRound) receivedInvalidSignersInfo(_ context.Context, cnsDta
return false
}

invalidSignersHash := sr.Hasher().Compute(string(cnsDta.InvalidSigners))
invalidSignersCache := sr.InvalidSignersCache()
if invalidSignersCache.HasInvalidSigners(string(invalidSignersHash)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are different number of invalidSigners: first time 2: A and B, and in on a second message it gets only A, it seems that the message will be propagated 2 times?

return false
}

err := sr.verifyInvalidSigners(cnsDta.InvalidSigners)
if err != nil {
log.Trace("receivedInvalidSignersInfo.verifyInvalidSigners", "error", err.Error())
Expand All @@ -158,6 +164,8 @@ func (sr *subroundEndRound) receivedInvalidSignersInfo(_ context.Context, cnsDta

log.Debug("step 3: invalid signers info has been evaluated")

invalidSignersCache.AddInvalidSigners(string(invalidSignersHash))

sr.PeerHonestyHandler().ChangeScore(
messageSender,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
Expand Down Expand Up @@ -501,24 +509,23 @@ func (sr *subroundEndRound) handleInvalidSignersOnAggSigFail() ([]byte, []byte,
invalidPubKeys, err := sr.verifyNodesOnAggSigFail(ctx)
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to the changes from this PR, but this cancel() doesn't seem right to call it after the "intensive". Maybe move it up 1 line and call it on a defer?

if err != nil {
log.Debug("doEndRoundJobByNode.verifyNodesOnAggSigFail", "error", err.Error())
log.Debug("handleInvalidSignersOnAggSigFail.verifyNodesOnAggSigFail", "error", err.Error())
return nil, nil, err
}

_, err = sr.getFullMessagesForInvalidSigners(invalidPubKeys)
invalidSigners, err := sr.getFullMessagesForInvalidSigners(invalidPubKeys)
if err != nil {
log.Debug("doEndRoundJobByNode.getFullMessagesForInvalidSigners", "error", err.Error())
log.Debug("handleInvalidSignersOnAggSigFail.getFullMessagesForInvalidSigners", "error", err.Error())
return nil, nil, err
}

// TODO: handle invalid signers broadcast without flooding the network
// if len(invalidSigners) > 0 {
// sr.createAndBroadcastInvalidSigners(invalidSigners)
// }
if len(invalidSigners) > 0 {
sr.createAndBroadcastInvalidSigners(invalidSigners)
}

bitmap, sig, err := sr.computeAggSigOnValidNodes()
if err != nil {
log.Debug("doEndRoundJobByNode.computeAggSigOnValidNodes", "error", err.Error())
log.Debug("handleInvalidSignersOnAggSigFail.computeAggSigOnValidNodes", "error", err.Error())
return nil, nil, err
}

Expand Down Expand Up @@ -646,6 +653,9 @@ func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []by
invalidSigners,
)

invalidSignersHash := sr.Hasher().Compute(string(invalidSigners))
sr.InvalidSignersCache().AddInvalidSigners(string(invalidSignersHash))

err = sr.BroadcastMessenger().BroadcastConsensusMessage(cnsMsg)
if err != nil {
log.Debug("doEndRoundJob.BroadcastConsensusMessage", "error", err.Error())
Expand Down
44 changes: 41 additions & 3 deletions consensus/spos/bls/v2/subroundEndRound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,27 @@ func TestSubroundEndRound_ReceivedInvalidSignersInfo(t *testing.T) {
res := sr.ReceivedInvalidSignersInfo(&cnsData)
assert.False(t, res)
})
t.Run("invalid signers cache already has this message", func(t *testing.T) {
t.Parallel()

container := consensusMocks.InitConsensusCore()
invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{
HasInvalidSignersCalled: func(hash string) bool {
return true
},
}
container.SetInvalidSignersCache(invalidSignersCache)

sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
cnsData := consensus.Message{
BlockHeaderHash: []byte("X"),
PubKey: []byte("A"),
InvalidSigners: []byte("invalidSignersData"),
}

res := sr.ReceivedInvalidSignersInfo(&cnsData)
assert.False(t, res)
})
t.Run("invalid signers data", func(t *testing.T) {
t.Parallel()

Expand All @@ -1528,6 +1549,13 @@ func TestSubroundEndRound_ReceivedInvalidSignersInfo(t *testing.T) {
t.Parallel()

container := consensusMocks.InitConsensusCore()
wasAddInvalidSignersCalled := false
invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{
AddInvalidSignersCalled: func(hash string) {
wasAddInvalidSignersCalled = true
},
}
container.SetInvalidSignersCache(invalidSignersCache)

sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
sr.SetHeader(&block.HeaderV2{
Expand All @@ -1541,6 +1569,7 @@ func TestSubroundEndRound_ReceivedInvalidSignersInfo(t *testing.T) {

res := sr.ReceivedInvalidSignersInfo(&cnsData)
assert.True(t, res)
require.True(t, wasAddInvalidSignersCalled)
})
}

Expand Down Expand Up @@ -1704,25 +1733,34 @@ func TestSubroundEndRound_CreateAndBroadcastInvalidSigners(t *testing.T) {

expectedInvalidSigners := []byte("invalid signers")

wasCalled := false
wasBroadcastConsensusMessageCalled := false
container := consensusMocks.InitConsensusCore()
messenger := &consensusMocks.BroadcastMessengerMock{
BroadcastConsensusMessageCalled: func(message *consensus.Message) error {
assert.Equal(t, expectedInvalidSigners, message.InvalidSigners)
wasCalled = true
wasBroadcastConsensusMessageCalled = true
wg.Done()
return nil
},
}
container.SetBroadcastMessenger(messenger)

wasAddInvalidSignersCalled := false
invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{
AddInvalidSignersCalled: func(hash string) {
wasAddInvalidSignersCalled = true
},
}
container.SetInvalidSignersCache(invalidSignersCache)
sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
sr.SetSelfPubKey("A")

sr.CreateAndBroadcastInvalidSigners(expectedInvalidSigners)

wg.Wait()

require.True(t, wasCalled)
require.True(t, wasBroadcastConsensusMessageCalled)
require.True(t, wasAddInvalidSignersCalled)
})
}

Expand Down
1 change: 1 addition & 0 deletions consensus/spos/bls/v2/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool {
topic := spos.GetConsensusTopicID(sr.ShardCoordinator())
sr.GetAntiFloodHandler().ResetForTopic(topic)
sr.worker.ResetConsensusMessages()
sr.worker.ResetInvalidSignersCache()

return true
}
Expand Down
13 changes: 13 additions & 0 deletions consensus/spos/consensusCore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ConsensusCore struct {
enableEpochsHandler common.EnableEpochsHandler
equivalentProofsPool consensus.EquivalentProofsPool
epochNotifier process.EpochNotifier
invalidSignersCache InvalidSignersCache
}

// ConsensusCoreArgs store all arguments that are needed to create a ConsensusCore object
Expand Down Expand Up @@ -72,6 +73,7 @@ type ConsensusCoreArgs struct {
EnableEpochsHandler common.EnableEpochsHandler
EquivalentProofsPool consensus.EquivalentProofsPool
EpochNotifier process.EpochNotifier
InvalidSignersCache InvalidSignersCache
}

// NewConsensusCore creates a new ConsensusCore instance
Expand Down Expand Up @@ -104,6 +106,7 @@ func NewConsensusCore(
enableEpochsHandler: args.EnableEpochsHandler,
equivalentProofsPool: args.EquivalentProofsPool,
epochNotifier: args.EpochNotifier,
invalidSignersCache: args.InvalidSignersCache,
}

err := ValidateConsensusCore(consensusCore)
Expand Down Expand Up @@ -239,6 +242,11 @@ func (cc *ConsensusCore) EquivalentProofsPool() consensus.EquivalentProofsPool {
return cc.equivalentProofsPool
}

// InvalidSignersCache returns the invalid signers cache component
func (cc *ConsensusCore) InvalidSignersCache() InvalidSignersCache {
return cc.invalidSignersCache
}

// SetBlockchain sets blockchain handler
func (cc *ConsensusCore) SetBlockchain(blockChain data.ChainHandler) {
cc.blockChain = blockChain
Expand Down Expand Up @@ -364,6 +372,11 @@ func (cc *ConsensusCore) SetEpochNotifier(epochNotifier process.EpochNotifier) {
cc.epochNotifier = epochNotifier
}

// SetInvalidSignersCache sets the invalid signers cache
func (cc *ConsensusCore) SetInvalidSignersCache(cache InvalidSignersCache) {
cc.invalidSignersCache = cache
}

// IsInterfaceNil returns true if there is no value under the interface
func (cc *ConsensusCore) IsInterfaceNil() bool {
return cc == nil
Expand Down
3 changes: 3 additions & 0 deletions consensus/spos/consensusCoreValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func ValidateConsensusCore(container ConsensusCoreHandler) error {
if check.IfNil(container.EpochStartRegistrationHandler()) {
return ErrNilEpochStartNotifier
}
if check.IfNil(container.InvalidSignersCache()) {
return ErrNilInvalidSignersCache
}

return nil
}
13 changes: 13 additions & 0 deletions consensus/spos/consensusCoreValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func initConsensusDataContainer() *spos.ConsensusCore {
enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{}
proofsPool := &dataRetriever.ProofsPoolMock{}
epochNotifier := &epochNotifierMock.EpochNotifierStub{}
invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{}

consensusCore, _ := spos.NewConsensusCore(&spos.ConsensusCoreArgs{
BlockChain: blockChain,
Expand Down Expand Up @@ -73,6 +74,7 @@ func initConsensusDataContainer() *spos.ConsensusCore {
EnableEpochsHandler: enableEpochsHandler,
EquivalentProofsPool: proofsPool,
EpochNotifier: epochNotifier,
InvalidSignersCache: invalidSignersCache,
})

return consensusCore
Expand Down Expand Up @@ -372,6 +374,17 @@ func TestConsensusContainerValidator_ValidateNilEpochStartRegistrationHandlerSho
assert.Equal(t, spos.ErrNilEpochStartNotifier, err)
}

func TestConsensusContainerValidator_ValidateNilInvalidSignersCacheShouldFail(t *testing.T) {
t.Parallel()

container := initConsensusDataContainer()
container.SetInvalidSignersCache(nil)

err := spos.ValidateConsensusCore(container)

assert.Equal(t, spos.ErrNilInvalidSignersCache, err)
}

func TestConsensusContainerValidator_ShouldWork(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions consensus/spos/consensusCore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func createDefaultConsensusCoreArgs() *spos.ConsensusCoreArgs {
EnableEpochsHandler: consensusCoreMock.EnableEpochsHandler(),
EquivalentProofsPool: consensusCoreMock.EquivalentProofsPool(),
EpochNotifier: consensusCoreMock.EpochNotifier(),
InvalidSignersCache: &consensus.InvalidSignersCacheMock{},
}
return args
}
Expand Down
6 changes: 6 additions & 0 deletions consensus/spos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,9 @@ var ErrNilEpochNotifier = errors.New("nil epoch notifier")

// ErrNilEpochStartNotifier signals that nil epoch start notifier has been provided
var ErrNilEpochStartNotifier = errors.New("nil epoch start notifier")

// ErrInvalidSignersAlreadyReceived signals that an invalid signers message has been already received
var ErrInvalidSignersAlreadyReceived = errors.New("invalid signers already received")

// ErrNilInvalidSignersCache signals that nil invalid signers has been provided
var ErrNilInvalidSignersCache = errors.New("nil invalid signers cache")
11 changes: 11 additions & 0 deletions consensus/spos/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ConsensusCoreHandler interface {
EnableEpochsHandler() common.EnableEpochsHandler
EquivalentProofsPool() consensus.EquivalentProofsPool
EpochNotifier() process.EpochNotifier
InvalidSignersCache() InvalidSignersCache
IsInterfaceNil() bool
}

Expand Down Expand Up @@ -127,6 +128,8 @@ type WorkerHandler interface {
ResetConsensusMessages()
// ResetConsensusRoundState resets the consensus round state when transitioning to a different consensus version
ResetConsensusRoundState()
// ResetInvalidSignersCache resets the invalid signers cache
ResetInvalidSignersCache()
// IsInterfaceNil returns true if there is no value under the interface
IsInterfaceNil() bool
}
Expand Down Expand Up @@ -265,3 +268,11 @@ type RoundThresholdHandler interface {
FallbackThreshold(subroundId int) int
SetFallbackThreshold(subroundId int, threshold int)
}

// InvalidSignersCache encapsulates the methods needed for a invalid signers cache
type InvalidSignersCache interface {
AddInvalidSigners(hash string)
HasInvalidSigners(hash string) bool
Reset()
IsInterfaceNil() bool
}
49 changes: 49 additions & 0 deletions consensus/spos/invalidSignersCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package spos

import "sync"

type invalidSignersCache struct {
sync.RWMutex
invalidSignersMap map[string]struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, maybe sync.Map would have also worked (shorter code)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

back to map + mut due to reset method

}

// NewInvalidSignersCache returns a new instance of invalidSignersCache
func NewInvalidSignersCache() *invalidSignersCache {
return &invalidSignersCache{
invalidSignersMap: make(map[string]struct{}),
}
}

// AddInvalidSigners adds the provided hash into the internal map if it does not exist
func (cache *invalidSignersCache) AddInvalidSigners(hash string) {
if len(hash) == 0 {
return
}

cache.Lock()
defer cache.Unlock()

cache.invalidSignersMap[hash] = struct{}{}
}

// HasInvalidSigners check whether the provided hash exists in int internal map or not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in int 🙃

func (cache *invalidSignersCache) HasInvalidSigners(hash string) bool {
cache.RLock()
defer cache.RUnlock()

_, has := cache.invalidSignersMap[hash]
return has
}

// Reset clears the internal map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maps.

func (cache *invalidSignersCache) Reset() {
cache.Lock()
defer cache.Unlock()

cache.invalidSignersMap = make(map[string]struct{})
}

// IsInterfaceNil returns true if there is no value under the interface
func (cache *invalidSignersCache) IsInterfaceNil() bool {
return cache == nil
}
Loading