From bc88c616785f98eaa0784442c20ea09c8a5e081d Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 25 Mar 2021 15:39:52 -0400 Subject: [PATCH 01/14] open grandpa substream on connect --- dot/network/notifications.go | 7 +++++++ dot/network/service.go | 29 +++++++++++++++++++++++++++++ lib/grandpa/message_handler.go | 1 + lib/grandpa/network.go | 1 + 4 files changed, 38 insertions(+) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index bfdff68df9..a7144cab84 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -102,6 +102,13 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, "peer", stream.Conn().RemotePeer(), ) + if info.protocolID == "/paritytech/grandpa/1" { + logger.Info("received message on grandpa sub-protocol", "protocol", info.protocolID, + "message", msg, + "peer", stream.Conn().RemotePeer(), + ) + } + if msg.IsHandshake() { hs, ok := msg.(Handshake) if !ok { diff --git a/dot/network/service.go b/dot/network/service.go index 6ce82d8be0..528191c0ff 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -315,6 +315,35 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { logger.Trace("failed to send block announce handshake to peer", "peer", peer, "error", err) } } + + grandpaInfo, has := s.notificationsProtocols[ConsensusMsgType] + if !has { + // this shouldn't happen + logger.Warn("consensus protocol is not yet registered!") + return + } + + // open grandpa substream + hs, err = grandpaInfo.getHandshake() + if err != nil { + logger.Warn("failed to get handshake", "protocol", grandpaInfo.protocolID, "error", err) + return + } + + grandpaInfo.mapMu.RLock() + defer grandpaInfo.mapMu.RUnlock() + + if hsData, has := grandpaInfo.handshakeData[peer]; !has || !hsData.received { + grandpaInfo.handshakeData[peer] = &handshakeData{ + validated: false, + } + + logger.Debug("sending handshake", "protocol", grandpaInfo.protocolID, "peer", peer, "message", hs) + err = s.host.send(peer, grandpaInfo.protocolID, hs) + if err != nil { + logger.Debug("failed to send grandpa handshake to peer", "peer", peer, "error", err) + } + } } func (s *Service) beginDiscovery() error { diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 8d4db4eed9..3d9318f67b 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -60,6 +60,7 @@ func (h *MessageHandler) handleMessage(msg *ConsensusMessage) (*ConsensusMessage h.grandpa.in <- vm } case finalizationType: + h.grandpa.logger.Info("got finalization message!!", "msg", msg) if fm, ok := m.(*FinalizationMessage); ok { return h.handleFinalizationMessage(fm) } diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 5b1b444638..8317020565 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -130,6 +130,7 @@ func (s *Service) handleNetworkMessage(_ peer.ID, msg NotificationsMessage) erro return ErrInvalidMessageType } + s.logger.Info("got grandpa message!", "msg", msg) resp, err := s.messageHandler.handleMessage(cm) if err != nil { return err From 046f26abbda3c8c276b20bf4318e157498939b87 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 25 Mar 2021 19:01:03 -0400 Subject: [PATCH 02/14] add logs --- dot/network/service.go | 2 +- lib/grandpa/network.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index 528191c0ff..204b548e2f 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -333,7 +333,7 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { grandpaInfo.mapMu.RLock() defer grandpaInfo.mapMu.RUnlock() - if hsData, has := grandpaInfo.handshakeData[peer]; !has || !hsData.received { + if hsData, has := grandpaInfo.handshakeData[peer]; !has || !hsData.received { //nolint grandpaInfo.handshakeData[peer] = &handshakeData{ validated: false, } diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 8317020565..442ee95f96 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -104,7 +104,7 @@ func (s *Service) registerProtocol() error { func (s *Service) getHandshake() (Handshake, error) { return &GrandpaHandshake{ - Roles: 0, // TODO: are roles returned? + Roles: 1, // TODO: don't hard-code this }, nil } From 50c2d47bd578c21de1d7b9c851d8bb620bb8f64b Mon Sep 17 00:00:00 2001 From: noot Date: Fri, 26 Mar 2021 13:53:45 -0400 Subject: [PATCH 03/14] add logs --- dot/network/block_announce.go | 1 + dot/network/notifications.go | 1 + dot/network/service.go | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index 541e7dbf3a..61b5d2fc55 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -256,6 +256,7 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err // with its peer and send a BlockRequest message func (s *Service) handleBlockAnnounceMessage(peer peer.ID, msg NotificationsMessage) error { if an, ok := msg.(*BlockAnnounceMessage); ok { + logger.Info("received BlockAnnounce!", "msg", an) s.syncQueue.handleBlockAnnounce(an, peer) err := s.syncer.HandleBlockAnnounce(an) if err != nil { diff --git a/dot/network/notifications.go b/dot/network/notifications.go index a7144cab84..36c7497e54 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -77,6 +77,7 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode } // otherwise, assume we are receiving the Message + logger.Debug("decoding message", "protocol", info.protocolID) return messageDecoder(in) } } diff --git a/dot/network/service.go b/dot/network/service.go index 204b548e2f..2a5106ea1b 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -212,7 +212,7 @@ func (s *Service) Start() error { } // since this opens block announce streams, it should happen after the protocol is registered - s.host.h.Network().SetConnHandler(s.handleConn) + //s.host.h.Network().SetConnHandler(s.handleConn) // log listening addresses to console for _, addr := range s.host.multiaddrs() { @@ -566,7 +566,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder // decode message based on message type msg, err := decoder(msgBytes[:tot], peer) if err != nil { - logger.Trace("Failed to decode message from peer", "peer", peer, "err", err) + logger.Info("failed to decode message from peer", "protocol", stream.Protocol(), "err", err, "msg bytes", msgBytes[:tot]) continue } From 761eab24147e8a228f08f636e5972166d8e3aac1 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 29 Mar 2021 10:37:33 -0400 Subject: [PATCH 04/14] log update --- dot/network/notifications.go | 20 ++++++++++---------- dot/network/service.go | 8 ++++---- dot/network/sync.go | 8 ++++---- dot/network/transaction.go | 35 ++++++++++++++++++----------------- dot/network/utils.go | 4 ++-- 5 files changed, 38 insertions(+), 37 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index da72884add..265f374c89 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -86,7 +86,6 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode } // otherwise, assume we are receiving the Message - logger.Debug("decoding message", "protocol", info.protocolID) return messageDecoder(in) } } @@ -107,17 +106,17 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, return errors.New("message is not NotificationsMessage") } - logger.Trace("received message on notifications sub-protocol", "protocol", info.protocolID, + logger.Debug("received message on notifications sub-protocol", "protocol", info.protocolID, "message", msg, "peer", stream.Conn().RemotePeer(), ) - if info.protocolID == "/paritytech/grandpa/1" { - logger.Info("received message on grandpa sub-protocol", "protocol", info.protocolID, - "message", msg, - "peer", stream.Conn().RemotePeer(), - ) - } + // if info.protocolID == "/paritytech/grandpa/1" { + // logger.Info("received message on grandpa sub-protocol", "protocol", info.protocolID, + // "message", msg, + // "peer", stream.Conn().RemotePeer(), + // ) + // } if msg.IsHandshake() { hs, ok := msg.(Handshake) @@ -152,6 +151,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, logger.Debug("failed to get handshake", "protocol", info.protocolID, "error", err) return err } + //resp := hs err = s.host.send(peer, info.protocolID, resp) if err != nil { @@ -159,12 +159,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, _ = stream.Conn().Close() return err } - logger.Trace("receiver: sent handshake", "protocol", info.protocolID, "peer", peer) + logger.Debug("receiver: sent handshake", "protocol", info.protocolID, "peer", peer) } // if we are the initiator and haven't received the handshake already, validate it if hsData, has := info.getHandshakeData(peer); has && !hsData.validated { - logger.Trace("sender: validating handshake") + logger.Debug("sender: validating handshake") err := handshakeValidator(peer, hs) if err != nil { logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err) diff --git a/dot/network/service.go b/dot/network/service.go index 1f2b4125ce..1af7535dbd 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -333,10 +333,10 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { grandpaInfo.mapMu.RLock() defer grandpaInfo.mapMu.RUnlock() - if hsData, has := grandpaInfo.handshakeData[peer]; !has || !hsData.received { //nolint - grandpaInfo.handshakeData[peer] = &handshakeData{ + if hsData, has := grandpaInfo.getHandshakeData(peer); !has || !hsData.received { //nolint + grandpaInfo.handshakeData.Store(peer, &handshakeData{ validated: false, - } + }) logger.Debug("sending handshake", "protocol", grandpaInfo.protocolID, "peer", peer, "message", hs) err = s.host.send(peer, grandpaInfo.protocolID, hs) @@ -566,7 +566,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder // decode message based on message type msg, err := decoder(msgBytes[:tot], peer) if err != nil { - logger.Info("failed to decode message from peer", "protocol", stream.Protocol(), "err", err, "msg bytes", msgBytes[:tot]) + logger.Info("failed to decode message from peer", "protocol", stream.Protocol(), "err", err, "msg bytes", fmt.Sprintf("0x%x", msgBytes[:tot])) continue } diff --git a/dot/network/sync.go b/dot/network/sync.go index 70d822d577..921ce04491 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -525,7 +525,7 @@ func (q *syncQueue) trySync(req *syncRequest) { return } - logger.Debug("beginning to send out request", "start", req.req.StartingBlock.Value()) + logger.Trace("beginning to send out request", "start", req.req.StartingBlock.Value()) if len(req.to) != 0 { resp, err := q.syncWithPeer(req.to, req.req) if err == nil { @@ -535,11 +535,11 @@ func (q *syncQueue) trySync(req *syncRequest) { } } - logger.Debug("failed to sync with peer", "peer", req.to, "error", err) + logger.Trace("failed to sync with peer", "peer", req.to, "error", err) q.updatePeerScore(req.to, -1) } - logger.Debug("trying peers in prioritized order...") + logger.Trace("trying peers in prioritized order...") syncPeers := q.getSortedPeers() for _, peer := range syncPeers { @@ -563,7 +563,7 @@ func (q *syncQueue) trySync(req *syncRequest) { } } - logger.Debug("failed to sync with any peer :(") + logger.Trace("failed to sync with any peer :(") if req.req.StartingBlock.IsUint64() { q.requestData.Store(req.req.StartingBlock.Uint64(), requestData{ sent: true, diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 5eb1f7fa97..f19cb455b9 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -102,7 +102,7 @@ func (tm *TransactionMessage) IsHandshake() bool { } type transactionHandshake struct { - Roles byte + //Roles byte } // SubProtocol returns the transactions sub-protocol @@ -112,22 +112,22 @@ func (hs *transactionHandshake) SubProtocol() string { // String formats a transactionHandshake as a string func (hs *transactionHandshake) String() string { - return fmt.Sprintf("transactionHandshake Roles=%d", - hs.Roles) + return fmt.Sprintf("transactionHandshake") // Roles=%d", hs.Roles) } // Encode encodes a transactionHandshake message using SCALE func (hs *transactionHandshake) Encode() ([]byte, error) { - return scale.Encode(hs) + //return scale.Encode(hs) + return []byte{}, nil } // Decode the message into a transactionHandshake func (hs *transactionHandshake) Decode(in []byte) error { - msg, err := scale.Decode(in, hs) - if err != nil { - return err - } - hs.Roles = msg.(*transactionHandshake).Roles + // msg, err := scale.Decode(in, hs) + // if err != nil { + // return err + // } + // hs.Roles = msg.(*transactionHandshake).Roles return nil } @@ -148,18 +148,19 @@ func (hs *transactionHandshake) IsHandshake() bool { func (s *Service) getTransactionHandshake() (Handshake, error) { return &transactionHandshake{ - Roles: s.cfg.Roles, + //Roles: s.cfg.Roles, }, nil } func decodeTransactionHandshake(in []byte) (Handshake, error) { - if len(in) < 1 { - return nil, errors.New("invalid handshake") - } - - return &transactionHandshake{ - Roles: in[0], - }, nil + // if len(in) < 1 { + // return nil, errors.New("invalid handshake") + // } + + // return &transactionHandshake{ + // Roles: in[0], + // }, nil + return &transactionHandshake{}, nil } func validateTransactionHandshake(_ peer.ID, _ Handshake) error { diff --git a/dot/network/utils.go b/dot/network/utils.go index 49d3a9c054..62e9c53c00 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -185,11 +185,11 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { if err == io.EOF { return 0, err } else if err != nil { - return 0, err // TODO: read bytes read from readLEB128ToUint64 + return 0, err // TODO: return bytes read from readLEB128ToUint64 } if length == 0 { - return 0, err // TODO: read bytes read from readLEB128ToUint64 + return 0, err // TODO: return bytes read from readLEB128ToUint64 } // TODO: check if length > len(buf), if so probably log.Crit From 5f976d9e22cb34898494d0fa0b17b14f001cdda0 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 30 Mar 2021 12:17:29 -0400 Subject: [PATCH 05/14] begin to update syncer to request justifications --- dot/network/block_announce.go | 16 ++-- dot/network/host.go | 8 +- dot/network/notifications.go | 15 ++- dot/network/service.go | 7 +- dot/network/sync.go | 175 +++++++++++++++++++++++++++------- dot/sync/syncer.go | 5 + 6 files changed, 167 insertions(+), 59 deletions(-) diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index 61f80db0c5..29b7592bda 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -212,14 +212,6 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err return errors.New("genesis hash mismatch") } - // if peer has higher best block than us, begin syncing - latestHeader, err := s.blockState.BestBlockHeader() - if err != nil { - return err - } - - bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber)) - np, ok := s.notificationsProtocols[BlockAnnounceMsgType] if !ok { // this should never happen. @@ -239,6 +231,14 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err data.handshake = hs + // if peer has higher best block than us, begin syncing + latestHeader, err := s.blockState.BestBlockHeader() + if err != nil { + return err + } + + bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber)) + // check if peer block number is greater than host block number if latestHeader.Number.Cmp(bestBlockNum) >= 0 { return nil diff --git a/dot/network/host.go b/dot/network/host.go index 787b0c0e96..79594760e4 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -237,7 +237,7 @@ func (h *host) bootstrap() { // peer (gets the already opened outbound message stream or opens a new one). func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (err error) { // get outbound stream for given peer - s := h.getStream(p, pid) + s := h.getOutboundStream(p, pid) // check if stream needs to be opened if s == nil { @@ -286,10 +286,10 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error { return err } -// getStream returns the outbound message stream for the given peer or returns +// getOutboundStream returns the outbound message stream for the given peer or returns // nil if no outbound message stream exists. For each peer, each host opens an // outbound message stream and writes to the same stream until closed or reset. -func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) { +func (h *host) getOutboundStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) { conns := h.h.Network().ConnsToPeer(p) // loop through connections (only one for now) @@ -310,7 +310,7 @@ func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Strea // closeStream closes a stream open to the peer with the given sub-protocol, if it exists. func (h *host) closeStream(p peer.ID, pid protocol.ID) { - stream := h.getStream(p, pid) + stream := h.getOutboundStream(p, pid) if stream != nil { _ = stream.Close() } diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 265f374c89..57e2be8400 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -106,18 +106,11 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, return errors.New("message is not NotificationsMessage") } - logger.Debug("received message on notifications sub-protocol", "protocol", info.protocolID, + logger.Trace("received message on notifications sub-protocol", "protocol", info.protocolID, "message", msg, "peer", stream.Conn().RemotePeer(), ) - // if info.protocolID == "/paritytech/grandpa/1" { - // logger.Info("received message on grandpa sub-protocol", "protocol", info.protocolID, - // "message", msg, - // "peer", stream.Conn().RemotePeer(), - // ) - // } - if msg.IsHandshake() { hs, ok := msg.(Handshake) if !ok { @@ -151,7 +144,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, logger.Debug("failed to get handshake", "protocol", info.protocolID, "error", err) return err } - //resp := hs err = s.host.send(peer, info.protocolID, resp) if err != nil { @@ -194,6 +186,11 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, return nil } + logger.Debug("received message on notifications sub-protocol", "protocol", info.protocolID, + "message", msg, + "peer", stream.Conn().RemotePeer(), + ) + err := messageHandler(peer, msg) if err != nil { return err diff --git a/dot/network/service.go b/dot/network/service.go index 1af7535dbd..ffa9526da1 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -457,6 +457,9 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID, info := s.notificationsProtocols[messageID] + decoder := createDecoder(info, handshakeDecoder, messageDecoder) + handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler) + s.host.registerStreamHandlerWithOverwrite(sub, overwriteProtocol, func(stream libp2pnetwork.Stream) { logger.Trace("received stream", "sub-protocol", sub) conn := stream.Conn() @@ -466,10 +469,6 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID, } p := conn.RemotePeer() - - decoder := createDecoder(info, handshakeDecoder, messageDecoder) - handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler) - s.readStream(stream, p, decoder, handlerWithValidate) }) diff --git a/dot/network/sync.go b/dot/network/sync.go index 921ce04491..d11e6c4e6b 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -115,8 +115,9 @@ type syncQueue struct { cancel context.CancelFunc peerScore *sync.Map // map[peer.ID]int; peers we have successfully synced from before -> their score; score increases on successful response - requestData *sync.Map // map[uint64]requestData; map of start # of request -> requestData - requestCh chan *syncRequest + requestData *sync.Map // map[uint64]requestData; map of start # of request -> requestData + justificationRequestData *sync.Map // map[uint64]requestData; map of requests of justifications -> requestData + requestCh chan *syncRequest responses []*types.BlockData responseCh chan []*types.BlockData @@ -133,22 +134,24 @@ func newSyncQueue(s *Service) *syncQueue { ctx, cancel := context.WithCancel(s.ctx) return &syncQueue{ - s: s, - ctx: ctx, - cancel: cancel, - peerScore: new(sync.Map), - requestData: new(sync.Map), - requestCh: make(chan *syncRequest, blockRequestBufferSize), - responses: []*types.BlockData{}, - responseCh: make(chan []*types.BlockData), - benchmarker: newSyncBenchmarker(), - buf: make([]byte, maxBlockResponseSize), + s: s, + ctx: ctx, + cancel: cancel, + peerScore: new(sync.Map), + requestData: new(sync.Map), + justificationRequestData: new(sync.Map), + requestCh: make(chan *syncRequest, blockRequestBufferSize), + responses: []*types.BlockData{}, + responseCh: make(chan []*types.BlockData), + benchmarker: newSyncBenchmarker(), + buf: make([]byte, maxBlockResponseSize), } } func (q *syncQueue) start() { go q.handleResponseQueue() go q.syncAtHead() + go q.finalizeAtHead() go q.processBlockRequests() go q.processBlockResponses() @@ -201,6 +204,40 @@ func (q *syncQueue) syncAtHead() { } } +func (q *syncQueue) finalizeAtHead() { + prev, err := q.s.blockState.GetFinalizedHeader(0, 0) + if err != nil { + logger.Error("failed to get latest finalized block header", "error", err) + return + } + + for { + select { + // sleep for average block time TODO: make this configurable from slot duration + case <-time.After(time.Second * 6): + case <-q.ctx.Done(): + return + } + + curr, err := q.s.blockState.GetFinalizedHeader(0, 0) + if err != nil { + continue + } + + if curr.Number.Cmp(prev.Number) > 0 { + continue + } + + // no new blocks have been finalized, request block justifications from peers + head, err := q.s.blockState.BestBlockNumber() + if err != nil { + continue + } + + q.pushJustificationRequest(head.Uint64() - uint64(blockRequestSize)) + } +} + func (q *syncQueue) handleResponseQueue() { for { select { @@ -451,31 +488,64 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { } } -func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { - if len(resp.BlockData) == 0 { - return fmt.Errorf("response data is empty") +func (q *syncQueue) pushJustificationRequest(start uint64) { + req := createBlockRequest(int64(start), blockRequestSize) + req.RequestedData = RequestedDataJustification + + if d, has := q.justificationRequestData.Load(start); has { + data := d.(requestData) + // we haven't sent the request out yet, or we've already gotten the response + if !data.sent || data.sent && data.received { + return + } } - start, end, err := resp.getStartAndEnd() - if err != nil { - // update peer's score - q.updatePeerScore(pid, -1) - return fmt.Errorf("response doesn't contain block headers") + logger.Debug("pushing justification request to queue", "start", start) + + q.justificationRequestData.Store(start, requestData{ + received: false, + }) + + q.requestCh <- &syncRequest{ + req: req, + to: "", } +} - if resp.BlockData[0].Body == nil || !resp.BlockData[0].Body.Exists() { - // update peer's score - q.updatePeerScore(pid, -1) - return fmt.Errorf("response doesn't contain block bodies") +var errEmptyResponseData = fmt.Errorf("response data is empty") + +func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { + if len(resp.BlockData) == 0 { + return errEmptyResponseData } - // update peer's score - q.updatePeerScore(pid, 1) - q.requestData.Store(uint64(start), requestData{ - sent: true, - received: true, - from: pid, - }) + var ( + start, end int64 + err error + ) + + if resp.BlockData[0].Header.Exists() { + start, end, err = resp.getStartAndEnd() + if err != nil { + // update peer's score + q.updatePeerScore(pid, -1) + return fmt.Errorf("response doesn't contain block headers") + } + + if resp.BlockData[0].Body == nil || !resp.BlockData[0].Body.Exists() { + // update peer's score + q.updatePeerScore(pid, -1) + return fmt.Errorf("response doesn't contain block bodies") + } + + // update peer's score TODO: store justifications by hash? + q.updatePeerScore(pid, 1) + q.requestData.Store(uint64(start), requestData{ + sent: true, + received: true, + from: pid, + }) + } q.responseLock.Lock() defer q.responseLock.Unlock() @@ -513,6 +583,13 @@ func (q *syncQueue) processBlockRequests() { } } + if d, has := q.justificationRequestData.Load(req.req.StartingBlock.Uint64()); has { + data := d.(requestData) + if data.sent && data.received { + continue + } + } + q.trySync(req) case <-q.ctx.Done(): return @@ -556,19 +633,25 @@ func (q *syncQueue) trySync(req *syncRequest) { } err = q.pushResponse(resp, peer.pid) - if err != nil { - logger.Trace("failed to push block response", "error", err) + if err != nil && err != errEmptyResponseData { + logger.Debug("failed to push block response", "error", err) } else { return } } logger.Trace("failed to sync with any peer :(") - if req.req.StartingBlock.IsUint64() { + if req.req.StartingBlock.IsUint64() && (req.req.RequestedData&RequestedDataHeader) == 1 { q.requestData.Store(req.req.StartingBlock.Uint64(), requestData{ sent: true, received: false, }) + } else if req.req.StartingBlock.IsUint64() && (req.req.RequestedData&RequestedDataHeader) == 0 { + // TODO: potentially store by hash instead? + q.justificationRequestData.Store(req.req.StartingBlock.Uint64(), requestData{ + sent: true, + received: false, + }) } req.to = "" @@ -613,6 +696,11 @@ func (q *syncQueue) processBlockResponses() { for { select { case data := <-q.responseCh: + if !data[0].Header.Exists() { + q.handleBlockJustification(data) + continue + } + q.handleBlockData(data) case <-q.ctx.Done(): return @@ -620,6 +708,20 @@ func (q *syncQueue) processBlockResponses() { } } +func (q *syncQueue) handleBlockJustification(data []*types.BlockData) { + logger.Debug("sending justification data to syncer", "start", data[0].Hash, "end", data[len(data)-1].Hash) + + _, err := q.s.syncer.ProcessBlockData(data) + if err != nil { + //q.handleBlockDataFailure(idx, err, data) + logger.Warn("failed to handle block justifications", "error", err) + return + } + + logger.Debug("finished processing justification data", "start", data[0].Hash, "end", data[len(data)-1].Hash) + // TODO: update peer's score +} + func (q *syncQueue) handleBlockData(data []*types.BlockData) { bestNum, err := q.s.blockState.BestBlockNumber() if err != nil { @@ -821,6 +923,11 @@ func sortRequests(reqs []*syncRequest) []*syncRequest { func sortResponses(resps []*types.BlockData) []*types.BlockData { sort.Slice(resps, func(i, j int) bool { + // TODO: store justification-only BlockData separately? + if resps[i].Number() == nil || resps[j].Number() == nil { + return false + } + return resps[i].Number().Int64() < resps[j].Number().Int64() }) diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index c5a96dd23c..4a23bb5f43 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -187,6 +187,11 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) { logger.Debug("failed to add block to blocktree", "hash", bd.Hash, "error", err) } + if bd.Justification != nil && bd.Justification.Exists() { + logger.Debug("handling Justification...", "number", header.Number, "hash", bd.Hash) + s.handleJustification(header, bd.Justification.Value()) + } + continue } From c6452c851a7a376639d0af99e0780d5ef4bfe40a Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 30 Mar 2021 22:15:01 -0400 Subject: [PATCH 06/14] justification requesting at head functional --- dot/network/service.go | 2 +- dot/network/state.go | 1 + dot/network/sync.go | 165 ++++++++++++------------------ dot/network/sync_justification.go | 81 +++++++++++++++ dot/state/block.go | 10 ++ dot/sync/syncer.go | 6 +- 6 files changed, 160 insertions(+), 105 deletions(-) create mode 100644 dot/network/sync_justification.go diff --git a/dot/network/service.go b/dot/network/service.go index ffa9526da1..18607c74d5 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -212,7 +212,7 @@ func (s *Service) Start() error { } // since this opens block announce streams, it should happen after the protocol is registered - //s.host.h.Network().SetConnHandler(s.handleConn) + s.host.h.Network().SetConnHandler(s.handleConn) // log listening addresses to console for _, addr := range s.host.multiaddrs() { diff --git a/dot/network/state.go b/dot/network/state.go index 77725bd7de..72a4831e35 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -30,6 +30,7 @@ type BlockState interface { GenesisHash() common.Hash HasBlockBody(common.Hash) (bool, error) GetFinalizedHeader(round, setID uint64) (*types.Header, error) + GetHashByNumber(num *big.Int) (common.Hash, error) } // Syncer is implemented by the syncing service diff --git a/dot/network/sync.go b/dot/network/sync.go index d11e6c4e6b..d5349c63b0 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -116,12 +116,13 @@ type syncQueue struct { peerScore *sync.Map // map[peer.ID]int; peers we have successfully synced from before -> their score; score increases on successful response requestData *sync.Map // map[uint64]requestData; map of start # of request -> requestData - justificationRequestData *sync.Map // map[uint64]requestData; map of requests of justifications -> requestData + justificationRequestData *sync.Map // map[common.Hash]requestData; map of requests of justifications -> requestData requestCh chan *syncRequest - responses []*types.BlockData - responseCh chan []*types.BlockData - responseLock sync.RWMutex + justificationResponses []*types.BlockData + responses []*types.BlockData + responseCh chan []*types.BlockData + responseLock sync.RWMutex buf []byte goal int64 // goal block number we are trying to sync to @@ -142,6 +143,7 @@ func newSyncQueue(s *Service) *syncQueue { justificationRequestData: new(sync.Map), requestCh: make(chan *syncRequest, blockRequestBufferSize), responses: []*types.BlockData{}, + justificationResponses: []*types.BlockData{}, responseCh: make(chan []*types.BlockData), benchmarker: newSyncBenchmarker(), buf: make([]byte, maxBlockResponseSize), @@ -183,7 +185,7 @@ func (q *syncQueue) syncAtHead() { } // we aren't at the head yet, sleep - if curr.Number.Int64() < q.goal { + if curr.Number.Int64() < q.goal && curr.Number.Cmp(prev.Number) > 0 { prev = curr continue } @@ -204,40 +206,6 @@ func (q *syncQueue) syncAtHead() { } } -func (q *syncQueue) finalizeAtHead() { - prev, err := q.s.blockState.GetFinalizedHeader(0, 0) - if err != nil { - logger.Error("failed to get latest finalized block header", "error", err) - return - } - - for { - select { - // sleep for average block time TODO: make this configurable from slot duration - case <-time.After(time.Second * 6): - case <-q.ctx.Done(): - return - } - - curr, err := q.s.blockState.GetFinalizedHeader(0, 0) - if err != nil { - continue - } - - if curr.Number.Cmp(prev.Number) > 0 { - continue - } - - // no new blocks have been finalized, request block justifications from peers - head, err := q.s.blockState.BestBlockNumber() - if err != nil { - continue - } - - q.pushJustificationRequest(head.Uint64() - uint64(blockRequestSize)) - } -} - func (q *syncQueue) handleResponseQueue() { for { select { @@ -432,14 +400,14 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start := best.Int64() + 1 req := createBlockRequest(start, 0) - if d, has := q.requestData.Load(start); has { - data := d.(requestData) - // we haven't sent the request out yet, or we've already gotten the response - if !data.sent || data.sent && data.received { - logger.Debug("ignoring request, already received data", "start", start) - return - } - } + // if d, has := q.requestData.Load(start); has { + // data := d.(requestData) + // // we haven't sent the request out yet, or we've already gotten the response + // if !data.sent || data.sent && data.received { + // logger.Debug("ignoring request, already received data", "start", start) + // return + // } + // } logger.Debug("pushing request to queue", "start", start) @@ -488,35 +456,33 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { } } -func (q *syncQueue) pushJustificationRequest(start uint64) { - req := createBlockRequest(int64(start), blockRequestSize) - req.RequestedData = RequestedDataJustification +var ( + errEmptyResponseData = fmt.Errorf("response data is empty") + errEmptyJustificationData = fmt.Errorf("no justifications in response data") +) - if d, has := q.justificationRequestData.Load(start); has { - data := d.(requestData) - // we haven't sent the request out yet, or we've already gotten the response - if !data.sent || data.sent && data.received { - return - } +func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { + if len(resp.BlockData) == 0 { + return errEmptyResponseData } - logger.Debug("pushing justification request to queue", "start", start) - - q.justificationRequestData.Store(start, requestData{ - received: false, - }) - - q.requestCh <- &syncRequest{ - req: req, - to: "", - } -} + if _, has := q.justificationRequestData.Load(resp.BlockData[0].Hash); has && !resp.BlockData[0].Header.Exists() { + numJustifications := 0 + for _, bd := range resp.BlockData { + if bd.Justification.Exists() { + q.justificationResponses = append(q.justificationResponses, bd) + numJustifications++ + } + } -var errEmptyResponseData = fmt.Errorf("response data is empty") + if numJustifications == 0 { + return errEmptyJustificationData + } -func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { - if len(resp.BlockData) == 0 { - return errEmptyResponseData + logger.Info("pushed justification data to queue", "hash", resp.BlockData[0].Hash) + q.responseCh <- q.justificationResponses + q.justificationResponses = []*types.BlockData{} + return nil } var ( @@ -524,29 +490,27 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error err error ) - if resp.BlockData[0].Header.Exists() { - start, end, err = resp.getStartAndEnd() - if err != nil { - // update peer's score - q.updatePeerScore(pid, -1) - return fmt.Errorf("response doesn't contain block headers") - } - - if resp.BlockData[0].Body == nil || !resp.BlockData[0].Body.Exists() { - // update peer's score - q.updatePeerScore(pid, -1) - return fmt.Errorf("response doesn't contain block bodies") - } + start, end, err = resp.getStartAndEnd() + if err != nil { + // update peer's score + q.updatePeerScore(pid, -1) + return fmt.Errorf("response doesn't contain block headers") + } - // update peer's score TODO: store justifications by hash? - q.updatePeerScore(pid, 1) - q.requestData.Store(uint64(start), requestData{ - sent: true, - received: true, - from: pid, - }) + if resp.BlockData[0].Body == nil || !resp.BlockData[0].Body.Exists() { + // update peer's score + q.updatePeerScore(pid, -1) + return fmt.Errorf("response doesn't contain block bodies") } + // update peer's score + q.updatePeerScore(pid, 1) + q.requestData.Store(uint64(start), requestData{ + sent: true, + received: true, + from: pid, + }) + q.responseLock.Lock() defer q.responseLock.Unlock() @@ -583,12 +547,12 @@ func (q *syncQueue) processBlockRequests() { } } - if d, has := q.justificationRequestData.Load(req.req.StartingBlock.Uint64()); has { - data := d.(requestData) - if data.sent && data.received { - continue - } - } + // if d, has := q.justificationRequestData.Load(req.req.StartingBlock.Hash()); has { + // data := d.(requestData) + // if data.sent && data.received { + // continue + // } + // } q.trySync(req) case <-q.ctx.Done(): @@ -633,7 +597,7 @@ func (q *syncQueue) trySync(req *syncRequest) { } err = q.pushResponse(resp, peer.pid) - if err != nil && err != errEmptyResponseData { + if err != nil && err != errEmptyResponseData && err != errEmptyJustificationData { logger.Debug("failed to push block response", "error", err) } else { return @@ -646,9 +610,8 @@ func (q *syncQueue) trySync(req *syncRequest) { sent: true, received: false, }) - } else if req.req.StartingBlock.IsUint64() && (req.req.RequestedData&RequestedDataHeader) == 0 { - // TODO: potentially store by hash instead? - q.justificationRequestData.Store(req.req.StartingBlock.Uint64(), requestData{ + } else if req.req.StartingBlock.IsHash() && (req.req.RequestedData&RequestedDataHeader) == 0 { + q.justificationRequestData.Store(req.req.StartingBlock.Hash(), requestData{ sent: true, received: false, }) diff --git a/dot/network/sync_justification.go b/dot/network/sync_justification.go new file mode 100644 index 0000000000..557be84f8f --- /dev/null +++ b/dot/network/sync_justification.go @@ -0,0 +1,81 @@ +package network + +import ( + "math/big" + "time" +) + +func (q *syncQueue) finalizeAtHead() { + prev, err := q.s.blockState.GetFinalizedHeader(0, 0) + if err != nil { + logger.Error("failed to get latest finalized block header", "error", err) + return + } + + for { + select { + // sleep for average block time TODO: make this configurable from slot duration + case <-time.After(time.Second * 12): + case <-q.ctx.Done(): + return + } + + curr, err := q.s.blockState.GetFinalizedHeader(0, 0) + if err != nil { + continue + } + + logger.Debug("checking finalized blocks", "curr", curr.Number, "prev", prev.Number) + + if curr.Number.Cmp(prev.Number) > 0 { + prev = curr + continue + } + + // no new blocks have been finalized, request block justifications from peers + head, err := q.s.blockState.BestBlockNumber() + if err != nil { + prev = curr + continue + } + + prev = curr + + start := head.Uint64() - uint64(blockRequestSize) + if curr.Number.Uint64() > start { + start = curr.Number.Uint64() + 1 + } + + q.pushJustificationRequest(start) + } +} + +func (q *syncQueue) pushJustificationRequest(start uint64) { + startHash, err := q.s.blockState.GetHashByNumber(big.NewInt(int64(start))) + if err != nil { + logger.Error("failed to get hash for block w/ number", "number", start, "error", err) + return + } + + req := createBlockRequestWithHash(startHash, blockRequestSize) + req.RequestedData = RequestedDataJustification + + // if d, has := q.justificationRequestData.Load(startHash); has { + // data := d.(requestData) + // // we haven't sent the request out yet, or we've already gotten the response + // if !data.sent || data.sent && data.received { + // return + // } + // } + + logger.Debug("pushing justification request to queue", "start", start) + + q.justificationRequestData.Store(startHash, requestData{ + received: false, + }) + + q.requestCh <- &syncRequest{ + req: req, + to: "", + } +} diff --git a/dot/state/block.go b/dot/state/block.go index 7c2f33acd1..302d92fe5e 100644 --- a/dot/state/block.go +++ b/dot/state/block.go @@ -264,6 +264,16 @@ func (bs *BlockState) GetHeader(hash common.Hash) (*types.Header, error) { return result, err } +// GetHashByNumber returns the block hash given the number +func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) { + bh, err := bs.db.Get(headerHashKey(num.Uint64())) + if err != nil { + return common.Hash{}, fmt.Errorf("cannot get block %d: %s", num, err) + } + + return common.NewHash(bh), nil +} + // GetHeaderByNumber returns a block header given a number func (bs *BlockState) GetHeaderByNumber(num *big.Int) (*types.Header, error) { bh, err := bs.db.Get(headerHashKey(num.Uint64())) diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index 4a23bb5f43..f7b67bf7ec 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -162,7 +162,7 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) { } for i, bd := range data { - logger.Debug("starting processing of block", "hash", bd.Hash) + logger.Info("starting processing of block", "hash", bd.Hash, "justification", bd.Justification) err := s.blockState.CompareAndSetBlockData(bd) if err != nil { @@ -188,7 +188,7 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) { } if bd.Justification != nil && bd.Justification.Exists() { - logger.Debug("handling Justification...", "number", header.Number, "hash", bd.Hash) + logger.Info("handling Justification...", "number", header.Number, "hash", bd.Hash) s.handleJustification(header, bd.Justification.Value()) } @@ -256,7 +256,7 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) { logger.Debug("block processed", "hash", bd.Hash) } - if bd.Justification != nil && bd.Justification.Exists() { + if bd.Justification != nil && bd.Justification.Exists() && header != nil { logger.Debug("handling Justification...", "number", bd.Number(), "hash", bd.Hash) s.handleJustification(header, bd.Justification.Value()) } From c68f3b9e5ce0bd5eb4d1ef1e71398ae107768112 Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 15:03:16 -0400 Subject: [PATCH 07/14] fix dot/network tests --- dot/network/block_announce.go | 1 - dot/network/host_test.go | 10 +++++----- dot/network/state_test.go | 4 ++++ dot/network/transaction_test.go | 4 +--- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index 29b7592bda..05e1c9b71f 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -256,7 +256,6 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err // with its peer and send a BlockRequest message func (s *Service) handleBlockAnnounceMessage(peer peer.ID, msg NotificationsMessage) error { if an, ok := msg.(*BlockAnnounceMessage); ok { - logger.Info("received BlockAnnounce!", "msg", an) s.syncQueue.handleBlockAnnounce(an, peer) err := s.syncer.HandleBlockAnnounce(an) if err != nil { diff --git a/dot/network/host_test.go b/dot/network/host_test.go index 56430d94cb..aca5f2313e 100644 --- a/dot/network/host_test.go +++ b/dot/network/host_test.go @@ -269,7 +269,7 @@ func TestExistingStream(t *testing.T) { } require.NoError(t, err) - stream := nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID) + stream := nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID) require.Nil(t, stream, "node A should not have an outbound stream") // node A opens the stream to send the first message @@ -279,7 +279,7 @@ func TestExistingStream(t *testing.T) { time.Sleep(TestMessageTimeout) require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A") - stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID) + stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID) require.NotNil(t, stream, "node A should have an outbound stream") // node A uses the stream to send a second message @@ -287,7 +287,7 @@ func TestExistingStream(t *testing.T) { require.NoError(t, err) require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A") - stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID) + stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID) require.NotNil(t, stream, "node B should have an outbound stream") // node B opens the stream to send the first message @@ -297,7 +297,7 @@ func TestExistingStream(t *testing.T) { time.Sleep(TestMessageTimeout) require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B") - stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID) + stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID) require.NotNil(t, stream, "node B should have an outbound stream") // node B uses the stream to send a second message @@ -305,7 +305,7 @@ func TestExistingStream(t *testing.T) { require.NoError(t, err) require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B") - stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID) + stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID) require.NotNil(t, stream, "node B should have an outbound stream") } diff --git a/dot/network/state_test.go b/dot/network/state_test.go index abc41d7a62..a268b9914e 100644 --- a/dot/network/state_test.go +++ b/dot/network/state_test.go @@ -75,3 +75,7 @@ func (mbs *MockBlockState) HasBlockBody(common.Hash) (bool, error) { func (mbs *MockBlockState) GetFinalizedHeader(_, _ uint64) (*types.Header, error) { return mbs.BestBlockHeader() } + +func (mbs *MockBlockState) GetHashByNumber(_ *big.Int) (common.Hash, error) { + return common.Hash{}, nil +} diff --git a/dot/network/transaction_test.go b/dot/network/transaction_test.go index db47f69e4b..07876168d9 100644 --- a/dot/network/transaction_test.go +++ b/dot/network/transaction_test.go @@ -29,9 +29,7 @@ import ( ) func TestDecodeTransactionHandshake(t *testing.T) { - testHandshake := &transactionHandshake{ - Roles: 4, - } + testHandshake := &transactionHandshake{} enc, err := testHandshake.Encode() require.NoError(t, err) From 6aff7a0ff5b45c438936202ea55d05352cca8e9f Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 15:16:41 -0400 Subject: [PATCH 08/14] cleanup --- dot/rpc/modules/system_test.go | 4 ++++ dot/sync/syncer.go | 4 ++-- lib/grandpa/message_handler.go | 1 - lib/grandpa/network.go | 1 - 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index 08e3512faf..c1f982530c 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -87,6 +87,10 @@ func (s *mockBlockState) GetFinalizedHeader(_, _ uint64) (*types.Header, error) return s.BestBlockHeader() } +func (s *mockBlockState) GetHashByNumber(_ *big.Int) (common.Hash, error) { + return common.Hash{}, nil +} + type mockTransactionHandler struct{} func (h *mockTransactionHandler) HandleTransactionMessage(_ *network.TransactionMessage) error { diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index f7b67bf7ec..b04aadb864 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -162,7 +162,7 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) { } for i, bd := range data { - logger.Info("starting processing of block", "hash", bd.Hash, "justification", bd.Justification) + logger.Debug("starting processing of block", "hash", bd.Hash) err := s.blockState.CompareAndSetBlockData(bd) if err != nil { @@ -188,7 +188,7 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) { } if bd.Justification != nil && bd.Justification.Exists() { - logger.Info("handling Justification...", "number", header.Number, "hash", bd.Hash) + logger.Debug("handling Justification...", "number", header.Number, "hash", bd.Hash) s.handleJustification(header, bd.Justification.Value()) } diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 3d9318f67b..8d4db4eed9 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -60,7 +60,6 @@ func (h *MessageHandler) handleMessage(msg *ConsensusMessage) (*ConsensusMessage h.grandpa.in <- vm } case finalizationType: - h.grandpa.logger.Info("got finalization message!!", "msg", msg) if fm, ok := m.(*FinalizationMessage); ok { return h.handleFinalizationMessage(fm) } diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 442ee95f96..f596282b5f 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -130,7 +130,6 @@ func (s *Service) handleNetworkMessage(_ peer.ID, msg NotificationsMessage) erro return ErrInvalidMessageType } - s.logger.Info("got grandpa message!", "msg", msg) resp, err := s.messageHandler.handleMessage(cm) if err != nil { return err From d73806ad2a454337b5e1f4842fa8afaadb989380 Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 15:59:27 -0400 Subject: [PATCH 09/14] cleanup, lint --- dot/network/notifications.go | 10 ++--- dot/network/service.go | 33 +-------------- dot/network/sync.go | 70 +++++++++++++++++-------------- dot/network/sync_justification.go | 9 ---- dot/network/transaction.go | 23 ++-------- 5 files changed, 49 insertions(+), 96 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 57e2be8400..aecdafc8d5 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -106,12 +106,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, return errors.New("message is not NotificationsMessage") } - logger.Trace("received message on notifications sub-protocol", "protocol", info.protocolID, - "message", msg, - "peer", stream.Conn().RemotePeer(), - ) - if msg.IsHandshake() { + logger.Trace("received handshake on notifications sub-protocol", "protocol", info.protocolID, + "message", msg, + "peer", stream.Conn().RemotePeer(), + ) + hs, ok := msg.(Handshake) if !ok { return errors.New("failed to convert message to Handshake") diff --git a/dot/network/service.go b/dot/network/service.go index 18607c74d5..6c454d611a 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -304,7 +304,7 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { defer info.mapMu.RUnlock() peer := conn.RemotePeer() - if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { + if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint info.handshakeData.Store(peer, &handshakeData{ validated: false, }) @@ -315,35 +315,6 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { logger.Trace("failed to send block announce handshake to peer", "peer", peer, "error", err) } } - - grandpaInfo, has := s.notificationsProtocols[ConsensusMsgType] - if !has { - // this shouldn't happen - logger.Warn("consensus protocol is not yet registered!") - return - } - - // open grandpa substream - hs, err = grandpaInfo.getHandshake() - if err != nil { - logger.Warn("failed to get handshake", "protocol", grandpaInfo.protocolID, "error", err) - return - } - - grandpaInfo.mapMu.RLock() - defer grandpaInfo.mapMu.RUnlock() - - if hsData, has := grandpaInfo.getHandshakeData(peer); !has || !hsData.received { //nolint - grandpaInfo.handshakeData.Store(peer, &handshakeData{ - validated: false, - }) - - logger.Debug("sending handshake", "protocol", grandpaInfo.protocolID, "peer", peer, "message", hs) - err = s.host.send(peer, grandpaInfo.protocolID, hs) - if err != nil { - logger.Debug("failed to send grandpa handshake to peer", "peer", peer, "error", err) - } - } } func (s *Service) beginDiscovery() error { @@ -565,7 +536,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder // decode message based on message type msg, err := decoder(msgBytes[:tot], peer) if err != nil { - logger.Info("failed to decode message from peer", "protocol", stream.Protocol(), "err", err, "msg bytes", fmt.Sprintf("0x%x", msgBytes[:tot])) + logger.Trace("failed to decode message from peer", "protocol", stream.Protocol(), "err", err) continue } diff --git a/dot/network/sync.go b/dot/network/sync.go index d5349c63b0..8cd3f6fb23 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -93,6 +93,11 @@ var ( protectedPeerThreshold int = 7 ) +var ( + errEmptyResponseData = fmt.Errorf("response data is empty") + errEmptyJustificationData = fmt.Errorf("no justifications in response data") +) + type syncPeer struct { pid peer.ID score int @@ -119,10 +124,9 @@ type syncQueue struct { justificationRequestData *sync.Map // map[common.Hash]requestData; map of requests of justifications -> requestData requestCh chan *syncRequest - justificationResponses []*types.BlockData - responses []*types.BlockData - responseCh chan []*types.BlockData - responseLock sync.RWMutex + responses []*types.BlockData + responseCh chan []*types.BlockData + responseLock sync.RWMutex buf []byte goal int64 // goal block number we are trying to sync to @@ -143,7 +147,6 @@ func newSyncQueue(s *Service) *syncQueue { justificationRequestData: new(sync.Map), requestCh: make(chan *syncRequest, blockRequestBufferSize), responses: []*types.BlockData{}, - justificationResponses: []*types.BlockData{}, responseCh: make(chan []*types.BlockData), benchmarker: newSyncBenchmarker(), buf: make([]byte, maxBlockResponseSize), @@ -456,21 +459,19 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { } } -var ( - errEmptyResponseData = fmt.Errorf("response data is empty") - errEmptyJustificationData = fmt.Errorf("no justifications in response data") -) - func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { if len(resp.BlockData) == 0 { return errEmptyResponseData } - if _, has := q.justificationRequestData.Load(resp.BlockData[0].Hash); has && !resp.BlockData[0].Header.Exists() { + startHash := resp.BlockData[0].Hash + if _, has := q.justificationRequestData.Load(startHash); has && !resp.BlockData[0].Header.Exists() { numJustifications := 0 + justificationResponses := []*types.BlockData{} + for _, bd := range resp.BlockData { if bd.Justification.Exists() { - q.justificationResponses = append(q.justificationResponses, bd) + justificationResponses = append(justificationResponses, bd) numJustifications++ } } @@ -479,9 +480,15 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error return errEmptyJustificationData } - logger.Info("pushed justification data to queue", "hash", resp.BlockData[0].Hash) - q.responseCh <- q.justificationResponses - q.justificationResponses = []*types.BlockData{} + q.updatePeerScore(pid, 1) + q.justificationRequestData.Store(startHash, requestData{ + sent: true, + received: true, + from: pid, + }) + + logger.Info("pushed justification data to queue", "hash", startHash) + q.responseCh <- justificationResponses return nil } @@ -547,13 +554,6 @@ func (q *syncQueue) processBlockRequests() { } } - // if d, has := q.justificationRequestData.Load(req.req.StartingBlock.Hash()); has { - // data := d.(requestData) - // if data.sent && data.received { - // continue - // } - // } - q.trySync(req) case <-q.ctx.Done(): return @@ -659,6 +659,7 @@ func (q *syncQueue) processBlockResponses() { for { select { case data := <-q.responseCh: + // if the response doesn't contain a header, then it's a justification-only response if !data[0].Header.Exists() { q.handleBlockJustification(data) continue @@ -672,17 +673,29 @@ func (q *syncQueue) processBlockResponses() { } func (q *syncQueue) handleBlockJustification(data []*types.BlockData) { - logger.Debug("sending justification data to syncer", "start", data[0].Hash, "end", data[len(data)-1].Hash) + startHash, endHash := data[0].Hash, data[len(data)-1].Hash + logger.Debug("sending justification data to syncer", "start", startHash, "end", endHash) _, err := q.s.syncer.ProcessBlockData(data) if err != nil { - //q.handleBlockDataFailure(idx, err, data) logger.Warn("failed to handle block justifications", "error", err) return } - logger.Debug("finished processing justification data", "start", data[0].Hash, "end", data[len(data)-1].Hash) - // TODO: update peer's score + logger.Debug("finished processing justification data", "start", startHash, "end", endHash) + + // update peer's score + var from peer.ID + + d, ok := q.justificationRequestData.Load(startHash) + if !ok { + // this shouldn't happen + logger.Debug("can't find request data for response!", "start", startHash) + } else { + from = d.(requestData).from + q.updatePeerScore(from, 2) + q.requestData.Delete(startHash) + } } func (q *syncQueue) handleBlockData(data []*types.BlockData) { @@ -886,11 +899,6 @@ func sortRequests(reqs []*syncRequest) []*syncRequest { func sortResponses(resps []*types.BlockData) []*types.BlockData { sort.Slice(resps, func(i, j int) bool { - // TODO: store justification-only BlockData separately? - if resps[i].Number() == nil || resps[j].Number() == nil { - return false - } - return resps[i].Number().Int64() < resps[j].Number().Int64() }) diff --git a/dot/network/sync_justification.go b/dot/network/sync_justification.go index 557be84f8f..0a2449b72f 100644 --- a/dot/network/sync_justification.go +++ b/dot/network/sync_justification.go @@ -60,16 +60,7 @@ func (q *syncQueue) pushJustificationRequest(start uint64) { req := createBlockRequestWithHash(startHash, blockRequestSize) req.RequestedData = RequestedDataJustification - // if d, has := q.justificationRequestData.Load(startHash); has { - // data := d.(requestData) - // // we haven't sent the request out yet, or we've already gotten the response - // if !data.sent || data.sent && data.received { - // return - // } - // } - logger.Debug("pushing justification request to queue", "start", start) - q.justificationRequestData.Store(startHash, requestData{ received: false, }) diff --git a/dot/network/transaction.go b/dot/network/transaction.go index f19cb455b9..5131bc23ef 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -101,9 +101,7 @@ func (tm *TransactionMessage) IsHandshake() bool { return false } -type transactionHandshake struct { - //Roles byte -} +type transactionHandshake struct{} // SubProtocol returns the transactions sub-protocol func (hs *transactionHandshake) SubProtocol() string { @@ -112,22 +110,16 @@ func (hs *transactionHandshake) SubProtocol() string { // String formats a transactionHandshake as a string func (hs *transactionHandshake) String() string { - return fmt.Sprintf("transactionHandshake") // Roles=%d", hs.Roles) + return "transactionHandshake" } // Encode encodes a transactionHandshake message using SCALE func (hs *transactionHandshake) Encode() ([]byte, error) { - //return scale.Encode(hs) return []byte{}, nil } // Decode the message into a transactionHandshake func (hs *transactionHandshake) Decode(in []byte) error { - // msg, err := scale.Decode(in, hs) - // if err != nil { - // return err - // } - // hs.Roles = msg.(*transactionHandshake).Roles return nil } @@ -147,19 +139,10 @@ func (hs *transactionHandshake) IsHandshake() bool { } func (s *Service) getTransactionHandshake() (Handshake, error) { - return &transactionHandshake{ - //Roles: s.cfg.Roles, - }, nil + return &transactionHandshake{}, nil } func decodeTransactionHandshake(in []byte) (Handshake, error) { - // if len(in) < 1 { - // return nil, errors.New("invalid handshake") - // } - - // return &transactionHandshake{ - // Roles: in[0], - // }, nil return &transactionHandshake{}, nil } From 9c71b6f24404acb3f41337a17432689602a2351a Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 16:22:35 -0400 Subject: [PATCH 10/14] fix pushRequest --- dot/network/sync.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index 8cd3f6fb23..9cd52cf3b2 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -403,14 +403,14 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start := best.Int64() + 1 req := createBlockRequest(start, 0) - // if d, has := q.requestData.Load(start); has { - // data := d.(requestData) - // // we haven't sent the request out yet, or we've already gotten the response - // if !data.sent || data.sent && data.received { - // logger.Debug("ignoring request, already received data", "start", start) - // return - // } - // } + if d, has := q.requestData.Load(start); has { + data := d.(requestData) + // we haven't sent the request out yet, or we've already gotten the response + if !data.sent { + logger.Debug("ignoring request, haven't sent out previous", "start", start) + return + } + } logger.Debug("pushing request to queue", "start", start) From d56d266a0c113d34051c33ae9e8376086f6b488d Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 19:59:03 -0400 Subject: [PATCH 11/14] add unit tests --- dot/network/sync.go | 40 +++--- dot/network/sync_justification.go | 22 +++- dot/network/sync_justification_test.go | 168 +++++++++++++++++++++++++ dot/state/block_test.go | 26 ++++ 4 files changed, 229 insertions(+), 27 deletions(-) create mode 100644 dot/network/sync_justification_test.go diff --git a/dot/network/sync.go b/dot/network/sync.go index 9cd52cf3b2..f542932f1f 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -85,12 +85,15 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er } var ( - blockRequestSize uint32 = 128 - blockRequestBufferSize int = 6 + blockRequestSize uint32 = 128 + blockRequestBufferSize int = 6 + blockResponseBufferSize int = 6 maxBlockResponseSize uint64 = 1024 * 1024 * 4 // 4mb badPeerThreshold int = -2 protectedPeerThreshold int = 7 + + defaultSlotDuration = time.Duration(time.Second * 6) ) var ( @@ -115,10 +118,11 @@ type requestData struct { } type syncQueue struct { - s *Service - ctx context.Context - cancel context.CancelFunc - peerScore *sync.Map // map[peer.ID]int; peers we have successfully synced from before -> their score; score increases on successful response + s *Service + slotDuration time.Duration + ctx context.Context + cancel context.CancelFunc + peerScore *sync.Map // map[peer.ID]int; peers we have successfully synced from before -> their score; score increases on successful response requestData *sync.Map // map[uint64]requestData; map of start # of request -> requestData justificationRequestData *sync.Map // map[common.Hash]requestData; map of requests of justifications -> requestData @@ -140,6 +144,7 @@ func newSyncQueue(s *Service) *syncQueue { return &syncQueue{ s: s, + slotDuration: defaultSlotDuration, ctx: ctx, cancel: cancel, peerScore: new(sync.Map), @@ -147,7 +152,7 @@ func newSyncQueue(s *Service) *syncQueue { justificationRequestData: new(sync.Map), requestCh: make(chan *syncRequest, blockRequestBufferSize), responses: []*types.BlockData{}, - responseCh: make(chan []*types.BlockData), + responseCh: make(chan []*types.BlockData, blockResponseBufferSize), benchmarker: newSyncBenchmarker(), buf: make([]byte, maxBlockResponseSize), } @@ -177,7 +182,7 @@ func (q *syncQueue) syncAtHead() { for { select { // sleep for average block time TODO: make this configurable from slot duration - case <-time.After(time.Second * 6): + case <-time.After(q.slotDuration): case <-q.ctx.Done(): return } @@ -403,17 +408,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start := best.Int64() + 1 req := createBlockRequest(start, 0) - if d, has := q.requestData.Load(start); has { - data := d.(requestData) - // we haven't sent the request out yet, or we've already gotten the response - if !data.sent { - logger.Debug("ignoring request, haven't sent out previous", "start", start) - return - } - } - logger.Debug("pushing request to queue", "start", start) - q.requestData.Store(start, requestData{ received: false, }) @@ -492,12 +487,7 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error return nil } - var ( - start, end int64 - err error - ) - - start, end, err = resp.getStartAndEnd() + start, end, err := resp.getStartAndEnd() if err != nil { // update peer's score q.updatePeerScore(pid, -1) @@ -694,7 +684,7 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) { } else { from = d.(requestData).from q.updatePeerScore(from, 2) - q.requestData.Delete(startHash) + q.justificationRequestData.Delete(startHash) } } diff --git a/dot/network/sync_justification.go b/dot/network/sync_justification.go index 0a2449b72f..36b94d41df 100644 --- a/dot/network/sync_justification.go +++ b/dot/network/sync_justification.go @@ -1,3 +1,19 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + package network import ( @@ -15,7 +31,7 @@ func (q *syncQueue) finalizeAtHead() { for { select { // sleep for average block time TODO: make this configurable from slot duration - case <-time.After(time.Second * 12): + case <-time.After(q.slotDuration * 2): case <-q.ctx.Done(): return } @@ -44,6 +60,8 @@ func (q *syncQueue) finalizeAtHead() { start := head.Uint64() - uint64(blockRequestSize) if curr.Number.Uint64() > start { start = curr.Number.Uint64() + 1 + } else if int(start) < int(blockRequestSize) { + start = 1 } q.pushJustificationRequest(start) @@ -60,7 +78,7 @@ func (q *syncQueue) pushJustificationRequest(start uint64) { req := createBlockRequestWithHash(startHash, blockRequestSize) req.RequestedData = RequestedDataJustification - logger.Debug("pushing justification request to queue", "start", start) + logger.Debug("pushing justification request to queue", "start", start, "hash", startHash) q.justificationRequestData.Store(startHash, requestData{ received: false, }) diff --git a/dot/network/sync_justification_test.go b/dot/network/sync_justification_test.go new file mode 100644 index 0000000000..03e72b359a --- /dev/null +++ b/dot/network/sync_justification_test.go @@ -0,0 +1,168 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package network + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/common/optional" + "github.com/ChainSafe/gossamer/lib/utils" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" +) + +func TestSyncQueue_PushResponse_Justification(t *testing.T) { + basePath := utils.NewTestBasePath(t, "nodeA") + config := &Config{ + BasePath: basePath, + Port: 7001, + RandSeed: 1, + NoBootstrap: true, + NoMDNS: true, + } + + s := createTestService(t, config) + s.syncQueue.stop() + time.Sleep(time.Second) + + peerID := peer.ID("noot") + msg := &BlockResponseMessage{ + BlockData: []*types.BlockData{}, + } + + for i := 0; i < int(blockRequestSize); i++ { + msg.BlockData = append(msg.BlockData, &types.BlockData{ + Hash: common.Hash{byte(i)}, + Justification: optional.NewBytes(true, []byte{1}), + }) + } + + s.syncQueue.justificationRequestData.Store(common.Hash{byte(0)}, requestData{}) + err := s.syncQueue.pushResponse(msg, peerID) + require.NoError(t, err) + require.Equal(t, 1, len(s.syncQueue.responseCh)) + data, ok := s.syncQueue.justificationRequestData.Load(common.Hash{byte(0)}) + require.True(t, ok) + require.Equal(t, requestData{ + sent: true, + received: true, + from: peerID, + }, data) +} + +func TestSyncQueue_PushResponse_EmptyJustification(t *testing.T) { + basePath := utils.NewTestBasePath(t, "nodeA") + config := &Config{ + BasePath: basePath, + Port: 7001, + RandSeed: 1, + NoBootstrap: true, + NoMDNS: true, + } + + s := createTestService(t, config) + s.syncQueue.stop() + time.Sleep(time.Second) + + peerID := peer.ID("noot") + msg := &BlockResponseMessage{ + BlockData: []*types.BlockData{}, + } + + for i := 0; i < int(blockRequestSize); i++ { + msg.BlockData = append(msg.BlockData, &types.BlockData{ + Hash: common.Hash{byte(i)}, + Justification: optional.NewBytes(false, nil), + }) + } + + s.syncQueue.justificationRequestData.Store(common.Hash{byte(0)}, &requestData{}) + err := s.syncQueue.pushResponse(msg, peerID) + require.Equal(t, errEmptyJustificationData, err) +} + +func TestSyncQueue_processBlockResponses_Justification(t *testing.T) { + q := newTestSyncQueue(t) + q.stop() + time.Sleep(time.Second) + q.ctx = context.Background() + + go func() { + q.responseCh <- []*types.BlockData{ + { + Hash: common.Hash{byte(0)}, + Header: optional.NewHeader(false, nil), + Body: optional.NewBody(false, nil), + Receipt: optional.NewBytes(false, nil), + MessageQueue: optional.NewBytes(false, nil), + Justification: optional.NewBytes(true, []byte{1}), + }, + } + }() + + peerID := peer.ID("noot") + q.justificationRequestData.Store(common.Hash{byte(0)}, requestData{ + from: peerID, + }) + + go q.processBlockResponses() + time.Sleep(time.Second) + + _, has := q.justificationRequestData.Load(common.Hash{byte(0)}) + require.False(t, has) + + score, ok := q.peerScore.Load(peerID) + require.True(t, ok) + require.Equal(t, 2, score) +} + +func TestSyncQueue_finalizeAtHead(t *testing.T) { + q := newTestSyncQueue(t) + q.stop() + time.Sleep(time.Second) + q.ctx = context.Background() + q.slotDuration = time.Duration(time.Millisecond * 500) + + hash, err := q.s.blockState.GetHashByNumber(big.NewInt(1)) + require.NoError(t, err) + + go q.finalizeAtHead() + time.Sleep(time.Second * 2) + + data, has := q.justificationRequestData.Load(hash) + require.True(t, has) + require.Equal(t, requestData{}, data) + + expected := createBlockRequestWithHash(hash, blockRequestSize) + expected.RequestedData = RequestedDataJustification + + select { + case req := <-q.requestCh: + require.Equal(t, &syncRequest{ + req: expected, + to: "", + }, req) + case <-time.After(time.Second): + t.Fatal("did not receive request") + } +} diff --git a/dot/state/block_test.go b/dot/state/block_test.go index 844e092d33..0a54db54a4 100644 --- a/dot/state/block_test.go +++ b/dot/state/block_test.go @@ -365,3 +365,29 @@ func TestFinalization_DeleteBlock(t *testing.T) { // } } } + +func TestGetHashByNumber(t *testing.T) { + bs := newTestBlockState(t, testGenesisHeader) + + res, err := bs.GetHashByNumber(big.NewInt(0)) + require.NoError(t, err) + require.Equal(t, bs.genesisHash, res) + + header := &types.Header{ + Number: big.NewInt(1), + Digest: types.Digest{}, + ParentHash: testGenesisHeader.Hash(), + } + + block := &types.Block{ + Header: header, + Body: &types.Body{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + } + + err = bs.AddBlock(block) + require.NoError(t, err) + + res, err = bs.GetHashByNumber(big.NewInt(1)) + require.NoError(t, err) + require.Equal(t, header.Hash(), res) +} From 40cd906a4fa90d072d5ca39002ec580d3da8ac9d Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 20:01:17 -0400 Subject: [PATCH 12/14] lint --- dot/network/sync.go | 2 +- dot/network/sync_justification_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index f542932f1f..9cbc5d21e0 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -93,7 +93,7 @@ var ( badPeerThreshold int = -2 protectedPeerThreshold int = 7 - defaultSlotDuration = time.Duration(time.Second * 6) + defaultSlotDuration = time.Second * 6 ) var ( diff --git a/dot/network/sync_justification_test.go b/dot/network/sync_justification_test.go index 03e72b359a..00d854471a 100644 --- a/dot/network/sync_justification_test.go +++ b/dot/network/sync_justification_test.go @@ -141,13 +141,13 @@ func TestSyncQueue_finalizeAtHead(t *testing.T) { q.stop() time.Sleep(time.Second) q.ctx = context.Background() - q.slotDuration = time.Duration(time.Millisecond * 500) + q.slotDuration = time.Millisecond * 200 hash, err := q.s.blockState.GetHashByNumber(big.NewInt(1)) require.NoError(t, err) go q.finalizeAtHead() - time.Sleep(time.Second * 2) + time.Sleep(time.Second) data, has := q.justificationRequestData.Load(hash) require.True(t, has) From 2b227fc13789afc85cbb1791a5e485bca72c0d9f Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 31 Mar 2021 20:02:29 -0400 Subject: [PATCH 13/14] update some logs --- dot/network/notifications.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index aecdafc8d5..98760d6e18 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -151,12 +151,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, _ = stream.Conn().Close() return err } - logger.Debug("receiver: sent handshake", "protocol", info.protocolID, "peer", peer) + logger.Trace("receiver: sent handshake", "protocol", info.protocolID, "peer", peer) } // if we are the initiator and haven't received the handshake already, validate it if hsData, has := info.getHandshakeData(peer); has && !hsData.validated { - logger.Debug("sender: validating handshake") + logger.Trace("sender: validating handshake") err := handshakeValidator(peer, hs) if err != nil { logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err) From fd59122f756ea7c5d1500ba8900a9fa73092aa29 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 5 Apr 2021 11:08:34 -0400 Subject: [PATCH 14/14] address comments --- dot/network/sync_justification.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dot/network/sync_justification.go b/dot/network/sync_justification.go index 36b94d41df..c5310cc144 100644 --- a/dot/network/sync_justification.go +++ b/dot/network/sync_justification.go @@ -48,15 +48,14 @@ func (q *syncQueue) finalizeAtHead() { continue } + prev = curr + // no new blocks have been finalized, request block justifications from peers head, err := q.s.blockState.BestBlockNumber() if err != nil { - prev = curr continue } - prev = curr - start := head.Uint64() - uint64(blockRequestSize) if curr.Number.Uint64() > start { start = curr.Number.Uint64() + 1