Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions consensus/mock/equivalentMessagesDebuggerStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package mock

// EquivalentMessagesDebuggerStub -
type EquivalentMessagesDebuggerStub struct {
DisplayEquivalentMessagesStatisticsCalled func(getDataHandler func() map[string]uint64)
}

// DisplayEquivalentMessagesStatistics -
func (stub *EquivalentMessagesDebuggerStub) DisplayEquivalentMessagesStatistics(getDataHandler func() map[string]uint64) {
if stub.DisplayEquivalentMessagesStatisticsCalled != nil {
stub.DisplayEquivalentMessagesStatisticsCalled(getDataHandler)
}
}

// IsInterfaceNil -
func (stub *EquivalentMessagesDebuggerStub) IsInterfaceNil() bool {
return stub == nil
}
8 changes: 8 additions & 0 deletions consensus/mock/sposWorkerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type SposWorkerMock struct {
ReceivedHeaderCalled func(headerHandler data.HeaderHandler, headerHash []byte)
SetAppStatusHandlerCalled func(ash core.AppStatusHandler) error
ResetConsensusMessagesCalled func()
RemoveAllEquivalentMessagesCalled func()
}

// AddReceivedMessageCall -
Expand Down Expand Up @@ -108,6 +109,13 @@ func (sposWorkerMock *SposWorkerMock) ResetConsensusMessages() {
}
}

// RemoveAllEquivalentMessages -
func (sposWorkerMock *SposWorkerMock) RemoveAllEquivalentMessages() {
if sposWorkerMock.RemoveAllEquivalentMessagesCalled != nil {
sposWorkerMock.RemoveAllEquivalentMessagesCalled()
}
}

// IsInterfaceNil returns true if there is no value under the interface
func (sposWorkerMock *SposWorkerMock) IsInterfaceNil() bool {
return sposWorkerMock == nil
Expand Down
1 change: 1 addition & 0 deletions consensus/spos/bls/blsSubroundsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (fct *factory) generateStartRoundSubround() error {
processingThresholdPercent,
fct.worker.ExecuteStoredMessages,
fct.worker.ResetConsensusMessages,
fct.worker.RemoveAllEquivalentMessages,
)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions consensus/spos/bls/blsSubroundsFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func executeStoredMessages() {
func resetConsensusMessages() {
}

// removeAllEquivalentMessages removes all equivalent messages
func removeAllEquivalentMessages() {
}

func initRoundHandlerMock() *mock.RoundHandlerMock {
return &mock.RoundHandlerMock{
RoundIndex: 0,
Expand Down
4 changes: 4 additions & 0 deletions consensus/spos/bls/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type subroundStartRound struct {
processingThresholdPercentage int
executeStoredMessages func()
resetConsensusMessages func()
removeAllEquivalentMessages func()

outportHandler outport.OutportHandler
}
Expand All @@ -35,6 +36,7 @@ func NewSubroundStartRound(
processingThresholdPercentage int,
executeStoredMessages func(),
resetConsensusMessages func(),
removeAllEquivalentMessages func(),
) (*subroundStartRound, error) {
err := checkNewSubroundStartRoundParams(
baseSubround,
Expand All @@ -48,6 +50,7 @@ func NewSubroundStartRound(
processingThresholdPercentage: processingThresholdPercentage,
executeStoredMessages: executeStoredMessages,
resetConsensusMessages: resetConsensusMessages,
removeAllEquivalentMessages: removeAllEquivalentMessages,
outportHandler: disabled.NewDisabledOutport(),
outportMutex: sync.RWMutex{},
}
Expand Down Expand Up @@ -95,6 +98,7 @@ func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool {
topic := spos.GetConsensusTopicID(sr.ShardCoordinator())
sr.GetAntiFloodHandler().ResetForTopic(topic)
sr.resetConsensusMessages()
sr.removeAllEquivalentMessages()
return true
}

Expand Down
8 changes: 8 additions & 0 deletions consensus/spos/bls/subroundStartRound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func defaultSubroundStartRoundFromSubround(sr *spos.Subround) (bls.SubroundStart
bls.ProcessingThresholdPercent,
executeStoredMessages,
resetConsensusMessages,
removeAllEquivalentMessages,
)

return startRound, err
Expand All @@ -35,6 +36,7 @@ func defaultWithoutErrorSubroundStartRoundFromSubround(sr *spos.Subround) bls.Su
bls.ProcessingThresholdPercent,
executeStoredMessages,
resetConsensusMessages,
removeAllEquivalentMessages,
)

return startRound
Expand Down Expand Up @@ -73,6 +75,7 @@ func initSubroundStartRoundWithContainer(container spos.ConsensusCoreHandler) bl
bls.ProcessingThresholdPercent,
executeStoredMessages,
resetConsensusMessages,
removeAllEquivalentMessages,
)

return srStartRound
Expand All @@ -92,6 +95,7 @@ func TestSubroundStartRound_NewSubroundStartRoundNilSubroundShouldFail(t *testin
bls.ProcessingThresholdPercent,
executeStoredMessages,
resetConsensusMessages,
removeAllEquivalentMessages,
)

assert.Nil(t, srStartRound)
Expand Down Expand Up @@ -464,6 +468,7 @@ func TestSubroundStartRound_InitCurrentRoundShouldMetrics(t *testing.T) {
bls.ProcessingThresholdPercent,
displayStatistics,
executeStoredMessages,
removeAllEquivalentMessages,
)
srStartRound.Check()
assert.True(t, wasCalled)
Expand Down Expand Up @@ -506,6 +511,7 @@ func TestSubroundStartRound_InitCurrentRoundShouldMetrics(t *testing.T) {
bls.ProcessingThresholdPercent,
displayStatistics,
executeStoredMessages,
removeAllEquivalentMessages,
)
srStartRound.Check()
assert.True(t, wasCalled)
Expand Down Expand Up @@ -568,6 +574,7 @@ func TestSubroundStartRound_InitCurrentRoundShouldMetrics(t *testing.T) {
bls.ProcessingThresholdPercent,
displayStatistics,
executeStoredMessages,
removeAllEquivalentMessages,
)
srStartRound.Check()
assert.True(t, wasMetricConsensusStateCalled)
Expand Down Expand Up @@ -634,6 +641,7 @@ func TestSubroundStartRound_InitCurrentRoundShouldMetrics(t *testing.T) {
bls.ProcessingThresholdPercent,
displayStatistics,
executeStoredMessages,
removeAllEquivalentMessages,
)
srStartRound.Check()
assert.True(t, wasMetricConsensusStateCalled)
Expand Down
71 changes: 71 additions & 0 deletions consensus/spos/debug/equivalentMessagesDebugger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package debug

import (
"fmt"

"github.com/multiversx/mx-chain-core-go/display"
logger "github.com/multiversx/mx-chain-logger-go"
)

var log = logger.GetOrCreate("debug/equivalentmessages")

type equivalentMessagesDebugger struct {
shouldProcessDataFunc func() bool
}

// NewEquivalentMessagesDebugger returns a new instance of equivalentMessagesDebugger
func NewEquivalentMessagesDebugger() *equivalentMessagesDebugger {
debugger := &equivalentMessagesDebugger{
shouldProcessDataFunc: isLogTrace,
}

return debugger
}

// DisplayEquivalentMessagesStatistics prints all the equivalent messages
func (debugger *equivalentMessagesDebugger) DisplayEquivalentMessagesStatistics(getDataHandler func() map[string]uint64) {
if !debugger.shouldProcessDataFunc() {
return
}
if getDataHandler == nil {
return
}

dataMap := getDataHandler()
log.Trace(fmt.Sprintf("Equivalent messages statistics for current round\n%s", dataToString(dataMap)))
}

func dataToString(data map[string]uint64) string {
header := []string{
"Block header hash",
"Equivalent messages received",
}

lines := make([]*display.LineData, 0, len(data))
idx := 0
for hash, cnt := range data {
horizontalLineAfter := idx == len(data)
line := []string{
hash,
fmt.Sprintf("%d", cnt),
}
lines = append(lines, display.NewLineData(horizontalLineAfter, line))
idx++
}

table, err := display.CreateTableString(header, lines)
if err != nil {
return "error creating p2p stats table: " + err.Error()
}

return table
}

func isLogTrace() bool {
return log.GetLevel() == logger.LogTrace
}

// IsInterfaceNil returns true if there is no value under the interface
func (debugger *equivalentMessagesDebugger) IsInterfaceNil() bool {
return debugger == nil
}
79 changes: 79 additions & 0 deletions consensus/spos/debug/equivalentMessagesDebugger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package debug

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewEquivalentMessagesDebugger_IsInterfaceNil(t *testing.T) {
t.Parallel()

var debugger *equivalentMessagesDebugger
require.True(t, debugger.IsInterfaceNil())

debugger = NewEquivalentMessagesDebugger()
require.False(t, debugger.IsInterfaceNil())
}

func TestEquivalentMessagesDebugger_DisplayEquivalentMessagesStatistics(t *testing.T) {
t.Parallel()

t.Run("log level not trace should early exit", func(t *testing.T) {
t.Parallel()

defer func() {
r := recover()
if r != nil {
require.Fail(t, "should have not panicked")
}
}()

debugger := NewEquivalentMessagesDebugger()
debugger.DisplayEquivalentMessagesStatistics(func() map[string]uint64 {
return make(map[string]uint64)
})
})
t.Run("nil get data handler should early exit", func(t *testing.T) {
t.Parallel()

defer func() {
r := recover()
if r != nil {
require.Fail(t, "should have not panicked")
}
}()

debugger := NewEquivalentMessagesDebugger()
debugger.shouldProcessDataFunc = func() bool {
return true
}

debugger.DisplayEquivalentMessagesStatistics(nil)
})
t.Run("should work", func(t *testing.T) {
t.Parallel()

defer func() {
r := recover()
if r != nil {
require.Fail(t, "should have not panicked")
}
}()

debugger := NewEquivalentMessagesDebugger()
debugger.shouldProcessDataFunc = func() bool {
return true
}

debugger.DisplayEquivalentMessagesStatistics(func() map[string]uint64 {
return map[string]uint64{
"hash1": 1,
"hash2": 2,
"hash3": 3,
"hash4": 4,
}
})

})
}
6 changes: 6 additions & 0 deletions consensus/spos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,9 @@ var ErrNilSigningHandler = errors.New("nil signing handler")

// ErrNilKeysHandler signals that a nil keys handler was provided
var ErrNilKeysHandler = errors.New("nil keys handler")

// ErrEquivalentMessageAlreadyReceived signals that an equivalent message has been already received
var ErrEquivalentMessageAlreadyReceived = errors.New("equivalent message already received")

// ErrNilEquivalentMessagesDebugger signals that a nil equivalent messages debugger has been provided
var ErrNilEquivalentMessagesDebugger = errors.New("nil equivalent messages debugger")
5 changes: 5 additions & 0 deletions consensus/spos/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func (wrk *Worker) AppStatusHandler() core.AppStatusHandler {
return wrk.appStatusHandler
}

// GetEquivalentMessages -
func (wrk *Worker) GetEquivalentMessages() map[string]uint64 {
return wrk.getEquivalentMessages()
}

// CheckConsensusMessageValidity -
func (cmv *consensusMessageValidator) CheckConsensusMessageValidity(cnsMsg *consensus.Message, originator core.PeerID) error {
return cmv.checkConsensusMessageValidity(cnsMsg, originator)
Expand Down
8 changes: 8 additions & 0 deletions consensus/spos/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type WorkerHandler interface {
ReceivedHeader(headerHandler data.HeaderHandler, headerHash []byte)
// ResetConsensusMessages resets at the start of each round all the previous consensus messages received
ResetConsensusMessages()
// RemoveAllEquivalentMessages removes all the equivalent messages
RemoveAllEquivalentMessages()
// IsInterfaceNil returns true if there is no value under the interface
IsInterfaceNil() bool
}
Expand Down Expand Up @@ -170,3 +172,9 @@ type PeerBlackListCacher interface {
Sweep()
IsInterfaceNil() bool
}

// EquivalentMessagesDebugger defines the specific debugger for equivalent messages
type EquivalentMessagesDebugger interface {
DisplayEquivalentMessagesStatistics(getDataHandler func() map[string]uint64)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make the signature more straightforward. Why not declare the function like

DisplayEquivalentMessagesStatistics(messages map[string]uint64)

Copy link
Collaborator Author

@sstanculeanu sstanculeanu Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to avoid copying the map when we don't need the log.. with the function pointer approach, it is copied only when the debugger is active

IsInterfaceNil() bool
}
Loading