From c3e9d9e91d3ae4179e103ffe2a91c99b84362164 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Tue, 15 Oct 2024 13:36:21 +0400 Subject: [PATCH 1/4] isOptimisticHead flag should have a single source for GossipForkManager --- .../networking/eth2/ActiveEth2P2PNetwork.java | 3 +- .../eth2/gossip/forks/GossipForkManager.java | 28 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index 4851bf01133..6dd13b98f27 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -168,6 +168,8 @@ private synchronized void stopGossip() { @Override public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) { + gossipForkManager.onOptimisticHeadChanged(isOptimistic); + if (state.get() != State.RUNNING) { return; } @@ -176,7 +178,6 @@ public void onSyncStateChanged(final boolean isInSync, final boolean isOptimisti } else { stopGossip(); } - gossipForkManager.onOptimisticHeadChanged(isOptimistic); } @VisibleForTesting diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 52462fa0390..4077d1425b1 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; @@ -56,20 +57,23 @@ public class GossipForkManager { private static final Logger LOG = LogManager.getLogger(); private static final int EPOCHS_PRIOR_TO_FORK_TO_ACTIVATE = 2; private final Spec spec; - private final RecentChainData recentChainData; + private final Bytes32 genesisValidatorsRoot; private final NavigableMap forksByActivationEpoch; private final Set activeSubscriptions = new HashSet<>(); private final IntSet currentAttestationSubnets = new IntOpenHashSet(); private final IntSet currentSyncCommitteeSubnets = new IntOpenHashSet(); private Optional currentEpoch = Optional.empty(); + private boolean isHeadOptimistic; private GossipForkManager( final Spec spec, - final RecentChainData recentChainData, + final Bytes32 genesisValidatorsRoot, + final boolean isHeadOptimistic, final NavigableMap forksByActivationEpoch) { this.spec = spec; - this.recentChainData = recentChainData; + this.genesisValidatorsRoot = genesisValidatorsRoot; + this.isHeadOptimistic = isHeadOptimistic; this.forksByActivationEpoch = forksByActivationEpoch; } @@ -141,14 +145,12 @@ public synchronized void stopGossip() { } public synchronized void onOptimisticHeadChanged(final boolean isHeadOptimistic) { + this.isHeadOptimistic = isHeadOptimistic; if (isHeadOptimistic) { activeSubscriptions.forEach(GossipForkSubscriptions::stopGossipForOptimisticSync); } else { activeSubscriptions.forEach( - subscriptions -> - subscriptions.startGossip( - recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), - false)); + subscriptions -> subscriptions.startGossip(genesisValidatorsRoot, false)); } } @@ -275,11 +277,9 @@ private boolean isActive(final GossipForkSubscriptions subscriptions) { return activeSubscriptions.contains(subscriptions); } - private void startSubscriptions(final GossipForkSubscriptions subscription) { + private synchronized void startSubscriptions(final GossipForkSubscriptions subscription) { if (activeSubscriptions.add(subscription)) { - subscription.startGossip( - recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), - recentChainData.isChainHeadOptimistic()); + subscription.startGossip(genesisValidatorsRoot, isHeadOptimistic); currentAttestationSubnets.forEach(subscription::subscribeToAttestationSubnetId); currentSyncCommitteeSubnets.forEach(subscription::subscribeToSyncCommitteeSubnet); } @@ -330,7 +330,11 @@ public GossipForkManager build() { checkNotNull(spec, "Must supply spec"); checkNotNull(recentChainData, "Must supply recentChainData"); checkState(!forksByActivationEpoch.isEmpty(), "Must specify at least one fork"); - return new GossipForkManager(spec, recentChainData, forksByActivationEpoch); + return new GossipForkManager( + spec, + recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), + recentChainData.isChainHeadOptimistic(), + forksByActivationEpoch); } } } From 881f39c3be846f07601926a719ee88c06ed610bf Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Tue, 15 Oct 2024 20:12:56 +0400 Subject: [PATCH 2/4] Remove obsolete synchronized --- .../teku/networking/eth2/gossip/forks/GossipForkManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 4077d1425b1..5ad99287e46 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -277,7 +277,7 @@ private boolean isActive(final GossipForkSubscriptions subscriptions) { return activeSubscriptions.contains(subscriptions); } - private synchronized void startSubscriptions(final GossipForkSubscriptions subscription) { + private void startSubscriptions(final GossipForkSubscriptions subscription) { if (activeSubscriptions.add(subscription)) { subscription.startGossip(genesisValidatorsRoot, isHeadOptimistic); currentAttestationSubnets.forEach(subscription::subscribeToAttestationSubnetId); From eba5f2ab8722d35b652f288065c2fcd69f3dda76 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Tue, 15 Oct 2024 20:27:21 +0400 Subject: [PATCH 3/4] Add unit test --- .../eth2/gossip/forks/GossipForkManagerTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java index f4460d3fe01..06a7cd3c39b 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java @@ -568,6 +568,19 @@ void shouldStartSubscriptionsInOptimisticSyncMode() { verify(subscriptions).startGossip(GENESIS_VALIDATORS_ROOT, true); } + @Test + void shouldStartSubscriptionsInNonOptimisticSyncModeWhenSyncStateChangedBeforeStart() { + when(recentChainData.isChainHeadOptimistic()).thenReturn(true); + + final GossipForkSubscriptions subscriptions = forkAtEpoch(0); + final GossipForkManager manager = managerForForks(subscriptions); + + manager.onOptimisticHeadChanged(false); + manager.configureGossipForEpoch(UInt64.ZERO); + + verify(subscriptions).startGossip(GENESIS_VALIDATORS_ROOT, false); + } + private GossipForkSubscriptions forkAtEpoch(final long epoch) { final GossipForkSubscriptions subscriptions = mock(GossipForkSubscriptions.class, "subscriptionsForEpoch" + epoch); From 95708f94300a2aa624a05f32e7edff98e0edf983 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Tue, 15 Oct 2024 20:53:53 +0400 Subject: [PATCH 4/4] Resolve merge issue --- .../eth2/gossip/forks/GossipForkManager.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 5ad99287e46..3220469395c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; @@ -57,7 +56,7 @@ public class GossipForkManager { private static final Logger LOG = LogManager.getLogger(); private static final int EPOCHS_PRIOR_TO_FORK_TO_ACTIVATE = 2; private final Spec spec; - private final Bytes32 genesisValidatorsRoot; + private final RecentChainData recentChainData; private final NavigableMap forksByActivationEpoch; private final Set activeSubscriptions = new HashSet<>(); private final IntSet currentAttestationSubnets = new IntOpenHashSet(); @@ -68,13 +67,12 @@ public class GossipForkManager { private GossipForkManager( final Spec spec, - final Bytes32 genesisValidatorsRoot, - final boolean isHeadOptimistic, + final RecentChainData recentChainData, final NavigableMap forksByActivationEpoch) { this.spec = spec; - this.genesisValidatorsRoot = genesisValidatorsRoot; - this.isHeadOptimistic = isHeadOptimistic; + this.recentChainData = recentChainData; this.forksByActivationEpoch = forksByActivationEpoch; + this.isHeadOptimistic = recentChainData.isChainHeadOptimistic(); } public static GossipForkManager.Builder builder() { @@ -150,7 +148,10 @@ public synchronized void onOptimisticHeadChanged(final boolean isHeadOptimistic) activeSubscriptions.forEach(GossipForkSubscriptions::stopGossipForOptimisticSync); } else { activeSubscriptions.forEach( - subscriptions -> subscriptions.startGossip(genesisValidatorsRoot, false)); + subscriptions -> + subscriptions.startGossip( + recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), + false)); } } @@ -279,7 +280,9 @@ private boolean isActive(final GossipForkSubscriptions subscriptions) { private void startSubscriptions(final GossipForkSubscriptions subscription) { if (activeSubscriptions.add(subscription)) { - subscription.startGossip(genesisValidatorsRoot, isHeadOptimistic); + subscription.startGossip( + recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), + isHeadOptimistic); currentAttestationSubnets.forEach(subscription::subscribeToAttestationSubnetId); currentSyncCommitteeSubnets.forEach(subscription::subscribeToSyncCommitteeSubnet); } @@ -330,11 +333,7 @@ public GossipForkManager build() { checkNotNull(spec, "Must supply spec"); checkNotNull(recentChainData, "Must supply recentChainData"); checkState(!forksByActivationEpoch.isEmpty(), "Must specify at least one fork"); - return new GossipForkManager( - spec, - recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), - recentChainData.isChainHeadOptimistic(), - forksByActivationEpoch); + return new GossipForkManager(spec, recentChainData, forksByActivationEpoch); } } }