Skip to content
4 changes: 2 additions & 2 deletions consensus/broadcast/shardChainMessenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func createInterceptorContainer() process.InterceptorsContainer {
return &testscommon.InterceptorsContainerStub{
GetCalled: func(topic string) (process.Interceptor, error) {
return &testscommon.InterceptorStub{
ProcessReceivedMessageCalled: func(message p2p.MessageP2P) error {
return nil
ProcessReceivedMessageCalled: func(message p2p.MessageP2P) ([]byte, error) {
return nil, nil
},
}, nil
},
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type WorkerHandler interface {
// RemoveAllReceivedMessagesCalls removes all the functions handlers
RemoveAllReceivedMessagesCalls()
// ProcessReceivedMessage method redirects the received message to the channel which should handle it
ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, source p2p.MessageHandler) error
ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, source p2p.MessageHandler) ([]byte, error)
// Extend does an extension for the subround with subroundId
Extend(subroundId int)
// GetConsensusStateChangedChannel gets the channel for the consensusStateChanged
Expand Down
22 changes: 11 additions & 11 deletions consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,29 +379,29 @@ func (wrk *Worker) getCleanedList(cnsDataList []*consensus.Message) []*consensus
}

// ProcessReceivedMessage method redirects the received message to the channel which should handle it
func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, _ p2p.MessageHandler) error {
func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, _ p2p.MessageHandler) ([]byte, error) {
if check.IfNil(message) {
return ErrNilMessage
return nil, ErrNilMessage
}
if message.Data() == nil {
return ErrNilDataToProcess
return nil, ErrNilDataToProcess
}
if len(message.Signature()) == 0 {
return ErrNilSignatureOnP2PMessage
return nil, ErrNilSignatureOnP2PMessage
}

isPeerBlacklisted := wrk.peerBlacklistHandler.IsPeerBlacklisted(fromConnectedPeer)
if isPeerBlacklisted {
log.Debug("received message from blacklisted peer",
"peer", fromConnectedPeer.Pretty(),
)
return ErrBlacklistedConsensusPeer
return nil, ErrBlacklistedConsensusPeer
}

topic := GetConsensusTopicID(wrk.shardCoordinator)
err := wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data())), message.SeqNo())
if err != nil {
return err
return nil, err
}

defer func() {
Expand All @@ -418,7 +418,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
cnsMsg := &consensus.Message{}
err = wrk.marshalizer.Unmarshal(cnsMsg, message.Data())
if err != nil {
return err
return nil, err
}

wrk.consensusState.ResetRoundsWithoutReceivedMessages(cnsMsg.GetPubKey(), message.Peer())
Expand All @@ -433,7 +433,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP

err = wrk.checkValidityAndProcessFinalInfo(cnsMsg, message)
if err != nil {
return err
return nil, err
}

wrk.networkShardingCollector.UpdatePeerIDInfo(message.Peer(), cnsMsg.PubKey, wrk.shardCoordinator.SelfId())
Expand All @@ -450,7 +450,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
if isMessageWithBlockHeader || isMessageWithBlockBodyAndHeader {
err = wrk.doJobOnMessageWithHeader(cnsMsg)
if err != nil {
return err
return nil, err
}
}

Expand All @@ -463,12 +463,12 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
log.Trace("checkSelfState", "error", errNotCritical.Error())
// in this case should return nil but do not process the message
// nil error will mean that the interceptor will validate this message and broadcast it to the connected peers
return nil
return nil, nil
}

go wrk.executeReceivedMessages(cnsMsg)

return nil
return nil, nil
}

func (wrk *Worker) shouldBlacklistPeer(err error) bool {
Expand Down
Loading