From fd1024c7d45834df4c75014b2b8ac76d5ac537ae Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 31 May 2021 17:35:20 -0400 Subject: [PATCH 01/21] refactor telemetry messages to map format --- chain/dev/genesis.json | 9 +++++++-- chain/gssmr/genesis.json | 8 +++++++- dot/telemetry/telemetry.go | 14 ++------------ dot/telemetry/telemetry_test.go | 19 +++++++++++++++---- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/chain/dev/genesis.json b/chain/dev/genesis.json index a5a8ca24e5..6b76f85b45 100644 --- a/chain/dev/genesis.json +++ b/chain/dev/genesis.json @@ -3,7 +3,12 @@ "id": "dev", "chainType": "Local", "bootNodes": [], - "telemetryEndpoints": null, + "telemetryEndpoints": [ + [ + "wss://telemetry.polkadot.io/submit/", + 0 + ] + ], "protocolId": "/gossamer/dev/0", "genesis": { "raw": { @@ -32,4 +37,4 @@ "forkBlocks": null, "badBlocks": null, "consensusEngine": "" -} \ No newline at end of file +} diff --git a/chain/gssmr/genesis.json b/chain/gssmr/genesis.json index 2e6f157d90..080619e282 100644 --- a/chain/gssmr/genesis.json +++ b/chain/gssmr/genesis.json @@ -3,6 +3,12 @@ "id": "gssmr", "chainType": "Local", "bootNodes": [], + "telemetryEndpoints": [ + [ + "wss://telemetry.polkadot.io/submit/", + 0 + ] + ], "protocolId": "/gossamer/gssmr/0", "genesis": { "raw": { @@ -40,4 +46,4 @@ "forkBlocks": null, "badBlocks": null, "consensusEngine": "" -} \ No newline at end of file +} diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index cd11d65207..74a61e45fc 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -134,19 +134,9 @@ func (h *Handler) startListening() { } } -type response struct { - ID int `json:"id"` - Payload map[string]interface{} `json:"payload"` - Timestamp time.Time `json:"ts"` -} - func msgToBytes(message Message) []byte { - res := response{ - ID: 1, // todo (ed) determine how this is used - Payload: message.values, - Timestamp: time.Now(), - } - resB, err := json.Marshal(res) + message.values["ts"] = time.Now() + resB, err := json.Marshal(message.values) if err != nil { return nil } diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index e2b7b9c05b..eec61fc9c2 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -2,6 +2,7 @@ package telemetry import ( "bytes" + "fmt" "log" "math/big" "net/http" @@ -89,10 +90,10 @@ func TestHandler_SendMulti(t *testing.T) { wg.Wait() - expected1 := []byte(`{"id":1,"payload":{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1},"ts":`) - expected2 := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`) - expected3 := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`) - expected4 := []byte(`{"id":1,"payload":{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","txcount":2,"used_state_cache_size":1886357},"ts":`) // nolint + expected1 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) + expected2 := []byte(`{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) + expected3 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) + expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint expected := [][]byte{expected3, expected1, expected4, expected2} @@ -139,6 +140,16 @@ func TestListenerConcurrency(t *testing.T) { } } +// TestInfiniteListener starts loop that print out data received on websocket ws://localhost:8001/ +// this can be useful to see what data is sent to telemetry server +func TestInfiniteListener(t *testing.T) { + t.Skip() + resultCh = make(chan []byte) + for data := range resultCh { + fmt.Printf("Data %s\n", data) + } +} + func listen(w http.ResponseWriter, r *http.Request) { c, err := upgrader.Upgrade(w, r, nil) if err != nil { From d867d2f3c1a2f5352bf62c52334b7cf42085a72a Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Tue, 1 Jun 2021 16:47:39 -0400 Subject: [PATCH 02/21] add basic network state telemetry message --- dot/network/service.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/dot/network/service.go b/dot/network/service.go index effb2b7506..512db1e51b 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -301,6 +301,12 @@ func (s *Service) logPeerCount() { } } +type peerInfo struct { + Roles byte `json:"roles"` + BestHash string `json:"bestHash"` + BestNumber uint64 `json:"bestNumber"` +} + func (s *Service) publishNetworkTelemetry(done chan interface{}) { ticker := time.NewTicker(s.telemetryInterval) defer ticker.Stop() @@ -321,6 +327,35 @@ main: if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } + netState := make(map[string]interface{}) + netState["peerId"] = s.host.h.ID() + hostAddrs := []string{} + for _, v := range s.host.h.Addrs() { + hostAddrs = append(hostAddrs, v.String()) + } + netState["externalAddressess"] = hostAddrs + listAddrs := []string{} + for _, v := range s.host.h.Network().ListenAddresses() { + listAddrs = append(listAddrs, v.String()) + } + netState["listenedAddressess"] = listAddrs + + peers := make(map[string]interface{}) + for _, v := range s.Peers() { + p := &peerInfo{ + Roles: v.Roles, + BestHash: v.BestHash.String(), + BestNumber: v.BestNumber, + } + peers[v.PeerID] = *p + } + netState["connectedPeers"] = peers + err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( + telemetry.NewKeyValue("msg", "system.network_state"), + telemetry.NewKeyValue("state", netState))) + if err != nil { + logger.Debug("problem sending system.interval telemetry message", "error", err) + } } } } From 74abc1b5053418abd6ffd970359ae90e1d38d0a5 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Fri, 11 Jun 2021 16:53:02 -0400 Subject: [PATCH 03/21] refactor message sender to handle interface{} types --- dot/telemetry/telemetry.go | 50 ++++++++++++++---- dot/telemetry/telemetry_test.go | 92 +++++++++++++++++++-------------- 2 files changed, 93 insertions(+), 49 deletions(-) diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 74a61e45fc..4c9c2706a5 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -19,6 +19,8 @@ package telemetry import ( "encoding/json" "errors" + "fmt" + "reflect" "sync" "time" @@ -40,7 +42,7 @@ type Message struct { // Handler struct for holding telemetry related things type Handler struct { - msg chan Message + msg chan interface{} connections []*telemetryConnection log log.Logger } @@ -62,7 +64,7 @@ func GetInstance() *Handler { //nolint once.Do( func() { handlerInstance = &Handler{ - msg: make(chan Message, 256), + msg: make(chan interface{}, 256), log: log.New("pkg", "telemetry"), } go handlerInstance.startListening() @@ -108,9 +110,9 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { } // SendMessage sends Message to connected telemetry listeners -func (h *Handler) SendMessage(msg *Message) error { +func (h *Handler) SendMessage(msg interface{}) error { select { - case h.msg <- *msg: + case h.msg <- msg: case <-time.After(time.Second * 1): return errors.New("timeout sending message") @@ -124,7 +126,10 @@ func (h *Handler) startListening() { go func() { for _, conn := range h.connections { conn.Lock() - err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) + fmt.Printf("SENDING %s\n", msgToJSON(msg)) + //err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) + err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToJSON(msg)) + if err != nil { h.log.Warn("issue while sending telemetry message", "error", err) } @@ -134,11 +139,38 @@ func (h *Handler) startListening() { } } -func msgToBytes(message Message) []byte { - message.values["ts"] = time.Now() - resB, err := json.Marshal(message.values) +func msgToJSON(message interface{}) []byte { + res, err := json.Marshal(message) + if err != nil { + return nil + } + + objMap := make(map[string]interface{}) + err = json.Unmarshal(res, &objMap) if err != nil { return nil } - return resB + + objMap["ts"] = time.Now() + typ := reflect.TypeOf(message) + f, _ := typ.FieldByName("Msg") + def := f.Tag.Get("default") + objMap["msg"] = def + + fullRes, err := json.Marshal(objMap) + if err != nil { + return nil + } + return fullRes } + +type SystemConnectedTM struct { + Authority bool `json:"authority"` + Chain string `json:"chain"` + GenesisHash string `json:"genesis_hash"` + Implementation string `json:"implementation"` + Msg string `default:"system.connected" json:"msg"` + Name string `json:"name"` + NetworkID string `json:"network_id"` + StartupTime string `json:"startup_time"` +} \ No newline at end of file diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index eec61fc9c2..bcab30dc9f 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -42,51 +42,63 @@ func TestMain(m *testing.M) { func TestHandler_SendMulti(t *testing.T) { var wg sync.WaitGroup - wg.Add(4) + wg.Add(1) resultCh = make(chan []byte) go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("authority", false), - NewKeyValue("chain", "chain"), - NewKeyValue("genesis_hash", "hash"), - NewKeyValue("implementation", "systemName"), - NewKeyValue("msg", "system.connected"), - NewKeyValue("name", "nodeName"), - NewKeyValue("network_id", "netID"), - NewKeyValue("startup_time", "startTime"), - NewKeyValue("version", "version"))) - wg.Done() - }() - - go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("best", "hash"), - NewKeyValue("height", big.NewInt(2)), - NewKeyValue("msg", "block.import"), - NewKeyValue("origin", "NetworkInitialSync"))) - wg.Done() - }() - - go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("bandwidth_download", 2), - NewKeyValue("bandwidth_upload", 3), - NewKeyValue("msg", "system.interval"), - NewKeyValue("peers", 1))) - wg.Done() - }() - - go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("best", "0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), - NewKeyValue("finalized_hash", "0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), // nolint - NewKeyValue("finalized_height", 32256), NewKeyValue("height", 32375), // nolint - NewKeyValue("msg", "system.interval"), NewKeyValue("txcount", 2), - NewKeyValue("used_state_cache_size", 1886357))) + GetInstance().SendMessage(SystemConnectedTM{ + Authority: false, + Chain: "chain", + GenesisHash: "hash", + Implementation: "systemName", + Name: "nodeName", + NetworkID: "netID", + StartupTime: "startTime", + }) wg.Done() }() + //go func() { + // GetInstance().SendMessage(NewTelemetryMessage( + // NewKeyValue("authority", false), + // NewKeyValue("chain", "chain"), + // NewKeyValue("genesis_hash", "hash"), + // NewKeyValue("implementation", "systemName"), + // NewKeyValue("msg", "system.connected"), + // NewKeyValue("name", "nodeName"), + // NewKeyValue("network_id", "netID"), + // NewKeyValue("startup_time", "startTime"), + // NewKeyValue("version", "version"))) + // wg.Done() + //}() + + //go func() { + // GetInstance().SendMessage(NewTelemetryMessage( + // NewKeyValue("best", "hash"), + // NewKeyValue("height", big.NewInt(2)), + // NewKeyValue("msg", "block.import"), + // NewKeyValue("origin", "NetworkInitialSync"))) + // wg.Done() + //}() + + //go func() { + // GetInstance().SendMessage(NewTelemetryMessage( + // NewKeyValue("bandwidth_download", 2), + // NewKeyValue("bandwidth_upload", 3), + // NewKeyValue("msg", "system.interval"), + // NewKeyValue("peers", 1))) + // wg.Done() + //}() + // + //go func() { + // GetInstance().SendMessage(NewTelemetryMessage( + // NewKeyValue("best", "0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), + // NewKeyValue("finalized_hash", "0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), // nolint + // NewKeyValue("finalized_height", 32256), NewKeyValue("height", 32375), // nolint + // NewKeyValue("msg", "system.interval"), NewKeyValue("txcount", 2), + // NewKeyValue("used_state_cache_size", 1886357))) + // wg.Done() + //}() wg.Wait() @@ -100,7 +112,7 @@ func TestHandler_SendMulti(t *testing.T) { var actual [][]byte for data := range resultCh { actual = append(actual, data) - if len(actual) == 4 { + if len(actual) == 1 { break } } From 6997fa1044e0ca05372677f8fe818bae62f06de0 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 14 Jun 2021 20:00:27 -0400 Subject: [PATCH 04/21] refactor telemetry messages to be structs --- dot/network/service.go | 38 ++++++---- dot/node.go | 22 +++--- dot/sync/syncer.go | 14 ++-- dot/telemetry/telemetry.go | 128 ++++++++++++++++++-------------- dot/telemetry/telemetry_test.go | 114 ++++++++++++++-------------- 5 files changed, 170 insertions(+), 146 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index bdf69fa27d..3f01c81804 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -20,6 +20,7 @@ import ( "context" "errors" "io" + "math/big" "os" "sync" "time" @@ -322,11 +323,11 @@ main: case <-ticker.C: o := s.host.bwc.GetBandwidthTotals() - err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("bandwidth_download", o.RateIn), - telemetry.NewKeyValue("bandwidth_upload", o.RateOut), - telemetry.NewKeyValue("msg", "system.interval"), - telemetry.NewKeyValue("peers", s.host.peerCount()))) + err := telemetry.GetInstance().SendMessage(telemetry.SystemIntervalTM{ + BandwidthDownload: o.RateIn, + BandwidthUpload: o.RateOut, + Peers: s.host.peerCount(), + }) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } @@ -353,9 +354,10 @@ main: peers[v.PeerID] = *p } netState["connectedPeers"] = peers - err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("msg", "system.network_state"), - telemetry.NewKeyValue("state", netState))) + + err = telemetry.GetInstance().SendMessage(telemetry.NetworkStateTM{ + State: netState, + }) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } @@ -369,19 +371,23 @@ func (s *Service) sentBlockIntervalTelemetry() { if err != nil { continue } + bestHash := best.Hash() + finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint if err != nil { continue } + finalizedHash := finalized.Hash() + + err = telemetry.GetInstance().SendMessage(telemetry.SystemIntervalTM{ + BestHash: &bestHash, + BestHeight: best.Number, + FinalisedHash: &finalizedHash, + FinalisedHeight: finalized.Number, + TxCount: big.NewInt(0), // todo (ed) determine where to get tx count + UsedStateCacheSize: big.NewInt(0), // todo (ed) determine where to get used_state_cache_size + }) - err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("best", best.Hash().String()), - telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint - telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint - telemetry.NewKeyValue("height", best.Number), - telemetry.NewKeyValue("msg", "system.interval"), - telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count - telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } diff --git a/dot/node.go b/dot/node.go index 4b535d6adb..204eabb4ab 100644 --- a/dot/node.go +++ b/dot/node.go @@ -350,17 +350,17 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, } telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints) - - err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority), - telemetry.NewKeyValue("chain", sysSrvc.ChainName()), - telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()), - telemetry.NewKeyValue("implementation", sysSrvc.SystemName()), - telemetry.NewKeyValue("msg", "system.connected"), - telemetry.NewKeyValue("name", cfg.Global.Name), - telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID), - telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)), - telemetry.NewKeyValue("version", sysSrvc.SystemVersion()))) + genesisHash := stateSrvc.Block.GenesisHash() + err = telemetry.GetInstance().SendMessage(telemetry.SystemConnectedTM{ + Authority: cfg.Core.GrandpaAuthority, + Chain: sysSrvc.ChainName(), + GenesisHash: &genesisHash, + Implementation: sysSrvc.SystemName(), + Name: cfg.Global.Name, + NetworkID: networkSrvc.NetworkState().PeerID, + StartupTime: strconv.FormatInt(time.Now().UnixNano(), 10), + Version: sysSrvc.SystemVersion(), + }) if err != nil { logger.Debug("problem sending system.connected telemetry message", "err", err) } diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index e88f4a2fd8..40e70c2f77 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -363,12 +363,14 @@ func (s *Service) handleBlock(block *types.Block) error { return err } } else { - logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash()) - err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("best", block.Header.Hash().String()), - telemetry.NewKeyValue("height", block.Header.Number.Uint64()), - telemetry.NewKeyValue("msg", "block.import"), - telemetry.NewKeyValue("origin", "NetworkInitialSync"))) + blockHash := block.Header.Hash() + logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", blockHash) + + err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ + BestHash: &blockHash, + Height: block.Header.Number, + Origin: "NetworkInitialSync", + }) if err != nil { logger.Debug("problem sending block.import telemetry message", "error", err) } diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 4c9c2706a5..95c0fbf78e 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -19,11 +19,12 @@ package telemetry import ( "encoding/json" "errors" - "fmt" + "math/big" "reflect" "sync" "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" log "github.com/ChainSafe/log15" "github.com/gorilla/websocket" @@ -35,11 +36,6 @@ type telemetryConnection struct { sync.Mutex } -// Message struct to hold telemetry message data -type Message struct { - values map[string]interface{} -} - // Handler struct for holding telemetry related things type Handler struct { msg chan interface{} @@ -47,12 +43,6 @@ type Handler struct { log log.Logger } -// KeyValue object to hold key value pairs used in telemetry messages -type KeyValue struct { - key string - value interface{} -} - var ( once sync.Once handlerInstance *Handler @@ -73,25 +63,6 @@ func GetInstance() *Handler { //nolint return handlerInstance } -// NewTelemetryMessage builds a telemetry message -func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint - mvals := make(map[string]interface{}) - for _, v := range values { - mvals[v.key] = v.value - } - return &Message{ - values: mvals, - } -} - -// NewKeyValue builds a key value pair for telemetry messages -func NewKeyValue(key string, value interface{}) *KeyValue { //nolint - return &KeyValue{ - key: key, - value: value, - } -} - // AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { for _, v := range conns { @@ -126,51 +97,94 @@ func (h *Handler) startListening() { go func() { for _, conn := range h.connections { conn.Lock() - fmt.Printf("SENDING %s\n", msgToJSON(msg)) - //err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) - err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToJSON(msg)) + defer conn.Unlock() + msgBytes, err := h.msgToJSON(msg) + if err != nil || len(msgBytes) == 0 { + h.log.Debug("issue decoding telemetry message", "error", err) + return + } + err = conn.wsconn.WriteMessage(websocket.TextMessage, msgBytes) if err != nil { h.log.Warn("issue while sending telemetry message", "error", err) } - conn.Unlock() } }() } } -func msgToJSON(message interface{}) []byte { - res, err := json.Marshal(message) +func (h *Handler) msgToJSON(message interface{}) ([]byte, error) { + defer h.recoverMessage() + + messageBytes, err := json.Marshal(message) if err != nil { - return nil + return nil, err } - objMap := make(map[string]interface{}) - err = json.Unmarshal(res, &objMap) + messageMap := make(map[string]interface{}) + err = json.Unmarshal(messageBytes, &messageMap) if err != nil { - return nil + return nil, err } - objMap["ts"] = time.Now() + messageMap["ts"] = time.Now() typ := reflect.TypeOf(message) - f, _ := typ.FieldByName("Msg") - def := f.Tag.Get("default") - objMap["msg"] = def + field, found := typ.FieldByName("Msg") + if !found { + return []byte{}, errors.New("unknown telemetry message type") + } + def := field.Tag.Get("default") + messageMap["msg"] = def - fullRes, err := json.Marshal(objMap) + fullRes, err := json.Marshal(messageMap) if err != nil { - return nil + return nil, err + } + return fullRes, nil +} +func (h *Handler) recoverMessage() { + if r := recover(); r != nil { + h.log.Debug("recovered", "issue", r) } - return fullRes } +// SystemConnectedTM struct to hold system connected telemetry messages type SystemConnectedTM struct { - Authority bool `json:"authority"` - Chain string `json:"chain"` - GenesisHash string `json:"genesis_hash"` - Implementation string `json:"implementation"` - Msg string `default:"system.connected" json:"msg"` - Name string `json:"name"` - NetworkID string `json:"network_id"` - StartupTime string `json:"startup_time"` -} \ No newline at end of file + Authority bool `json:"authority"` + Chain string `json:"chain"` + GenesisHash *common.Hash `json:"genesis_hash"` + Implementation string `json:"implementation"` + Msg string `default:"system.connected" json:"msg"` + Name string `json:"name"` + NetworkID string `json:"network_id"` + StartupTime string `json:"startup_time"` + Version string `json:"version"` +} + +// BlockImportTM struct to hold block import telemetry messages +type BlockImportTM struct { + BestHash *common.Hash `json:"best"` + Height *big.Int `json:"height"` + Msg string `default:"block.import" json:"msg"` + Origin string `json:"origin"` +} + +// SystemIntervalTM struct to hold system interval telemetry messages +type SystemIntervalTM struct { + BandwidthDownload float64 `json:"bandwidth_download,omitempty"` + BandwidthUpload float64 `json:"bandwidth_upload,omitempty"` + Msg string `default:"system.interval" json:"msg"` + Peers int `json:"peers,omitempty"` + BestHash *common.Hash `json:"best,omitempty"` + BestHeight *big.Int `json:"height,omitempty"` + FinalisedHash *common.Hash `json:"finalized_hash,omitempty"` // nolint + FinalisedHeight *big.Int `json:"finalized_height,omitempty"` // nolint + TxCount *big.Int `json:"txcount,omitempty"` + UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"` +} + +// NetworkStateTM struct to hold network state telemetry messages +type NetworkStateTM struct { + Msg string `default:"system.network_state" json:"msg"` + State map[string]interface{} `json:"state"` +} diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index bcab30dc9f..adccb69be8 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" @@ -42,69 +43,69 @@ func TestMain(m *testing.M) { func TestHandler_SendMulti(t *testing.T) { var wg sync.WaitGroup - wg.Add(1) + wg.Add(5) resultCh = make(chan []byte) go func() { + genesisHash := common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3") GetInstance().SendMessage(SystemConnectedTM{ - Authority: false, - Chain: "chain", - GenesisHash: "hash", + Authority: false, + Chain: "chain", + GenesisHash: &genesisHash, Implementation: "systemName", - Name: "nodeName", - NetworkID: "netID", - StartupTime: "startTime", + Name: "nodeName", + NetworkID: "netID", + StartupTime: "startTime", }) wg.Done() }() - //go func() { - // GetInstance().SendMessage(NewTelemetryMessage( - // NewKeyValue("authority", false), - // NewKeyValue("chain", "chain"), - // NewKeyValue("genesis_hash", "hash"), - // NewKeyValue("implementation", "systemName"), - // NewKeyValue("msg", "system.connected"), - // NewKeyValue("name", "nodeName"), - // NewKeyValue("network_id", "netID"), - // NewKeyValue("startup_time", "startTime"), - // NewKeyValue("version", "version"))) - // wg.Done() - //}() - - //go func() { - // GetInstance().SendMessage(NewTelemetryMessage( - // NewKeyValue("best", "hash"), - // NewKeyValue("height", big.NewInt(2)), - // NewKeyValue("msg", "block.import"), - // NewKeyValue("origin", "NetworkInitialSync"))) - // wg.Done() - //}() - - //go func() { - // GetInstance().SendMessage(NewTelemetryMessage( - // NewKeyValue("bandwidth_download", 2), - // NewKeyValue("bandwidth_upload", 3), - // NewKeyValue("msg", "system.interval"), - // NewKeyValue("peers", 1))) - // wg.Done() - //}() - // - //go func() { - // GetInstance().SendMessage(NewTelemetryMessage( - // NewKeyValue("best", "0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), - // NewKeyValue("finalized_hash", "0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), // nolint - // NewKeyValue("finalized_height", 32256), NewKeyValue("height", 32375), // nolint - // NewKeyValue("msg", "system.interval"), NewKeyValue("txcount", 2), - // NewKeyValue("used_state_cache_size", 1886357))) - // wg.Done() - //}() + + go func() { + bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") + GetInstance().SendMessage(BlockImportTM{ + BestHash: &bh, + Height: big.NewInt(2), + Origin: "NetworkInitialSync", + }) + wg.Done() + }() + + go func() { + GetInstance().SendMessage(SystemIntervalTM{ + BandwidthDownload: 2, + BandwidthUpload: 3, + Peers: 1, + }) + wg.Done() + }() + + go func() { + bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") + finalisedHash := common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2") + GetInstance().SendMessage(SystemIntervalTM{ + BestHash: &bestHash, + BestHeight: big.NewInt(32375), + FinalisedHash: &finalisedHash, + FinalisedHeight: big.NewInt(32256), + TxCount: big.NewInt(0), + UsedStateCacheSize: big.NewInt(1234), + }) + wg.Done() + }() + + // test for handling of non telemetry message + go func() { + GetInstance().SendMessage( + common.Hash{}) + wg.Done() + }() wg.Wait() expected1 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) - expected2 := []byte(`{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) - expected3 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) + expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) + expected3 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint expected := [][]byte{expected3, expected1, expected4, expected2} @@ -112,7 +113,7 @@ func TestHandler_SendMulti(t *testing.T) { var actual [][]byte for data := range resultCh { actual = append(actual, data) - if len(actual) == 1 { + if len(actual) == 4 { break } } @@ -134,11 +135,12 @@ func TestListenerConcurrency(t *testing.T) { resultCh = make(chan []byte) for i := 0; i < qty; i++ { go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("best", "hash"), - NewKeyValue("height", big.NewInt(2)), - NewKeyValue("msg", "block.import"), - NewKeyValue("origin", "NetworkInitialSync"))) + bestHash := common.Hash{} + GetInstance().SendMessage(BlockImportTM{ + BestHash: &bestHash, + Height: big.NewInt(2), + Origin: "NetworkInitialSync", + }) wg.Done() }() } From 7c00e1e8009da250128179ea1f72b4b47614f0e8 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 16 Jun 2021 11:00:27 -0400 Subject: [PATCH 05/21] lint --- dot/sync/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index f1d6866ffd..3106c0f43c 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -379,7 +379,7 @@ func (s *Service) handleBlock(block *types.Block) error { blockHash := block.Header.Hash() logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", blockHash) - err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ + err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ // nolint BestHash: &blockHash, Height: block.Header.Number, Origin: "NetworkInitialSync", From 5ceddd9271ee0aa6d394b55fbd4ea405ace9a0ef Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 16 Jun 2021 11:02:11 -0400 Subject: [PATCH 06/21] go fmt --- dot/sync/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index 3106c0f43c..727ac4e0eb 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -379,7 +379,7 @@ func (s *Service) handleBlock(block *types.Block) error { blockHash := block.Header.Hash() logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", blockHash) - err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ // nolint + err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ // nolint BestHash: &blockHash, Height: block.Header.Number, Origin: "NetworkInitialSync", From 529bac05415723ab40e8dc43ff9f249ad15967aa Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Fri, 18 Jun 2021 10:35:07 -0400 Subject: [PATCH 07/21] lint --- dot/sync/syncer.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index b20211c24c..3c70fab65c 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -346,15 +346,16 @@ func (s *Service) handleBlock(block *types.Block) error { logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash()) - err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ // nolint - BestHash: &blockHash, - Height: block.Header.Number, - Origin: "NetworkInitialSync", - }) + blockHash := block.Header.Hash() + err = telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ + BestHash: &blockHash, + Height: block.Header.Number, + Origin: "NetworkInitialSync", + }) - if err != nil { - logger.Debug("problem sending block.import telemetry message", "error", err) - } + if err != nil { + logger.Debug("problem sending block.import telemetry message", "error", err) + } return nil } From 6eb7b63dc3b40aa631c32277bf2f77573d3a7360 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Fri, 18 Jun 2021 10:42:22 -0400 Subject: [PATCH 08/21] move msg building logic outside msg sending loop --- dot/telemetry/telemetry.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 95c0fbf78e..ca4ebf0b6e 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -95,14 +95,14 @@ func (h *Handler) startListening() { for { msg := <-h.msg go func() { + msgBytes, err := h.msgToJSON(msg) + if err != nil || len(msgBytes) == 0 { + h.log.Debug("issue decoding telemetry message", "error", err) + return + } for _, conn := range h.connections { conn.Lock() defer conn.Unlock() - msgBytes, err := h.msgToJSON(msg) - if err != nil || len(msgBytes) == 0 { - h.log.Debug("issue decoding telemetry message", "error", err) - return - } err = conn.wsconn.WriteMessage(websocket.TextMessage, msgBytes) if err != nil { From 13024dd9627b943bf32feb0ea27d8744c81d5ea5 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Sat, 19 Jun 2021 13:41:56 -0400 Subject: [PATCH 09/21] make telemetry messages an interface --- dot/core/messages_test.go | 6 +- dot/network/service.go | 28 ++++----- dot/node.go | 19 +++--- dot/sync/syncer.go | 10 ++-- dot/telemetry/telemetry.go | 102 +++++++++++++++++++++++++------- dot/telemetry/telemetry_test.go | 58 +++++------------- 6 files changed, 123 insertions(+), 100 deletions(-) diff --git a/dot/core/messages_test.go b/dot/core/messages_test.go index 701c5db2b4..4e2547fb4c 100644 --- a/dot/core/messages_test.go +++ b/dot/core/messages_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - . "github.com/ChainSafe/gossamer/dot/core/mocks" + . "github.com/ChainSafe/gossamer/dot/core/mocks" // nolint "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/dot/types" @@ -38,7 +38,7 @@ import ( func TestService_ProcessBlockAnnounceMessage(t *testing.T) { // TODO: move to sync package - net := new(MockNetwork) + net := new(MockNetwork) // nolint cfg := &Config{ Network: net, @@ -136,7 +136,7 @@ func TestService_HandleTransactionMessage(t *testing.T) { ks := keystore.NewGlobalKeystore() ks.Acco.Insert(kp) - bp := new(MockBlockProducer) + bp := new(MockBlockProducer) // nolint blockC := make(chan types.Block) bp.On("GetBlockChannel", nil).Return(blockC) diff --git a/dot/network/service.go b/dot/network/service.go index 3f01c81804..59960424bb 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -323,11 +323,8 @@ main: case <-ticker.C: o := s.host.bwc.GetBandwidthTotals() - err := telemetry.GetInstance().SendMessage(telemetry.SystemIntervalTM{ - BandwidthDownload: o.RateIn, - BandwidthUpload: o.RateOut, - Peers: s.host.peerCount(), - }) + err := telemetry.GetInstance().SendMessage(telemetry.NewBandwidthTM(o.RateIn, o.RateOut, s.host.peerCount())) + if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } @@ -355,9 +352,7 @@ main: } netState["connectedPeers"] = peers - err = telemetry.GetInstance().SendMessage(telemetry.NetworkStateTM{ - State: netState, - }) + err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(netState)) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } @@ -379,15 +374,14 @@ func (s *Service) sentBlockIntervalTelemetry() { } finalizedHash := finalized.Hash() - err = telemetry.GetInstance().SendMessage(telemetry.SystemIntervalTM{ - BestHash: &bestHash, - BestHeight: best.Number, - FinalisedHash: &finalizedHash, - FinalisedHeight: finalized.Number, - TxCount: big.NewInt(0), // todo (ed) determine where to get tx count - UsedStateCacheSize: big.NewInt(0), // todo (ed) determine where to get used_state_cache_size - }) - + err = telemetry.GetInstance().SendMessage(telemetry.NewBlockIntervalTM( + &bestHash, + best.Number, + &finalizedHash, + finalized.Number, + big.NewInt(0), // todo (ed) determine where to get tx count + big.NewInt(0), // todo (ed) determine where to get used_state_cache_size + )) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } diff --git a/dot/node.go b/dot/node.go index c7617b9801..2d2177a34f 100644 --- a/dot/node.go +++ b/dot/node.go @@ -344,16 +344,15 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints) genesisHash := stateSrvc.Block.GenesisHash() - err = telemetry.GetInstance().SendMessage(telemetry.SystemConnectedTM{ - Authority: cfg.Core.GrandpaAuthority, - Chain: sysSrvc.ChainName(), - GenesisHash: &genesisHash, - Implementation: sysSrvc.SystemName(), - Name: cfg.Global.Name, - NetworkID: networkSrvc.NetworkState().PeerID, - StartupTime: strconv.FormatInt(time.Now().UnixNano(), 10), - Version: sysSrvc.SystemVersion(), - }) + err = telemetry.GetInstance().SendMessage(telemetry.NewSystemConnectedTM( + cfg.Core.GrandpaAuthority, + sysSrvc.ChainName(), + &genesisHash, + sysSrvc.SystemName(), + cfg.Global.Name, + networkSrvc.NetworkState().PeerID, + strconv.FormatInt(time.Now().UnixNano(), 10), + sysSrvc.SystemVersion())) if err != nil { logger.Debug("problem sending system.connected telemetry message", "err", err) } diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index 3c70fab65c..9ac360ed3d 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -347,12 +347,10 @@ func (s *Service) handleBlock(block *types.Block) error { logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash()) blockHash := block.Header.Hash() - err = telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ - BestHash: &blockHash, - Height: block.Header.Number, - Origin: "NetworkInitialSync", - }) - + err = telemetry.GetInstance().SendMessage(telemetry.NewBlockImportTM( + &blockHash, + block.Header.Number, + "NetworkInitialSync")) if err != nil { logger.Debug("problem sending block.import telemetry message", "error", err) } diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index ca4ebf0b6e..5c4e04d88e 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "math/big" - "reflect" "sync" "time" @@ -38,7 +37,7 @@ type telemetryConnection struct { // Handler struct for holding telemetry related things type Handler struct { - msg chan interface{} + msg chan TelemetryMessage connections []*telemetryConnection log log.Logger } @@ -54,7 +53,7 @@ func GetInstance() *Handler { //nolint once.Do( func() { handlerInstance = &Handler{ - msg: make(chan interface{}, 256), + msg: make(chan TelemetryMessage, 256), log: log.New("pkg", "telemetry"), } go handlerInstance.startListening() @@ -81,7 +80,7 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { } // SendMessage sends Message to connected telemetry listeners -func (h *Handler) SendMessage(msg interface{}) error { +func (h *Handler) SendMessage(msg TelemetryMessage) error { select { case h.msg <- msg: @@ -113,9 +112,7 @@ func (h *Handler) startListening() { } } -func (h *Handler) msgToJSON(message interface{}) ([]byte, error) { - defer h.recoverMessage() - +func (h *Handler) msgToJSON(message TelemetryMessage) ([]byte, error) { messageBytes, err := json.Marshal(message) if err != nil { return nil, err @@ -128,13 +125,8 @@ func (h *Handler) msgToJSON(message interface{}) ([]byte, error) { } messageMap["ts"] = time.Now() - typ := reflect.TypeOf(message) - field, found := typ.FieldByName("Msg") - if !found { - return []byte{}, errors.New("unknown telemetry message type") - } - def := field.Tag.Get("default") - messageMap["msg"] = def + + messageMap["msg"] = message.messageType() fullRes, err := json.Marshal(messageMap) if err != nil { @@ -142,10 +134,9 @@ func (h *Handler) msgToJSON(message interface{}) ([]byte, error) { } return fullRes, nil } -func (h *Handler) recoverMessage() { - if r := recover(); r != nil { - h.log.Debug("recovered", "issue", r) - } + +type TelemetryMessage interface { + messageType() string } // SystemConnectedTM struct to hold system connected telemetry messages @@ -154,26 +145,57 @@ type SystemConnectedTM struct { Chain string `json:"chain"` GenesisHash *common.Hash `json:"genesis_hash"` Implementation string `json:"implementation"` - Msg string `default:"system.connected" json:"msg"` + Msg string `json:"msg"` Name string `json:"name"` NetworkID string `json:"network_id"` StartupTime string `json:"startup_time"` Version string `json:"version"` } +func NewSystemConnectedTM(authority bool, chain string, genesisHash *common.Hash, + implementation, name, networkID, startupTime, version string) *SystemConnectedTM { + return &SystemConnectedTM{ + Authority: authority, + Chain: chain, + GenesisHash: genesisHash, + Implementation: implementation, + Msg: "system.connected", + Name: name, + NetworkID: networkID, + StartupTime: startupTime, + Version: version, + } +} +func (tm *SystemConnectedTM) messageType() string { + return tm.Msg +} + // BlockImportTM struct to hold block import telemetry messages type BlockImportTM struct { BestHash *common.Hash `json:"best"` Height *big.Int `json:"height"` - Msg string `default:"block.import" json:"msg"` + Msg string `json:"msg"` Origin string `json:"origin"` } +func NewBlockImportTM(bestHash *common.Hash, height *big.Int, origin string) *BlockImportTM { + return &BlockImportTM{ + BestHash: bestHash, + Height: height, + Msg: "block.import", + Origin: origin, + } +} + +func (tm *BlockImportTM) messageType() string { + return tm.Msg +} + // SystemIntervalTM struct to hold system interval telemetry messages type SystemIntervalTM struct { BandwidthDownload float64 `json:"bandwidth_download,omitempty"` BandwidthUpload float64 `json:"bandwidth_upload,omitempty"` - Msg string `default:"system.interval" json:"msg"` + Msg string `json:"msg"` Peers int `json:"peers,omitempty"` BestHash *common.Hash `json:"best,omitempty"` BestHeight *big.Int `json:"height,omitempty"` @@ -183,8 +205,44 @@ type SystemIntervalTM struct { UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"` } +func NewBandwidthTM(bandwidthDownload, bandwidthUpload float64, peers int) *SystemIntervalTM { + return &SystemIntervalTM{ + BandwidthDownload: bandwidthDownload, + BandwidthUpload: bandwidthUpload, + Msg: "system.interval", + Peers: peers, + } +} + +func NewBlockIntervalTM(beshHash *common.Hash, bestHeight *big.Int, finalisedHash *common.Hash, + finalisedHeight, txCount, usedStateCacheSize *big.Int) *SystemIntervalTM { + return &SystemIntervalTM{ + Msg: "system.interval", + BestHash: beshHash, + BestHeight: bestHeight, + FinalisedHash: finalisedHash, + FinalisedHeight: finalisedHeight, + TxCount: txCount, + UsedStateCacheSize: usedStateCacheSize, + } +} + +func (tm *SystemIntervalTM) messageType() string { + return tm.Msg +} + // NetworkStateTM struct to hold network state telemetry messages type NetworkStateTM struct { - Msg string `default:"system.network_state" json:"msg"` + Msg string `json:"msg"` State map[string]interface{} `json:"state"` } + +func NewNetworkStateTM(state map[string]interface{}) *NetworkStateTM { + return &NetworkStateTM{ + Msg: "system.network_state", + State: state, + } +} +func (tm *NetworkStateTM) messageType() string { + return tm.Msg +} diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index adccb69be8..f9e606b9f8 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -43,72 +43,49 @@ func TestMain(m *testing.M) { func TestHandler_SendMulti(t *testing.T) { var wg sync.WaitGroup - wg.Add(5) + wg.Add(4) resultCh = make(chan []byte) go func() { genesisHash := common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3") - GetInstance().SendMessage(SystemConnectedTM{ - Authority: false, - Chain: "chain", - GenesisHash: &genesisHash, - Implementation: "systemName", - Name: "nodeName", - NetworkID: "netID", - StartupTime: "startTime", - }) + + GetInstance().SendMessage(NewSystemConnectedTM(false, "chain", &genesisHash, + "systemName", "nodeName", "netID", "startTime", "0.1")) + wg.Done() }() go func() { bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") - GetInstance().SendMessage(BlockImportTM{ - BestHash: &bh, - Height: big.NewInt(2), - Origin: "NetworkInitialSync", - }) + GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync")) + wg.Done() }() go func() { - GetInstance().SendMessage(SystemIntervalTM{ - BandwidthDownload: 2, - BandwidthUpload: 3, - Peers: 1, - }) + GetInstance().SendMessage(NewBandwidthTM(2, 3, 1)) + wg.Done() }() go func() { bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") finalisedHash := common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2") - GetInstance().SendMessage(SystemIntervalTM{ - BestHash: &bestHash, - BestHeight: big.NewInt(32375), - FinalisedHash: &finalisedHash, - FinalisedHeight: big.NewInt(32256), - TxCount: big.NewInt(0), - UsedStateCacheSize: big.NewInt(1234), - }) - wg.Done() - }() + GetInstance().SendMessage(NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash, + big.NewInt(32256), big.NewInt(0), big.NewInt(1234))) - // test for handling of non telemetry message - go func() { - GetInstance().SendMessage( - common.Hash{}) wg.Done() }() wg.Wait() - expected1 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) + expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) - expected3 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) + expected3 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint - expected := [][]byte{expected3, expected1, expected4, expected2} + expected := [][]byte{expected1, expected3, expected4, expected2} var actual [][]byte for data := range resultCh { @@ -136,11 +113,8 @@ func TestListenerConcurrency(t *testing.T) { for i := 0; i < qty; i++ { go func() { bestHash := common.Hash{} - GetInstance().SendMessage(BlockImportTM{ - BestHash: &bestHash, - Height: big.NewInt(2), - Origin: "NetworkInitialSync", - }) + GetInstance().SendMessage(NewBlockImportTM(&bestHash, big.NewInt(2), "NetworkInitialSync")) + wg.Done() }() } From 7263912e38101cf7f74ba50bb55f3cd2e2e16db0 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Sat, 19 Jun 2021 14:20:13 -0400 Subject: [PATCH 10/21] Lookup transactions count from TransactionsState --- dot/core/messages.go | 5 +++++ dot/network/mock_transaction_handler.go | 16 +++++++++++++++- dot/network/service.go | 2 +- dot/network/state.go | 1 + 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/dot/core/messages.go b/dot/core/messages.go index e604da2ff3..1deaf7f584 100644 --- a/dot/core/messages.go +++ b/dot/core/messages.go @@ -49,3 +49,8 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) erro return nil } + +// TransactionsCount returns number for pending transactions in pool +func (s *Service) TransactionsCount() int { + return len(s.transactionState.PendingInPool()) +} diff --git a/dot/network/mock_transaction_handler.go b/dot/network/mock_transaction_handler.go index 45ad335bdc..03f52824a1 100644 --- a/dot/network/mock_transaction_handler.go +++ b/dot/network/mock_transaction_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.8.0. DO NOT EDIT. +// Code generated by mockery v1.0.0. DO NOT EDIT. package network @@ -22,3 +22,17 @@ func (_m *MockTransactionHandler) HandleTransactionMessage(_a0 *TransactionMessa return r0 } + +// TransactionsCount provides a mock function with given fields: +func (_m *MockTransactionHandler) TransactionsCount() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} diff --git a/dot/network/service.go b/dot/network/service.go index 59960424bb..098de18ac9 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -379,7 +379,7 @@ func (s *Service) sentBlockIntervalTelemetry() { best.Number, &finalizedHash, finalized.Number, - big.NewInt(0), // todo (ed) determine where to get tx count + big.NewInt(int64(s.transactionHandler.TransactionsCount())), big.NewInt(0), // todo (ed) determine where to get used_state_cache_size )) if err != nil { diff --git a/dot/network/state.go b/dot/network/state.go index 70b948d11f..433d7d1990 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -56,4 +56,5 @@ type Syncer interface { // TransactionHandler is the interface used by the transactions sub-protocol type TransactionHandler interface { HandleTransactionMessage(*TransactionMessage) error + TransactionsCount() int } From 6f079429d002aada2c53ffef711225d8c35f337f Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Sat, 19 Jun 2021 14:30:43 -0400 Subject: [PATCH 11/21] address comments --- dot/core/messages_test.go | 2 +- dot/network/service.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dot/core/messages_test.go b/dot/core/messages_test.go index 4e2547fb4c..3622d4c48f 100644 --- a/dot/core/messages_test.go +++ b/dot/core/messages_test.go @@ -38,7 +38,7 @@ import ( func TestService_ProcessBlockAnnounceMessage(t *testing.T) { // TODO: move to sync package - net := new(MockNetwork) // nolint + net := new(MockNetwork) // nolint cfg := &Config{ Network: net, diff --git a/dot/network/service.go b/dot/network/service.go index 098de18ac9..4b5095d741 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -19,6 +19,7 @@ package network import ( "context" "errors" + "fmt" "io" "math/big" "os" @@ -337,7 +338,7 @@ main: netState["externalAddressess"] = hostAddrs listAddrs := []string{} for _, v := range s.host.h.Network().ListenAddresses() { - listAddrs = append(listAddrs, v.String()) + listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, s.host.h.ID())) } netState["listenedAddressess"] = listAddrs From 545c25c59d67d8171ef40ea24ffabd9068179004 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 15:26:17 -0400 Subject: [PATCH 12/21] fix mocks for tests --- dot/network/transaction_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/transaction_test.go b/dot/network/transaction_test.go index d38cf14f8d..65ad6a97c6 100644 --- a/dot/network/transaction_test.go +++ b/dot/network/transaction_test.go @@ -56,6 +56,7 @@ func TestHandleTransactionMessage(t *testing.T) { basePath := utils.NewTestBasePath(t, "nodeA") mockhandler := &MockTransactionHandler{} mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil) + mockhandler.On("TransactionsCount").Return(0) config := &Config{ BasePath: basePath, From 17bd211bd601bf9da17f4575bcbffaa44f4f8d7c Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 15:40:02 -0400 Subject: [PATCH 13/21] lint --- dot/telemetry/telemetry.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 5c4e04d88e..77c9a79867 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -135,6 +135,7 @@ func (h *Handler) msgToJSON(message TelemetryMessage) ([]byte, error) { return fullRes, nil } +// TelemetryMessage interface for TelemetryMessage functions type TelemetryMessage interface { messageType() string } @@ -152,6 +153,7 @@ type SystemConnectedTM struct { Version string `json:"version"` } +// NewSystemConnectedTM function to create new System Connected Telemetry Message func NewSystemConnectedTM(authority bool, chain string, genesisHash *common.Hash, implementation, name, networkID, startupTime, version string) *SystemConnectedTM { return &SystemConnectedTM{ @@ -178,6 +180,7 @@ type BlockImportTM struct { Origin string `json:"origin"` } +// NewBlockImportTM function to create new Block Import Telemetry Message func NewBlockImportTM(bestHash *common.Hash, height *big.Int, origin string) *BlockImportTM { return &BlockImportTM{ BestHash: bestHash, @@ -205,6 +208,7 @@ type SystemIntervalTM struct { UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"` } +// NewBandwidthTM function to create new Bandwidth Telemetry Message func NewBandwidthTM(bandwidthDownload, bandwidthUpload float64, peers int) *SystemIntervalTM { return &SystemIntervalTM{ BandwidthDownload: bandwidthDownload, @@ -214,6 +218,7 @@ func NewBandwidthTM(bandwidthDownload, bandwidthUpload float64, peers int) *Syst } } +// NewBlockIntervalTM function to create new Block Interval Telemetry Message func NewBlockIntervalTM(beshHash *common.Hash, bestHeight *big.Int, finalisedHash *common.Hash, finalisedHeight, txCount, usedStateCacheSize *big.Int) *SystemIntervalTM { return &SystemIntervalTM{ @@ -237,6 +242,7 @@ type NetworkStateTM struct { State map[string]interface{} `json:"state"` } +// NewNetworkStateTM function to create new Network State Telemetry Message func NewNetworkStateTM(state map[string]interface{}) *NetworkStateTM { return &NetworkStateTM{ Msg: "system.network_state", From ac904b0b6915b72e2860087bfa393ec2c5f87c71 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 15:44:23 -0400 Subject: [PATCH 14/21] refactor TelemetryMessage to Message --- dot/telemetry/telemetry.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 77c9a79867..60ddd6a556 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -37,7 +37,7 @@ type telemetryConnection struct { // Handler struct for holding telemetry related things type Handler struct { - msg chan TelemetryMessage + msg chan Message connections []*telemetryConnection log log.Logger } @@ -53,7 +53,7 @@ func GetInstance() *Handler { //nolint once.Do( func() { handlerInstance = &Handler{ - msg: make(chan TelemetryMessage, 256), + msg: make(chan Message, 256), log: log.New("pkg", "telemetry"), } go handlerInstance.startListening() @@ -80,7 +80,7 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { } // SendMessage sends Message to connected telemetry listeners -func (h *Handler) SendMessage(msg TelemetryMessage) error { +func (h *Handler) SendMessage(msg Message) error { select { case h.msg <- msg: @@ -112,7 +112,7 @@ func (h *Handler) startListening() { } } -func (h *Handler) msgToJSON(message TelemetryMessage) ([]byte, error) { +func (h *Handler) msgToJSON(message Message) ([]byte, error) { messageBytes, err := json.Marshal(message) if err != nil { return nil, err @@ -135,8 +135,8 @@ func (h *Handler) msgToJSON(message TelemetryMessage) ([]byte, error) { return fullRes, nil } -// TelemetryMessage interface for TelemetryMessage functions -type TelemetryMessage interface { +// Message interface for Message functions +type Message interface { messageType() string } From b0e43fa31c46f951e1897c8876932911597dcfd4 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Tue, 22 Jun 2021 10:58:12 -0400 Subject: [PATCH 15/21] update mock handler to return result --- dot/network/service_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index db20565b1d..cbc44f64fd 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -84,6 +84,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { if cfg.TransactionHandler == nil { mocktxhandler := &MockTransactionHandler{} mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil) + mocktxhandler.On("TransactionsCount").Return(0) cfg.TransactionHandler = mocktxhandler } From 3247f7e80d754b78fdd6c05bfa72e11bbe156e04 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Tue, 22 Jun 2021 11:17:02 -0400 Subject: [PATCH 16/21] add TransactionsCount to mockhandler --- dot/network/test_helpers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/test_helpers.go b/dot/network/test_helpers.go index d5ff38f2ff..cf513983ec 100644 --- a/dot/network/test_helpers.go +++ b/dot/network/test_helpers.go @@ -59,6 +59,7 @@ func NewMockSyncer() *MockSyncer { func NewMockTransactionHandler() *MockTransactionHandler { mocktxhandler := new(MockTransactionHandler) mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil) + mocktxhandler.On("TransactionsCount").Return(0) return mocktxhandler } From 9ab53398faafcd7d2c336bf550ed3bfac35f1416 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 23 Jun 2021 15:43:13 -0400 Subject: [PATCH 17/21] move logic to build new network state message --- dot/network/service.go | 33 ++------------------------------- dot/telemetry/telemetry.go | 38 +++++++++++++++++++++++++++++++++++--- go.mod | 2 +- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index 4b5095d741..ba5af9fdb2 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -19,7 +19,6 @@ package network import ( "context" "errors" - "fmt" "io" "math/big" "os" @@ -306,11 +305,7 @@ func (s *Service) logPeerCount() { } } -type peerInfo struct { - Roles byte `json:"roles"` - BestHash string `json:"bestHash"` - BestNumber uint64 `json:"bestNumber"` -} + func (s *Service) publishNetworkTelemetry(done chan interface{}) { ticker := time.NewTicker(s.telemetryInterval) @@ -325,35 +320,11 @@ main: case <-ticker.C: o := s.host.bwc.GetBandwidthTotals() err := telemetry.GetInstance().SendMessage(telemetry.NewBandwidthTM(o.RateIn, o.RateOut, s.host.peerCount())) - if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } - netState := make(map[string]interface{}) - netState["peerId"] = s.host.h.ID() - hostAddrs := []string{} - for _, v := range s.host.h.Addrs() { - hostAddrs = append(hostAddrs, v.String()) - } - netState["externalAddressess"] = hostAddrs - listAddrs := []string{} - for _, v := range s.host.h.Network().ListenAddresses() { - listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, s.host.h.ID())) - } - netState["listenedAddressess"] = listAddrs - - peers := make(map[string]interface{}) - for _, v := range s.Peers() { - p := &peerInfo{ - Roles: v.Roles, - BestHash: v.BestHash.String(), - BestNumber: v.BestNumber, - } - peers[v.PeerID] = *p - } - netState["connectedPeers"] = peers - err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(netState)) + err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(s.host.h, s.Peers())) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 60ddd6a556..5fd2077b06 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -19,6 +19,7 @@ package telemetry import ( "encoding/json" "errors" + "fmt" "math/big" "sync" "time" @@ -27,6 +28,7 @@ import ( "github.com/ChainSafe/gossamer/lib/genesis" log "github.com/ChainSafe/log15" "github.com/gorilla/websocket" + libp2phost "github.com/libp2p/go-libp2p-core/host" ) type telemetryConnection struct { @@ -95,7 +97,7 @@ func (h *Handler) startListening() { msg := <-h.msg go func() { msgBytes, err := h.msgToJSON(msg) - if err != nil || len(msgBytes) == 0 { + if err != nil { h.log.Debug("issue decoding telemetry message", "error", err) return } @@ -236,6 +238,12 @@ func (tm *SystemIntervalTM) messageType() string { return tm.Msg } +type peerInfo struct { + Roles byte `json:"roles"` + BestHash string `json:"bestHash"` + BestNumber uint64 `json:"bestNumber"` +} + // NetworkStateTM struct to hold network state telemetry messages type NetworkStateTM struct { Msg string `json:"msg"` @@ -243,10 +251,34 @@ type NetworkStateTM struct { } // NewNetworkStateTM function to create new Network State Telemetry Message -func NewNetworkStateTM(state map[string]interface{}) *NetworkStateTM { +func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *NetworkStateTM { + netState := make(map[string]interface{}) + netState["peerId"] = host.ID() + hostAddrs := []string{} + for _, v := range host.Addrs() { + hostAddrs = append(hostAddrs, v.String()) + } + netState["externalAddressess"] = hostAddrs + listAddrs := []string{} + for _, v := range host.Network().ListenAddresses() { + listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, host.ID())) + } + netState["listenedAddressess"] = listAddrs + + peers := make(map[string]interface{}) + for _, v := range peerInfos { + p := &peerInfo{ + Roles: v.Roles, + BestHash: v.BestHash.String(), + BestNumber: v.BestNumber, + } + peers[v.PeerID] = *p + } + netState["connectedPeers"] = peers + return &NetworkStateTM{ Msg: "system.network_state", - State: state, + State: netState, } } func (tm *NetworkStateTM) messageType() string { diff --git a/go.mod b/go.mod index 519258043d..1410d97cf1 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 // indirect github.com/golang/protobuf v1.4.3 github.com/golang/snappy v0.0.3-0.20201103224600-674baa8c7fc3 // indirect - github.com/google/go-cmp v0.5.6 // indirect + github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.1.5 // indirect github.com/gorilla/mux v1.7.4 github.com/gorilla/rpc v1.2.0 From 08fd38a5dac339db8df39dc5d565e0e9b3e3c8ed Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 23 Jun 2021 15:47:25 -0400 Subject: [PATCH 18/21] lint --- dot/network/service.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index ba5af9fdb2..21ef9ce0f9 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -305,8 +305,6 @@ func (s *Service) logPeerCount() { } } - - func (s *Service) publishNetworkTelemetry(done chan interface{}) { ticker := time.NewTicker(s.telemetryInterval) defer ticker.Stop() From e5c4de93e9ef1ef8ea6d2293edcbdc045b41558b Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 23 Jun 2021 15:58:50 -0400 Subject: [PATCH 19/21] fix interface --- dot/network/state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/state.go b/dot/network/state.go index 433d7d1990..323fdf2f02 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -55,6 +55,6 @@ type Syncer interface { // TransactionHandler is the interface used by the transactions sub-protocol type TransactionHandler interface { - HandleTransactionMessage(*TransactionMessage) error + HandleTransactionMessage(*TransactionMessage) (bool, error) TransactionsCount() int } From 02f03dbe25c52efa79e866dcf8b0d0d8aeb95a1a Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Thu, 24 Jun 2021 10:43:53 -0400 Subject: [PATCH 20/21] update mockhandler --- dot/network/transaction_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/transaction_test.go b/dot/network/transaction_test.go index 65ad6a97c6..41b2f38d63 100644 --- a/dot/network/transaction_test.go +++ b/dot/network/transaction_test.go @@ -55,7 +55,7 @@ func TestDecodeTransactionMessage(t *testing.T) { func TestHandleTransactionMessage(t *testing.T) { basePath := utils.NewTestBasePath(t, "nodeA") mockhandler := &MockTransactionHandler{} - mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil) + mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) mockhandler.On("TransactionsCount").Return(0) config := &Config{ From 0321eca9562f986db693b21ce539b1ca8198a3e8 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Tue, 29 Jun 2021 13:32:00 -0400 Subject: [PATCH 21/21] lint --- dot/telemetry/telemetry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index e4e091ebc9..2df26725b7 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -86,7 +86,7 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { } // SendMessage sends Message to connected telemetry listeners -func (h *Handler) SendMessage(msg *Message) error { +func (h *Handler) SendMessage(msg Message) error { t := time.NewTicker(h.sendMessageTimeout) defer t.Stop() select {