Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions tests/libp2p/pubsub/component/test_gossipsub_heartbeat.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,36 @@ suite "GossipSub Component - Heartbeat":
expectedGrafts &= peer

# Then during heartbeat Peers with lower than median scores are pruned and max 2 Peers are grafted
await waitForHeartbeat(heartbeatInterval)

let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh)
check:
actualGrafts.len == MaxOpportunisticGraftPeers
actualGrafts.allIt(it in expectedGrafts)
untilTimeout:
pre:
let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh)
check:
actualGrafts.len == MaxOpportunisticGraftPeers
actualGrafts.allIt(it in expectedGrafts)

asyncTest "Fanout maintenance during heartbeat - expired peers are dropped":
const
numberOfNodes = 10
topic = "foobar"
heartbeatInterval = 200.milliseconds
let nodes = generateNodes(
numberOfNodes,
gossip = true,
fanoutTTL = 60.milliseconds,
heartbeatInterval = heartbeatInterval,
)
.toGossipSub()
let
nodes = generateNodes(
numberOfNodes,
gossip = true,
fanoutTTL = 60.milliseconds,
heartbeatInterval = heartbeatInterval,
)
.toGossipSub()
node0 = nodes[0]

startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)

# All nodes but Node0 are subscribed to the topic
for node in nodes[1 .. ^1]:
node.subscribe(topic, voidTopicHandler)
subscribeAllNodes(nodes[1 .. ^1], topic, voidTopicHandler)
await waitForHeartbeat(heartbeatInterval)

let node0 = nodes[0]
checkUntilTimeout:
node0.gossipsub.hasKey(topic)

Expand Down Expand Up @@ -207,8 +208,7 @@ suite "GossipSub Component - Heartbeat":
await connectNodesStar(nodes)

# All nodes but Node0 are subscribed to the topic
for node in nodes[1 .. ^1]:
node.subscribe(topic, voidTopicHandler)
subscribeAllNodes(nodes[1 .. ^1], topic, voidTopicHandler)
await waitForHeartbeat(heartbeatInterval)

# When Node0 sends a message to the topic
Expand Down
28 changes: 18 additions & 10 deletions tests/libp2p/pubsub/component/test_gossipsub_mesh_management.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import chronos, std/[sequtils]
import ../../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, pubsubpeer]
import ../../../tools/[unittest]
import ../../../tools/[unittest, futures]
import ../utils

suite "GossipSub Component - Mesh Management":
Expand Down Expand Up @@ -176,21 +176,24 @@ suite "GossipSub Component - Mesh Management":
subscribeAllNodes(nodes, topic, voidTopicHandler)

# Then mesh and gossipsub should be populated
for node in nodes:
check node.topics.contains(topic)
check node.gossipsub.hasKey(topic)
check node.gossipsub[topic].len() == numberOfNodes - 1
check node.mesh.hasKey(topic)
check node.mesh[topic].len() == numberOfNodes - 1
for n in nodes:
let node = n
checkUntilTimeout:
node.topics.contains(topic)
node.gossipsub.hasKey(topic)
node.gossipsub[topic].len() == numberOfNodes - 1
node.mesh.hasKey(topic)
node.mesh[topic].len() == numberOfNodes - 1

# When all nodes unsubscribe from the topic
unsubscribeAllNodes(nodes, topic, voidTopicHandler)

# Then the topic should be removed from mesh and gossipsub
for node in nodes:
check topic notin node.topics
check topic notin node.mesh
check topic notin node.gossipsub
check:
topic notin node.topics
topic notin node.mesh
topic notin node.gossipsub

asyncTest "handle subscribe and unsubscribe for multiple topics":
let
Expand Down Expand Up @@ -333,6 +336,11 @@ suite "GossipSub Component - Mesh Management":
await connectNodes(nodes[0], nodes[2]) # Out
await connectNodes(nodes[3], nodes[0]) # In
subscribeAllNodes(nodes, topic, voidTopicHandler, wait = false)
await allFuturesThrowing(
waitSub(nodes[0], nodes[1], topic),
waitSub(nodes[0], nodes[2], topic),
waitSub(nodes[3], nodes[0], topic),
)

checkUntilTimeout:
nodes[0].mesh.outboundPeers(topic) == 2
Expand Down
10 changes: 5 additions & 5 deletions tests/libp2p/pubsub/component/test_gossipsub_message_cache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import chronos, std/[sequtils], stew/byteutils
import
../../../../libp2p/protocols/pubsub/
[gossipsub, mcache, peertable, floodsub, rpc/messages, rpc/message]
import ../../../tools/[unittest]
import ../../../tools/[unittest, futures]
import ../utils

suite "GossipSub Component - Message Cache":
Expand Down Expand Up @@ -222,7 +222,10 @@ suite "GossipSub Component - Message Cache":
await connectNodes(nodes[0], nodes[1])
nodes[0].subscribe(topic, voidTopicHandler)
nodes[1].subscribe(topic, voidTopicHandler)
await waitSub(nodes[0], nodes[1], topic)
await allFuturesThrowing(
waitSub(nodes[0], nodes[1], topic), #
waitSub(nodes[1], nodes[0], topic),
)

# When Node0 publishes two messages to the topic
tryPublish await nodes[0].publish(topic, "Hello".toBytes()), 1
Expand All @@ -246,7 +249,6 @@ suite "GossipSub Component - Message Cache":
await waitSub(nodes[0], nodes[2], topic)

# And messageIds are added to node0PeerNode2 sentIHaves to allow processing IWant
# let node0PeerNode2 =
let node0PeerNode2 = nodes[0].getPeerByPeerId(topic, nodes[2].peerInfo.peerId)
node0PeerNode2.sentIHaves[0].incl(messageId1)
node0PeerNode2.sentIHaves[0].incl(messageId2)
Expand All @@ -263,8 +265,6 @@ suite "GossipSub Component - Message Cache":
@[node2PeerNode0], RPCMsg(control: some(iWantMessage)), isHighPriority = false
)

await waitForHeartbeat()

# Then Node2 receives only messageId2 and messageId1 is dropped
checkUntilTimeout:
nodes[2].mcache.window(topic).toSeq() == @[messageId2]
Expand Down
7 changes: 4 additions & 3 deletions tests/libp2p/pubsub/component/test_gossipsub_scoring.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ suite "GossipSub Component - Scoring":
# Nodes are subscribed to the same topic
nodes[1].subscribe(topic, handler1)
nodes[2].subscribe(topic, handler2)
await waitForHeartbeat()
await waitSub(nodes[0], nodes[1], topic)
await waitSub(nodes[0], nodes[2], topic)

# Given node 2's score is below the threshold
for peer in g0.gossipsub.getOrDefault(topic):
Expand Down Expand Up @@ -245,7 +246,7 @@ suite "GossipSub Component - Scoring":
var (handlerFut, handler) = createCompleteHandler()
nodes[0].subscribe(topic, voidTopicHandler)
nodes[1].subscribe(topic, handler)
await waitForHeartbeat()
await waitSub(nodes[0], nodes[1], topic)

nodes[1].updateScores()

Expand Down Expand Up @@ -345,7 +346,7 @@ suite "GossipSub Component - Scoring":
var (handlerFut, handler) = createCompleteHandler()
nodes[0].subscribe(topic, voidTopicHandler)
nodes[1].subscribe(topic, handler)
await waitForHeartbeat()
await waitSub(nodes[0], nodes[1], topic)

tryPublish await nodes[0].publish(topic, toBytes("hello")), 1

Expand Down
35 changes: 35 additions & 0 deletions tests/tools/test_unittest.nim
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,38 @@ suite "checkUntilTimeout helpers - failed":
asyncTest "checkUntilTimeoutCustom should timeout if condition is never true":
checkUntilTimeoutCustom(100.milliseconds, 10.milliseconds):
false

suite "untilTimeout helpers":
asyncTest "untilTimeout should pass after few attempts":
let a = 2
var b = 0

untilTimeout:
pre:
b.inc
check:
a == b

# final check ensures that untilTimeout is actually called
check:
a == b

asyncTest "untilTimeout should pass after few attempts: multi condition":
let goal1 = 2
let goal2 = 4
var val1 = 0
var val2 = 0

untilTimeout:
pre:
val1.inc
val2.inc
val2.inc
check:
val1 == goal1
val2 == goal2

# final check ensures that untilTimeout is actually called
check:
val1 == goal1
val2 == goal2
80 changes: 69 additions & 11 deletions tests/tools/unittest.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,74 @@ template asyncTest*(name: string, body: untyped): untyped =
)()
)

proc buildAndExpr(n: NimNode): NimNode =
# Helper proc to recursively build a combined boolean expression

if n.kind == nnkStmtList and n.len > 0:
var combinedExpr = n[0] # Start with the first expression
for i in 1 ..< n.len:
# Combine the current expression with the next using 'and'
combinedExpr = newCall("and", combinedExpr, n[i])
return combinedExpr
else:
return n

const
timeoutDefault: Duration = 30.seconds
sleepIntervalDefault: Duration = 50.milliseconds

macro untilTimeout*(args: untyped): untyped =
## Periodically checks a given condition until it is true or a timeout occurs.
##
## `pre`: untyped - Any logic that needs to be updated before calling `check`.
## `check`: untyped - A condition expression that should eventually evaluate to true.
##
## Examples:
## ```nim
## # Example 1:
## untilTimeout:
## pre:
## let value = getLatestValue()
## check:
## value == 3
if args.kind != nnkStmtList:
error "untilTimeout requires a block with check: and pre:"

var checkBlock: NimNode = nil
var preconditionBlock: NimNode = nil

for stmt in args:
if stmt.kind == nnkCall and $stmt[0] == "check":
checkBlock = stmt[1]
elif stmt.kind == nnkCall and $stmt[0] == "pre":
preconditionBlock = stmt[1]

if checkBlock.isNil or preconditionBlock.isNil:
error "untilTimeout block must contain both `check:` and `pre:` sections."

let combinedBoolExpr = buildAndExpr(checkBlock)

result = quote:
proc checkExpiringInternal(): Future[void] {.gensym, async.} =
let start = Moment.now()
while true:
if Moment.now() > (start + `timeoutDefault`):
checkpoint(
"[TIMEOUT] Timeout was reached and the conditions were not true. Check if the code is working as " &
"expected or consider increasing the timeout param."
)
`preconditionBlock`
check `checkBlock`
return
else:
`preconditionBlock`
if `combinedBoolExpr`:
return
else:
await sleepAsync(`sleepIntervalDefault`)

await checkExpiringInternal()

macro checkUntilTimeoutCustom*(
timeout: Duration, sleepInterval: Duration, code: untyped
): untyped =
Expand All @@ -76,16 +144,6 @@ macro checkUntilTimeoutCustom*(
## a == 2
## b == 1
## ```
# Helper proc to recursively build a combined boolean expression
proc buildAndExpr(n: NimNode): NimNode =
if n.kind == nnkStmtList and n.len > 0:
var combinedExpr = n[0] # Start with the first expression
for i in 1 ..< n.len:
# Combine the current expression with the next using 'and'
combinedExpr = newCall("and", combinedExpr, n[i])
return combinedExpr
else:
return n

# Build the combined expression
let combinedBoolExpr = buildAndExpr(code)
Expand Down Expand Up @@ -131,7 +189,7 @@ macro checkUntilTimeout*(code: untyped): untyped =
## b == 1
## ```
result = quote:
checkUntilTimeoutCustom(30.seconds, 50.milliseconds, `code`)
checkUntilTimeoutCustom(timeoutDefault, sleepIntervalDefault, `code`)

template finalCheckTrackers*(): untyped =
# finalCheckTrackers is a utility used for performing a final tracker check
Expand Down
Loading