Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 49 additions & 44 deletions beacon_node/network/src/subnet_service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,16 @@ fn get_subnet_service() -> SubnetService<TestBeaconChainType> {
)
}

// gets a number of events from the subscription service, or returns none if it times out after a number
// of slots
async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
// gets a number of events from the subscription service, or returns none if it times out after a
// specified duration.
async fn get_events_until_timeout<S: Stream<Item = SubnetServiceMessage> + Unpin>(
stream: &mut S,
num_events: Option<usize>,
num_slots_before_timeout: u32,
timeout: Duration,
) -> Vec<SubnetServiceMessage> {
let mut events = Vec::new();

let timeout =
tokio::time::sleep(Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout);
futures::pin_mut!(timeout);
let sleep = tokio::time::sleep(timeout);
futures::pin_mut!(sleep);

loop {
tokio::select! {
Expand All @@ -139,7 +137,7 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
}
}
}
_ = timeout.as_mut() => {
_ = sleep.as_mut() => {
break;
}

Expand All @@ -149,6 +147,17 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
events
}

// gets a number of events from the subscription service, or returns none if it times out after a number
// of slots
async fn get_events_until_num_slots<S: Stream<Item = SubnetServiceMessage> + Unpin>(
stream: &mut S,
num_events: Option<usize>,
num_slots_before_timeout: u32,
) -> Vec<SubnetServiceMessage> {
let timeout = Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout;
get_events_until_timeout(stream, num_events, timeout).await
}

mod test {

#[cfg(not(windows))]
Expand Down Expand Up @@ -196,7 +205,7 @@ mod test {

// create the attestation service and subscriptions
let mut subnet_service = get_subnet_service();
let _events = get_events(&mut subnet_service, None, 1).await;
let _events = get_events_until_num_slots(&mut subnet_service, None, 1).await;

let current_slot = subnet_service
.beacon_chain
Expand Down Expand Up @@ -249,7 +258,7 @@ mod test {
];

// Wait for 1 slot duration to get the unsubscription event
let events = get_events(
let events = get_events_until_num_slots(
&mut subnet_service,
Some(2),
(MainnetEthSpec::slots_per_epoch()) as u32,
Expand Down Expand Up @@ -281,7 +290,7 @@ mod test {

// create the subnet service and subscriptions
let mut subnet_service = get_subnet_service();
let _events = get_events(&mut subnet_service, None, 0).await;
let _events = get_events_until_num_slots(&mut subnet_service, None, 0).await;
let current_slot = subnet_service
.beacon_chain
.slot_clock
Expand Down Expand Up @@ -330,14 +339,14 @@ mod test {

if subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
// If we are permanently subscribed to this subnet, we won't see a subscribe message
let _ = get_events(&mut subnet_service, None, 1).await;
let _ = get_events_until_num_slots(&mut subnet_service, None, 1).await;
} else {
let subscription = get_events(&mut subnet_service, None, 1).await;
let subscription = get_events_until_num_slots(&mut subnet_service, None, 1).await;
assert_eq!(subscription, [expected]);
}

// Get event for 1 more slot duration, we should get the unsubscribe event now.
let unsubscribe_event = get_events(&mut subnet_service, None, 1).await;
let unsubscribe_event = get_events_until_num_slots(&mut subnet_service, None, 1).await;

// If the long lived and short lived subnets are different, we should get an unsubscription
// event.
Expand Down Expand Up @@ -376,7 +385,7 @@ mod test {
// submit the subscriptions
subnet_service.validator_subscriptions(subscriptions.into_iter());

let events = get_events(&mut subnet_service, Some(130), 10).await;
let events = get_events_until_num_slots(&mut subnet_service, Some(130), 10).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unsubscribe_event_count = 0;
Expand Down Expand Up @@ -445,7 +454,7 @@ mod test {
// submit the subscriptions
subnet_service.validator_subscriptions(subscriptions.into_iter());

let events = get_events(&mut subnet_service, None, 3).await;
let events = get_events_until_num_slots(&mut subnet_service, None, 3).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
Expand Down Expand Up @@ -495,7 +504,7 @@ mod test {
// create the attestation service and subscriptions
let mut subnet_service = get_subnet_service();
// Remove permanent events
let _events = get_events(&mut subnet_service, None, 0).await;
let _events = get_events_until_num_slots(&mut subnet_service, None, 0).await;

let current_slot = subnet_service
.beacon_chain
Expand Down Expand Up @@ -560,7 +569,7 @@ mod test {

// Unsubscription event should happen at the end of the slot.
// We wait for 2 slots, to avoid timeout issues
let events = get_events(&mut subnet_service, None, 2).await;
let events = get_events_until_num_slots(&mut subnet_service, None, 2).await;

let expected_subscription =
SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
Expand All @@ -577,28 +586,26 @@ mod test {
println!("{events:?}");
let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the
// advance subscription time
let wait_slots = subnet_service
let wait_duration = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;
.unwrap();

let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);
let no_events =
dbg!(get_events_until_timeout(&mut subnet_service, None, wait_duration).await);

assert_eq!(no_events, []);

let subscription_end_slot = current_slot + subscription_slot2 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete
let wait_slots = subnet_service
let wait_duration = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_end_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;
.unwrap();

let second_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await;
let second_subscribe_event =
get_events_until_timeout(&mut subnet_service, None, wait_duration).await;
// If the permanent and short lived subnets are different, we should get an unsubscription event.
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
Expand All @@ -612,28 +619,26 @@ mod test {

let subscription_slot = current_slot + subscription_slot3 - 1;

let wait_slots = subnet_service
let wait_duration = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;
.unwrap();

let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);
let no_events =
dbg!(get_events_until_timeout(&mut subnet_service, None, wait_duration).await);

assert_eq!(no_events, []);

let subscription_end_slot = current_slot + subscription_slot3 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete
let wait_slots = subnet_service
let wait_duration = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_end_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;
.unwrap();

let third_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await;
let third_subscribe_event =
get_events_until_timeout(&mut subnet_service, None, wait_duration).await;

if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
Expand All @@ -652,7 +657,7 @@ mod test {

// create the attestation service and subscriptions
let mut subnet_service = get_subnet_service();
let _events = get_events(&mut subnet_service, None, 0).await;
let _events = get_events_until_num_slots(&mut subnet_service, None, 0).await;

let subscriptions =
std::iter::once(Subscription::SyncCommittee(SyncCommitteeSubscription {
Expand All @@ -673,7 +678,7 @@ mod test {
let subnet_id = subnet_ids.iter().next().unwrap();

// Note: the unsubscription event takes 2 epochs (8 * 2 * 0.4 secs = 3.2 secs)
let events = get_events(
let events = get_events_until_num_slots(
&mut subnet_service,
Some(5),
(MainnetEthSpec::slots_per_epoch() * 3) as u32, // Have some buffer time before getting 5 events
Expand Down Expand Up @@ -709,7 +714,7 @@ mod test {
// create the attestation service and subscriptions
let mut subnet_service = get_subnet_service();
// Get the initial events from permanent subnet subscriptions
let _events = get_events(&mut subnet_service, None, 1).await;
let _events = get_events_until_num_slots(&mut subnet_service, None, 1).await;

let subscriptions =
std::iter::once(Subscription::SyncCommittee(SyncCommitteeSubscription {
Expand All @@ -722,7 +727,7 @@ mod test {
subnet_service.validator_subscriptions(subscriptions);

// Get all immediate events (won't include unsubscriptions)
let events = get_events(&mut subnet_service, None, 1).await;
let events = get_events_until_num_slots(&mut subnet_service, None, 1).await;
matches::assert_matches!(
events[..],
[
Expand Down Expand Up @@ -752,7 +757,7 @@ mod test {
subnet_service.validator_subscriptions(subscriptions.into_iter());

// Get all immediate events (won't include unsubscriptions)
let events = get_events(&mut subnet_service, None, 1).await;
let events = get_events_until_num_slots(&mut subnet_service, None, 1).await;
matches::assert_matches!(events[..], [SubnetServiceMessage::DiscoverPeers(_),]);

// Should be unsubscribed at the end.
Expand Down
Loading