2121import it .unimi .dsi .fastutil .ints .IntSet ;
2222import java .util .Iterator ;
2323import java .util .Set ;
24+ import java .util .concurrent .atomic .AtomicReference ;
2425import org .apache .logging .log4j .LogManager ;
2526import org .apache .logging .log4j .Logger ;
2627import tech .pegasys .teku .ethereum .events .SlotEventsChannel ;
@@ -39,6 +40,7 @@ public class AttestationTopicSubscriber implements SlotEventsChannel {
3940 private final Eth2P2PNetwork eth2P2PNetwork ;
4041 private final Spec spec ;
4142 private final SettableLabelledGauge subnetSubscriptionsGauge ;
43+ private final AtomicReference <UInt64 > currentSlot = new AtomicReference <>(null );
4244
4345 public AttestationTopicSubscriber (
4446 final Spec spec ,
@@ -56,6 +58,15 @@ public synchronized void subscribeToCommitteeForAggregation(
5658 aggregationSlot , UInt64 .valueOf (committeeIndex ), committeesAtSlot );
5759 final UInt64 currentUnsubscriptionSlot = subnetIdToUnsubscribeSlot .getOrDefault (subnetId , ZERO );
5860 final UInt64 unsubscribeSlot = currentUnsubscriptionSlot .max (aggregationSlot );
61+ final UInt64 maybeCurrentSlot = currentSlot .get ();
62+ if (maybeCurrentSlot != null && unsubscribeSlot .isLessThan (maybeCurrentSlot )) {
63+ LOG .trace (
64+ "Skipping outdated aggregation subnet {} with unsubscribe due at slot {}" ,
65+ subnetId ,
66+ unsubscribeSlot );
67+ return ;
68+ }
69+
5970 if (currentUnsubscriptionSlot .equals (ZERO )) {
6071 eth2P2PNetwork .subscribeToAttestationSubnetId (subnetId );
6172 toggleAggregateSubscriptionMetric (subnetId , false );
@@ -96,15 +107,25 @@ public synchronized void subscribeToPersistentSubnets(
96107 boolean shouldUpdateENR = false ;
97108
98109 for (SubnetSubscription subnetSubscription : newSubscriptions ) {
99- int subnetId = subnetSubscription .subnetId ();
110+ final int subnetId = subnetSubscription .subnetId ();
111+ final UInt64 maybeCurrentSlot = currentSlot .get ();
112+ if (maybeCurrentSlot != null
113+ && subnetSubscription .unsubscriptionSlot ().isLessThan (maybeCurrentSlot )) {
114+ LOG .trace (
115+ "Skipping outdated persistent subnet {} with unsubscribe due at slot {}" ,
116+ subnetId ,
117+ subnetSubscription .unsubscriptionSlot ());
118+ continue ;
119+ }
120+
100121 shouldUpdateENR = persistentSubnetIdSet .add (subnetId ) || shouldUpdateENR ;
101122 LOG .trace (
102123 "Subscribing to persistent subnet {} with unsubscribe due at slot {}" ,
103124 subnetId ,
104125 subnetSubscription .unsubscriptionSlot ());
105126 if (subnetIdToUnsubscribeSlot .containsKey (subnetId )) {
106- UInt64 existingUnsubscriptionSlot = subnetIdToUnsubscribeSlot .get (subnetId );
107- UInt64 unsubscriptionSlot =
127+ final UInt64 existingUnsubscriptionSlot = subnetIdToUnsubscribeSlot .get (subnetId );
128+ final UInt64 unsubscriptionSlot =
108129 existingUnsubscriptionSlot .max (subnetSubscription .unsubscriptionSlot ());
109130 LOG .trace (
110131 "Already subscribed to subnet {}, updating unsubscription slot to {}" ,
@@ -127,14 +148,15 @@ public synchronized void subscribeToPersistentSubnets(
127148
128149 @ Override
129150 public synchronized void onSlot (final UInt64 slot ) {
151+ currentSlot .set (slot );
130152 boolean shouldUpdateENR = false ;
131153
132154 final Iterator <Int2ObjectMap .Entry <UInt64 >> iterator =
133155 subnetIdToUnsubscribeSlot .int2ObjectEntrySet ().iterator ();
134156 while (iterator .hasNext ()) {
135157 final Int2ObjectMap .Entry <UInt64 > entry = iterator .next ();
136158 if (entry .getValue ().compareTo (slot ) < 0 ) {
137- int subnetId = entry .getIntKey ();
159+ final int subnetId = entry .getIntKey ();
138160 LOG .trace ("Unsubscribing from subnet {}" , subnetId );
139161 eth2P2PNetwork .unsubscribeFromAttestationSubnetId (subnetId );
140162 if (persistentSubnetIdSet .contains (subnetId )) {
0 commit comments