Skip to content
174 changes: 164 additions & 10 deletions tests/pubsub/integration/testgossipsubscoring.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

{.used.}

import std/[sequtils]
import std/[sequtils, strutils]
import stew/byteutils
import ../utils
import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, pubsubpeer]
Expand All @@ -18,13 +18,14 @@ import ../../helpers
import ../../utils/[futures]

suite "GossipSub Integration - Scoring":
const topic = "foobar"

teardown:
checkTrackers()

asyncTest "Flood publish to all peers with score above threshold, regardless of subscription":
let
numberOfNodes = 3
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true, floodPublish = true)
g0 = GossipSub(nodes[0])

Expand Down Expand Up @@ -58,7 +59,6 @@ suite "GossipSub Integration - Scoring":
results[1].isPending()

asyncTest "Should not rate limit decodable messages below the size allowed":
const topic = "foobar"
let
nodes = generateNodes(
2,
Expand Down Expand Up @@ -101,7 +101,6 @@ suite "GossipSub Integration - Scoring":
currentRateLimitHits() == rateLimitHits

asyncTest "Should rate limit undecodable messages above the size allowed":
const topic = "foobar"
let
nodes = generateNodes(
2,
Expand Down Expand Up @@ -140,7 +139,6 @@ suite "GossipSub Integration - Scoring":
currentRateLimitHits() == rateLimitHits + 2

asyncTest "Should rate limit decodable messages above the size allowed":
const topic = "foobar"
let
nodes = generateNodes(
2,
Expand Down Expand Up @@ -202,7 +200,6 @@ suite "GossipSub Integration - Scoring":
currentRateLimitHits() == rateLimitHits + 2

asyncTest "Should rate limit invalid messages above the size allowed":
const topic = "foobar"
let
nodes = generateNodes(
2,
Expand Down Expand Up @@ -250,7 +247,6 @@ suite "GossipSub Integration - Scoring":
currentRateLimitHits() == rateLimitHits + 2

asyncTest "DirectPeers: don't kick direct peer with low score":
const topic = "foobar"
let nodes = generateNodes(2, gossip = true).toGossipSub()

startNodesAndDeferStop(nodes)
Expand Down Expand Up @@ -280,9 +276,7 @@ suite "GossipSub Integration - Scoring":
futResult.isCompleted(true)

asyncTest "Peers disconnections mechanics":
const
numberOfNodes = 10
topic = "foobar"
const numberOfNodes = 10
let nodes =
generateNodes(numberOfNodes, gossip = true, triggerSelf = true).toGossipSub()

Expand Down Expand Up @@ -357,6 +351,8 @@ suite "GossipSub Integration - Scoring":
let nodes =
generateNodes(2, gossip = true, decayInterval = decayInterval).toGossipSub()

nodes.setDefaultTopicParams(topic)

startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)

Expand All @@ -379,3 +375,161 @@ suite "GossipSub Integration - Scoring":
check:
nodes[0].peerStats[nodes[1].peerInfo.peerId].topicInfos[topic].meshMessageDeliveries in
50.0 .. 66.0

asyncTest "Nodes publishing invalid messages are penalised and disconnected":
# Given GossipSub nodes with Topic Params
const numberOfNodes = 3

let
nodes = generateNodes(
numberOfNodes,
gossip = true,
verifySignature = false,
# Disable signature verification to isolate validation penalties
decayInterval = 200.milliseconds, # scoring heartbeat interval
heartbeatInterval = 5.seconds,
# heartbeatInterval >>> decayInterval to prevent prunning peers with bad score
publishThreshold = -150.0,
graylistThreshold = -200.0,
disconnectBadPeers = false,
)
.toGossipSub()
centerNode = nodes[0]
node1peerId = nodes[1].peerInfo.peerId
node2peerId = nodes[2].peerInfo.peerId

nodes.setDefaultTopicParams(topic)
for node in nodes:
node.topicParams[topic].invalidMessageDeliveriesWeight = -10.0
node.topicParams[topic].invalidMessageDeliveriesDecay = 0.9

startNodesAndDeferStop(nodes)

# And Node 0 is center node, connected to others
await connectNodes(nodes[0], nodes[1]) # center to Node 1 (valid messages)
await connectNodes(nodes[0], nodes[2]) # center to Node 2 (invalid messages)

nodes.subscribeAllNodes(topic, voidTopicHandler)

# And center node has message validator: accept from node 1, reject from node 2
var validatedMessageCount = 0
proc validationHandler(
topic: string, message: Message
): Future[ValidationResult] {.async.} =
validatedMessageCount.inc
if string.fromBytes(message.data).contains("invalid"):
return ValidationResult.Reject # reject invalid messages
else:
return ValidationResult.Accept

nodes[0].addValidator(topic, validationHandler)

# 1st scoring heartbeat
checkUntilTimeout:
centerNode.gossipsub.getOrDefault(topic).len == numberOfNodes - 1
centerNode.getPeerScore(node1peerId) > 0
centerNode.getPeerScore(node2peerId) > 0

# When messages are broadcasted
const messagesToSend = 5
for i in 0 ..< messagesToSend:
nodes[1].broadcast(
nodes[1].mesh[topic],
RPCMsg(messages: @[Message(topic: topic, data: ("valid_" & $i).toBytes())]),
isHighPriority = true,
)
nodes[2].broadcast(
nodes[2].mesh[topic],
RPCMsg(messages: @[Message(topic: topic, data: ("invalid_" & $i).toBytes())]),
isHighPriority = true,
)

# And messages are processed
# Then invalidMessageDeliveries stats are applied
checkUntilTimeout:
validatedMessageCount == messagesToSend * (numberOfNodes - 1)
centerNode.getPeerTopicInfo(node1peerId, topic).invalidMessageDeliveries == 0.0
# valid messages
centerNode.getPeerTopicInfo(node2peerId, topic).invalidMessageDeliveries == 5.0
# invalid messages

# When scoring hartbeat occurs (2nd scoring heartbeat)
# Then peer scores are calculated
checkUntilTimeout:
# node1: p1 (time in mesh) + p2 (first message deliveries)
centerNode.getPeerScore(node1peerId) > 5.0 and
centerNode.getPeerScore(node1peerId) < 6.0
# node2: p1 (time in mesh) - p4 (invalid message deliveries)
centerNode.getPeerScore(node2peerId) < -249.0 and
centerNode.getPeerScore(node2peerId) > -250.0
# all peers are still connected
centerNode.mesh[topic].toSeq().len == 2

# When disconnecting peers with bad score (score < graylistThreshold) is enabled
for node in nodes:
node.parameters.disconnectBadPeers = true

# Then peers with bad score are disconnected on scoring heartbeat (3rd scoring heartbeat)
checkUntilTimeout:
centerNode.mesh[topic].toSeq().len == 1

asyncTest "Nodes not meeting Mesh Message Deliveries Threshold are penalised":
# Given GossipSub nodes with Topic Params
const numberOfNodes = 2

let
nodes = generateNodes(
numberOfNodes,
gossip = true,
decayInterval = 200.milliseconds, # scoring heartbeat interval
heartbeatInterval = 5.seconds,
# heartbeatInterval >>> decayInterval to prevent prunning peers with bad score
disconnectBadPeers = false,
)
.toGossipSub()
node1PeerId = nodes[1].peerInfo.peerId

nodes.setDefaultTopicParams(topic)
for node in nodes:
node.topicParams[topic].meshMessageDeliveriesThreshold = 5
node.topicParams[topic].meshMessageDeliveriesActivation = 1.milliseconds
# active from the start
node.topicParams[topic].meshMessageDeliveriesDecay = 0.9
node.topicParams[topic].meshMessageDeliveriesWeight = -10.0
node.topicParams[topic].meshFailurePenaltyDecay = 0.9
node.topicParams[topic].meshFailurePenaltyWeight = -5.0

startNodesAndDeferStop(nodes)

# And Nodes are connected and subscribed to the topic
await connectNodes(nodes[0], nodes[1])
nodes.subscribeAllNodes(topic, voidTopicHandler)

# When scoring heartbeat occurs
# Then Peer has negative score due to active meshMessageDeliveries deficit
checkUntilTimeout:
nodes[0].gossipsub.getOrDefault(topic).len == numberOfNodes - 1
nodes[0].mesh.getOrDefault(topic).len == numberOfNodes - 1
# p1 (time in mesh) - p3 (mesh message deliveries)
nodes[0].getPeerScore(node1PeerId) < -249.0

# When Peer is unsubscribed
nodes[1].unsubscribe(topic, voidTopicHandler)

# Then meshFailurePenalty is applied due to active meshMessageDeliveries deficit
checkUntilTimeout:
nodes[0].getPeerTopicInfo(node1PeerId, topic).meshFailurePenalty == 25

# When next scoring heartbeat occurs
# Then Peer has negative score
checkUntilTimeout:
# p3b (mesh failure penalty) [p1 and p3 not calculated when peer was pruned]
nodes[0].getPeerScore(node1PeerId) == -125.0

# When Peer subscribes again
nodes[1].subscribe(topic, voidTopicHandler)

# Then Peer is not grafted to the mesh due to negative score (score was retained)
checkUntilTimeout:
nodes[0].gossipsub.getOrDefault(topic).len == numberOfNodes - 1
nodes[0].mesh.getOrDefault(topic).len == 0
27 changes: 22 additions & 5 deletions tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ proc generateNodes*(
historyGossip = 5,
gossipThreshold = -100.0,
decayInterval = 1.seconds,
publishThreshold = -1000.0,
graylistThreshold = -10000.0,
disconnectBadPeers: bool = false,
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
Expand Down Expand Up @@ -225,15 +228,14 @@ proc generateNodes*(
p.opportunisticGraftThreshold = opportunisticGraftThreshold
p.gossipThreshold = gossipThreshold
p.decayInterval = decayInterval
p.publishThreshold = publishThreshold
p.graylistThreshold = graylistThreshold
p.disconnectBadPeers = disconnectBadPeers
if gossipFactor.isSome: p.gossipFactor = gossipFactor.get
applyDValues(p, dValues)
p
),
)
# set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("bar", TopicParams.init()).topicWeight = 1.0
if codecs.len != 0:
g.codecs = codecs
g.PubSub
Expand All @@ -254,18 +256,33 @@ proc generateNodes*(
proc toGossipSub*(nodes: seq[PubSub]): seq[GossipSub] =
return nodes.mapIt(GossipSub(it))

proc setDefaultTopicParams*(nodes: seq[GossipSub], topic: string): void =
for node in nodes:
node.topicParams.mgetOrPut(topic, TopicParams.init()).topicWeight = 1.0

proc getNodeByPeerId*[T: PubSub](nodes: seq[T], peerId: PeerId): GossipSub =
let filteredNodes = nodes.filterIt(it.peerInfo.peerId == peerId)
check:
filteredNodes.len == 1
return filteredNodes[0]

proc getPeerByPeerId*[T: PubSub](node: T, topic: string, peerId: PeerId): PubSubPeer =
let filteredPeers = node.gossipsub[topic].toSeq().filterIt(it.peerId == peerId)
let filteredPeers =
node.gossipsub.getOrDefault(topic).toSeq().filterIt(it.peerId == peerId)
check:
filteredPeers.len == 1
return filteredPeers[0]

proc getPeerStats*(node: GossipSub, peerId: PeerId): PeerStats =
node.peerStats.withValue(peerId, stats):
return stats[]

proc getPeerScore*(node: GossipSub, peerId: PeerId): float64 =
return node.getPeerStats(peerId).score

proc getPeerTopicInfo*(node: GossipSub, peerId: PeerId, topic: string): TopicInfo =
return node.getPeerStats(peerId).topicInfos.getOrDefault(topic)

proc connectNodes*[T: PubSub](dialer: T, target: T) {.async.} =
doAssert dialer.switch.peerInfo.peerId != target.switch.peerInfo.peerId,
"Could not connect same peer"
Expand Down
Loading