diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/executionproofs/ExecutionProofManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/executionproofs/ExecutionProofManager.java new file mode 100644 index 00000000000..14d3f745f04 --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/executionproofs/ExecutionProofManager.java @@ -0,0 +1,53 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.executionproofs; + +import java.util.Optional; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; +import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; +import tech.pegasys.teku.statetransition.validation.InternalValidationResult; + +public interface ExecutionProofManager { + + ExecutionProofManager NOOP = + new ExecutionProofManager() { + @Override + public SafeFuture onExecutionProofGossip( + ExecutionProof executionProof, Optional arrivalTimestamp) { + return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); + } + + @Override + public void onExecutionProofPublish( + final ExecutionProof executionProof, final RemoteOrigin remoteOrigin) {} + + @Override + public void subscribeToValidExecutionProofs( + final ValidExecutionProofListener sidecarsListener) {} + }; + + void onExecutionProofPublish(ExecutionProof executionProof, RemoteOrigin remoteOrigin); + + SafeFuture onExecutionProofGossip( + ExecutionProof executionProof, Optional arrivalTimestamp); + + void subscribeToValidExecutionProofs( + ExecutionProofManager.ValidExecutionProofListener executionProofListener); + + interface ValidExecutionProofListener { + void onNewValidExecutionProof(ExecutionProof executionProof, RemoteOrigin remoteOrigin); + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/executionproofs/ExecutionProofManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/executionproofs/ExecutionProofManagerImpl.java new file mode 100644 index 00000000000..29be05353dc --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/executionproofs/ExecutionProofManagerImpl.java @@ -0,0 +1,61 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.executionproofs; + +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.subscribers.Subscribers; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; +import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; +import tech.pegasys.teku.statetransition.validation.ExecutionProofGossipValidator; +import tech.pegasys.teku.statetransition.validation.InternalValidationResult; + +public class ExecutionProofManagerImpl implements ExecutionProofManager { + + final ExecutionProofGossipValidator executionProofGossipValidator; + private final Subscribers receivedExecutionProofSubscribers = + Subscribers.create(true); + + private static final Logger LOG = LogManager.getLogger(); + + public ExecutionProofManagerImpl( + final ExecutionProofGossipValidator executionProofGossipValidator) { + this.executionProofGossipValidator = executionProofGossipValidator; + } + + @Override + public void onExecutionProofPublish( + final ExecutionProof executionProof, final RemoteOrigin remoteOrigin) { + LOG.trace("Published execution proof {}", executionProof); + } + + @Override + public SafeFuture onExecutionProofGossip( + final ExecutionProof executionProof, final Optional arrivalTimestamp) { + LOG.debug("Received execution proof for block {}", executionProof); + // TODO fix this as this needs to obtain subnetId from the topic we received the proof from? + // this is probably a validation that only makes sense for the stub we're doing + return executionProofGossipValidator.validate( + executionProof, executionProof.getSubnetId().get()); + } + + @Override + public void subscribeToValidExecutionProofs( + final ValidExecutionProofListener executionProofListener) { + receivedExecutionProofSubscribers.subscribe(executionProofListener); + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/ExecutionProofGossipValidator.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/ExecutionProofGossipValidator.java new file mode 100644 index 00000000000..89723ccddc2 --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/ExecutionProofGossipValidator.java @@ -0,0 +1,68 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.validation; + +import static tech.pegasys.teku.spec.config.Constants.MAX_EXECUTION_PROOF_SUBNETS; + +import java.util.Set; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.collections.LimitedSet; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; + +public class ExecutionProofGossipValidator { + private static final Logger LOG = LogManager.getLogger(); + + private final Set receivedValidExecutionProofSet; + + public static ExecutionProofGossipValidator create() { + return new ExecutionProofGossipValidator( + // max subnets * 2 epochs * slots per epoch 32 based on mainnet for now + LimitedSet.createSynchronized(MAX_EXECUTION_PROOF_SUBNETS.intValue() * 64)); + } + + public ExecutionProofGossipValidator(final Set receivedValidExecutionProofSet) { + + this.receivedValidExecutionProofSet = receivedValidExecutionProofSet; + } + + public SafeFuture validate( + final ExecutionProof executionProof, final UInt64 subnetId) { + + // TODO need to check for other validations done in the prototype and spec + if (!executionProof.getSubnetId().get().equals(subnetId)) { + LOG.trace( + "ExecutionProof for block root {} / block hash {} does not match the gossip subnetId", + executionProof.getBlockRoot(), + executionProof.getBlockHash()); + return SafeFuture.completedFuture(InternalValidationResult.reject("SubnetId mismatch")); + } + + // Already seen and valid + if (receivedValidExecutionProofSet.contains(executionProof)) { + LOG.trace("Received duplicate execution proof {}", executionProof); + return SafeFuture.completedFuture(InternalValidationResult.IGNORE); + } + + // Validated the execution proof + LOG.trace( + "Received and validated execution proof for block root {}, block hash {}", + executionProof.getBlockRoot(), + executionProof.getBlockHash()); + receivedValidExecutionProofSet.add(executionProof); + return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); + } +} diff --git a/networking/eth2/build.gradle b/networking/eth2/build.gradle index 13f9dad2d2f..7004af275b4 100644 --- a/networking/eth2/build.gradle +++ b/networking/eth2/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation project(':infrastructure:ssz') implementation project(':storage') implementation project(':storage:api') + implementation project(':services:zkchain') implementation project(':infrastructure:serviceutils') implementation 'io.libp2p:jvm-libp2p' diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index f9617f7940e..7fbcaa62107 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -30,6 +30,7 @@ import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.DataColumnSidecarGossipChannel; +import tech.pegasys.teku.networking.eth2.gossip.ExecutionProofGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.config.Eth2Context; import tech.pegasys.teku.networking.eth2.gossip.config.GossipConfigurator; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; @@ -72,6 +73,7 @@ public class ActiveEth2P2PNetwork extends DelegatingP2PNetwork impleme private final SubnetSubscriptionService attestationSubnetService; private final SubnetSubscriptionService syncCommitteeSubnetService; private final SubnetSubscriptionService dataColumnSidecarSubnetService; + private final SubnetSubscriptionService executionProofSubnetService; private final ProcessedAttestationSubscriptionProvider processedAttestationSubscriptionProvider; private final AtomicBoolean gossipStarted = new AtomicBoolean(false); private final int dasTotalCustodySubnetCount; @@ -97,6 +99,7 @@ public ActiveEth2P2PNetwork( final SubnetSubscriptionService attestationSubnetService, final SubnetSubscriptionService syncCommitteeSubnetService, final SubnetSubscriptionService dataColumnSidecarSubnetService, + final SubnetSubscriptionService executionProofSubnetService, final GossipEncoding gossipEncoding, final GossipConfigurator gossipConfigurator, final ProcessedAttestationSubscriptionProvider processedAttestationSubscriptionProvider, @@ -115,6 +118,7 @@ public ActiveEth2P2PNetwork( this.attestationSubnetService = attestationSubnetService; this.syncCommitteeSubnetService = syncCommitteeSubnetService; this.dataColumnSidecarSubnetService = dataColumnSidecarSubnetService; + this.executionProofSubnetService = executionProofSubnetService; this.processedAttestationSubscriptionProvider = processedAttestationSubscriptionProvider; this.dasTotalCustodySubnetCount = dasTotalCustodySubnetCount; this.allTopicsFilterEnabled = allTopicsFilterEnabled; @@ -144,6 +148,9 @@ private synchronized void startup() { eventChannels.subscribe( DataColumnSidecarGossipChannel.class, (sidecar, __) -> gossipForkManager.publishDataColumnSidecar(sidecar)); + eventChannels.subscribe( + ExecutionProofGossipChannel.class, gossipForkManager::publishExecutionProof); + if (recentChainData.isCloseToInSync()) { startGossip(); } @@ -361,6 +368,18 @@ public void unsubscribeFromDataColumnSidecarSubnetId(final int subnetId) { dataColumnSidecarSubnetService.removeSubscription(subnetId); } + @Override + public void subscribeToExecutionProofSubnetId(final int subnetId) { + gossipForkManager.subscribeToExecutionProofSubnetId(subnetId); + executionProofSubnetService.addSubscription(subnetId); + } + + @Override + public void unsubscribeFromExecutionProofSubnetId(final int subnetId) { + gossipForkManager.unsubscribeFromExecutionProofSubnetId(subnetId); + executionProofSubnetService.removeSubscription(subnetId); + } + @Override public MetadataMessage getMetadata() { return peerManager.getMetadataMessage(); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java index 8d1b926bb79..33fb8b9ea10 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java @@ -44,6 +44,10 @@ public interface Eth2P2PNetwork extends P2PNetwork { void unsubscribeFromDataColumnSidecarSubnetId(int subnetId); + void subscribeToExecutionProofSubnetId(int subnetId); + + void unsubscribeFromExecutionProofSubnetId(int subnetId); + MetadataMessage getMetadata(); void publishSyncCommitteeMessage(ValidatableSyncCommitteeMessage message); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java index b707c66fc89..0700f3ad873 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java @@ -88,6 +88,7 @@ import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.PayloadAttestationMessage; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadBid; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadEnvelope; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -152,6 +153,7 @@ public class Eth2P2PNetworkBuilder { protected OperationProcessor gossipedSyncCommitteeMessageProcessor; protected OperationProcessor dataColumnSidecarOperationProcessor; + protected OperationProcessor executionProofOperationProcessor; protected StatusMessageFactory statusMessageFactory; protected KZG kzg; protected boolean recordMessageArrival; @@ -173,6 +175,7 @@ public Eth2P2PNetwork build() { final SubnetSubscriptionService syncCommitteeSubnetService = new SubnetSubscriptionService(); final SubnetSubscriptionService dataColumnSidecarSubnetService = new SubnetSubscriptionService(); + final SubnetSubscriptionService executionProofSubnetService = new SubnetSubscriptionService(); final DiscoveryNodeIdExtractor discoveryNodeIdExtractor = new LibP2PDiscoveryNodeIdExtractor(); final RpcEncoding rpcEncoding = RpcEncoding.createSszSnappyEncoding(spec.getNetworkingConfig().getMaxPayloadSize()); @@ -236,6 +239,7 @@ public Eth2P2PNetwork build() { attestationSubnetService, syncCommitteeSubnetService, dataColumnSidecarSubnetService, + executionProofSubnetService, gossipEncoding, config.getGossipConfigurator(), processedAttestationSubscriptionProvider, @@ -385,7 +389,9 @@ private GossipForkSubscriptions createSubscriptions( gossipedSignedContributionAndProofProcessor, gossipedSyncCommitteeMessageProcessor, gossipedSignedBlsToExecutionChangeProcessor, - debugDataDumper); + debugDataDumper, + executionProofOperationProcessor, + config.isExecutionProofTopicEnabled()); case FULU -> new GossipForkSubscriptionsFulu( forkAndSpecMilestone.getFork(), @@ -407,7 +413,9 @@ private GossipForkSubscriptions createSubscriptions( gossipedSignedBlsToExecutionChangeProcessor, dataColumnSidecarOperationProcessor, debugDataDumper, - dasGossipLogger); + dasGossipLogger, + executionProofOperationProcessor, + config.isExecutionProofTopicEnabled()); case GLOAS -> new GossipForkSubscriptionsGloas( forkAndSpecMilestone.getFork(), @@ -432,7 +440,9 @@ private GossipForkSubscriptions createSubscriptions( payloadAttestationMessageProcessor, executionPayloadBidProcessor, debugDataDumper, - dasGossipLogger); + dasGossipLogger, + executionProofOperationProcessor, + config.isExecutionProofTopicEnabled()); }; } @@ -462,9 +472,11 @@ private GossipForkSubscriptions createBpoSubscriptions( gossipedSyncCommitteeMessageProcessor, gossipedSignedBlsToExecutionChangeProcessor, dataColumnSidecarOperationProcessor, + executionProofOperationProcessor, debugDataDumper, dasGossipLogger, - bpo); + bpo, + config.isExecutionProofTopicEnabled()); case GLOAS -> new GossipForkSubscriptionsGloasBpo( forkAndSpecMilestone.getFork(), @@ -485,12 +497,14 @@ private GossipForkSubscriptions createBpoSubscriptions( gossipedSyncCommitteeMessageProcessor, gossipedSignedBlsToExecutionChangeProcessor, dataColumnSidecarOperationProcessor, + executionProofOperationProcessor, executionPayloadProcessor, payloadAttestationMessageProcessor, executionPayloadBidProcessor, debugDataDumper, dasGossipLogger, - bpo); + bpo, + config.isExecutionProofTopicEnabled()); default -> throw new IllegalStateException( "BPO is not supported for: " + forkAndSpecMilestone.getSpecMilestone()); @@ -510,7 +524,7 @@ protected DiscoveryNetwork buildNetwork( combinedChainDataClient.getRecentChainData()::getMilestoneByForkDigest); final GossipTopicFilter gossipTopicsFilter = new Eth2GossipTopicFilter( - combinedChainDataClient.getRecentChainData(), gossipEncoding, spec); + combinedChainDataClient.getRecentChainData(), gossipEncoding, spec, config); final NetworkConfig networkConfig = config.getNetworkConfig(); final DiscoveryConfig discoConfig = config.getDiscoveryConfig(); @@ -625,6 +639,7 @@ private void validate() { "gossipedDataColumnSidecarOperationProcessor", dataColumnSidecarOperationProcessor); assertNotNull("gossipedExecutionPayloadProcessor", executionPayloadProcessor); assertNotNull("gossipedPayloadAttestationMessageProcessor", payloadAttestationMessageProcessor); + assertNotNull("gossipedExecutionProofOperationProcessor", executionProofOperationProcessor); assertNotNull("gossipedExecutionPayloadBidProcessor", executionPayloadBidProcessor); } @@ -765,6 +780,13 @@ public Eth2P2PNetworkBuilder gossipedDataColumnSidecarOperationProcessor( return this; } + public Eth2P2PNetworkBuilder gossipedExecutionProofOperationProcessor( + final OperationProcessor executionProofOperationProcessor) { + checkNotNull(executionProofOperationProcessor); + this.executionProofOperationProcessor = executionProofOperationProcessor; + return this; + } + public Eth2P2PNetworkBuilder gossipedExecutionPayloadProcessor( final OperationProcessor gossipedExecutionPayloadProcessor) { checkNotNull(gossipedExecutionPayloadProcessor); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java index 14173c52d40..d5af5d4e7da 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java @@ -58,6 +58,7 @@ public class P2PConfig { // queries to 3 public static final int DEFAULT_HISTORICAL_DATA_MAX_CONCURRENT_QUERIES = 3; public static final int DEFAULT_HISTORICAL_MAX_QUERY_QUEUE_SIZE = 500; + public static final boolean DEFAULT_EXECUTION_PROOF_GOSSIP_ENABLED = false; private final Spec spec; private final NetworkConfig networkConfig; @@ -80,6 +81,7 @@ public class P2PConfig { private final boolean batchVerifyStrictThreadLimitEnabled; private final boolean isGossipBlobsAfterBlockEnabled; private final boolean allTopicsFilterEnabled; + private final boolean executionProofTopicEnabled; private P2PConfig( final Spec spec, @@ -100,7 +102,8 @@ private P2PConfig( final int batchVerifyMaxBatchSize, final boolean batchVerifyStrictThreadLimitEnabled, final boolean allTopicsFilterEnabled, - final boolean isGossipBlobsAfterBlockEnabled) { + final boolean isGossipBlobsAfterBlockEnabled, + final boolean executionProofTopicEnabled) { this.spec = spec; this.networkConfig = networkConfig; this.discoveryConfig = discoveryConfig; @@ -121,6 +124,7 @@ private P2PConfig( this.networkingSpecConfig = spec.getNetworkingConfig(); this.allTopicsFilterEnabled = allTopicsFilterEnabled; this.isGossipBlobsAfterBlockEnabled = isGossipBlobsAfterBlockEnabled; + this.executionProofTopicEnabled = executionProofTopicEnabled; } public static Builder builder() { @@ -208,6 +212,10 @@ public boolean isAllTopicsFilterEnabled() { return allTopicsFilterEnabled; } + public boolean isExecutionProofTopicEnabled() { + return executionProofTopicEnabled; + } + public boolean isGossipBlobsAfterBlockEnabled() { return isGossipBlobsAfterBlockEnabled; } @@ -237,6 +245,7 @@ public static class Builder { private int floodPublishMaxMessageSizeThreshold = DEFAULT_FLOOD_PUBLISH_MAX_MESSAGE_SIZE_THRESHOLD; private boolean gossipBlobsAfterBlockEnabled = DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED; + private boolean executionProofTopicEnabled = DEFAULT_EXECUTION_PROOF_GOSSIP_ENABLED; private Builder() {} @@ -292,7 +301,8 @@ public P2PConfig build() { batchVerifyMaxBatchSize, batchVerifyStrictThreadLimitEnabled, allTopicsFilterEnabled, - gossipBlobsAfterBlockEnabled); + gossipBlobsAfterBlockEnabled, + executionProofTopicEnabled); } private void validate() { @@ -445,5 +455,10 @@ public Builder allTopicsFilterEnabled(final boolean allTopicsFilterEnabled) { this.allTopicsFilterEnabled = allTopicsFilterEnabled; return this; } + + public Builder executionProofTopicEnabled(final boolean executionProofTopicEnabled) { + this.executionProofTopicEnabled = executionProofTopicEnabled; + return this; + } } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/ExecutionProofGossipChannel.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/ExecutionProofGossipChannel.java new file mode 100644 index 00000000000..15f4b34218f --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/ExecutionProofGossipChannel.java @@ -0,0 +1,29 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.gossip; + +import java.util.List; +import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; + +public interface ExecutionProofGossipChannel extends VoidReturningChannelInterface { + + ExecutionProofGossipChannel NOOP = executionProof -> {}; + + default void publishExecutionProofs(final List executionProofs) { + executionProofs.forEach(this::publishExecutionProof); + } + + void publishExecutionProof(ExecutionProof executionProof); +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/ExecutionProofGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/ExecutionProofGossipManager.java new file mode 100644 index 00000000000..92d7cca593b --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/ExecutionProofGossipManager.java @@ -0,0 +1,68 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.gossip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.networking.eth2.gossip.subnets.ExecutionProofSubnetSubscriptions; +import tech.pegasys.teku.spec.config.Constants; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; + +public class ExecutionProofGossipManager implements GossipManager { + + private static final Logger LOG = LogManager.getLogger(); + + private final ExecutionProofSubnetSubscriptions executionProofSubnetSubscriptions; + + public ExecutionProofGossipManager( + final ExecutionProofSubnetSubscriptions executionProofSubnetSubscriptions) { + this.executionProofSubnetSubscriptions = executionProofSubnetSubscriptions; + + for (int i = 0; i < Constants.MAX_EXECUTION_PROOF_SUBNETS.intValue(); i++) { + executionProofSubnetSubscriptions.subscribeToSubnetId(i); + } + } + + @Override + public void subscribe() { + executionProofSubnetSubscriptions.subscribe(); + } + + @Override + public void unsubscribe() { + executionProofSubnetSubscriptions.unsubscribe(); + } + + @Override + public boolean isEnabledDuringOptimisticSync() { + return true; + } + + public void subscribeToSubnetId(final int subnetId) { + executionProofSubnetSubscriptions.subscribeToSubnetId(subnetId); + } + + public void unsubscribeFromSubnetId(final int subnetId) { + executionProofSubnetSubscriptions.unsubscribeFromSubnetId(subnetId); + } + + public void publish(final ExecutionProof executionProof) { + executionProofSubnetSubscriptions + .gossip(executionProof) + .finish( + __ -> LOG.trace("{} published successfully", executionProof), + error -> + LOG.trace("Failed to publish {}, error: {}", executionProof, error.getMessage())); + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 6e7523b0976..5ebff6ee844 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -38,6 +38,7 @@ import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -65,6 +66,7 @@ public class GossipForkManager { private final IntSet currentAttestationSubnets = new IntOpenHashSet(); private final IntSet currentSyncCommitteeSubnets = new IntOpenHashSet(); private final IntSet currentDataColumnSidecarSubnets = new IntOpenHashSet(); + private final IntSet currentExecutionProofSubnets = new IntOpenHashSet(); private Optional currentEpoch = Optional.empty(); private boolean isHeadOptimistic; @@ -188,6 +190,15 @@ public synchronized void publishDataColumnSidecar(final DataColumnSidecar dataCo GossipForkSubscriptions::publishDataColumnSidecar); } + public void publishExecutionProof(final ExecutionProof executionProof) { + // for now we don't have a slot in the message data (execution proof) to use + // I believe it's safe to just check the current epoch + // TODO: talk to Kev and see if it makes sense to include the slot in the message + UInt64 slot = spec.computeStartSlotAtEpoch(spec.getCurrentEpoch(recentChainData.getStore())); + publishMessage( + slot, executionProof, "execution proof", GossipForkSubscriptions::publishExecutionProof); + } + public void publishSyncCommitteeMessage(final ValidatableSyncCommitteeMessage message) { publishMessage( message.getSlot(), @@ -317,6 +328,20 @@ public void unsubscribeFromDataColumnSidecarSubnetId(final int subnetId) { } } + public void subscribeToExecutionProofSubnetId(final int subnetId) { + if (currentExecutionProofSubnets.add(subnetId)) { + activeSubscriptions.forEach( + subscription -> subscription.subscribeToExecutionProofSubnet(subnetId)); + } + } + + public void unsubscribeFromExecutionProofSubnetId(final int subnetId) { + if (currentExecutionProofSubnets.remove(subnetId)) { + activeSubscriptions.forEach( + subscription -> subscription.unsubscribeFromExecutionProofSubnet(subnetId)); + } + } + private boolean isActive(final GossipForkSubscriptions subscriptions) { return activeSubscriptions.contains(subscriptions); } @@ -329,6 +354,7 @@ private void startSubscriptions(final GossipForkSubscriptions subscription) { currentAttestationSubnets.forEach(subscription::subscribeToAttestationSubnetId); currentSyncCommitteeSubnets.forEach(subscription::subscribeToSyncCommitteeSubnet); currentDataColumnSidecarSubnets.forEach(subscription::subscribeToDataColumnSidecarSubnet); + currentExecutionProofSubnets.forEach(subscription::subscribeToExecutionProofSubnet); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java index 6235e9e84c8..42fec34ddca 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java @@ -23,6 +23,7 @@ import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.PayloadAttestationMessage; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadBid; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadEnvelope; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -48,6 +49,10 @@ default SafeFuture publishBlobSidecar(final BlobSidecar blobSidecar) { return SafeFuture.COMPLETE; } + default void publishExecutionProof(final ExecutionProof executionProof) { + // since Electra for now + } + void subscribeToAttestationSubnetId(int subnetId); void unsubscribeFromAttestationSubnetId(int subnetId); @@ -92,6 +97,14 @@ default void publishExecutionPayload(final SignedExecutionPayloadEnvelope messag // since Gloas } + default void subscribeToExecutionProofSubnet(final int subnetId) { + // since Electra + } + + default void unsubscribeFromExecutionProofSubnet(final int subnetId) { + // since Electra + } + default void publishPayloadAttestationMessage(final PayloadAttestationMessage message) { // since Gloas } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsElectra.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsElectra.java index 72a91a1dabc..7dbee4eb244 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsElectra.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsElectra.java @@ -13,15 +13,22 @@ package tech.pegasys.teku.networking.eth2.gossip.forks.versions; +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.bytes.Bytes4; +import tech.pegasys.teku.networking.eth2.gossip.ExecutionProofGossipManager; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; +import tech.pegasys.teku.networking.eth2.gossip.subnets.ExecutionProofSubnetSubscriptions; import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor; import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetwork; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -29,11 +36,18 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.ValidatableSyncCommitteeMessage; import tech.pegasys.teku.spec.datastructures.state.Fork; +import tech.pegasys.teku.spec.datastructures.state.ForkInfo; import tech.pegasys.teku.statetransition.util.DebugDataDumper; import tech.pegasys.teku.storage.client.RecentChainData; public class GossipForkSubscriptionsElectra extends GossipForkSubscriptionsDeneb { + final OperationProcessor executionProofOperationProcessor; + + private Optional executionProofGossipManager; + private final boolean isExecutionProofTopicEnabled; + private static final Logger LOG = LogManager.getLogger(); + public GossipForkSubscriptionsElectra( final Fork fork, final Spec spec, @@ -55,7 +69,9 @@ public GossipForkSubscriptionsElectra( syncCommitteeMessageOperationProcessor, final OperationProcessor signedBlsToExecutionChangeOperationProcessor, - final DebugDataDumper debugDataDumper) { + final DebugDataDumper debugDataDumper, + final OperationProcessor executionProofOperationProcessor, + final boolean isExecutionProofTopicEnabled) { super( fork, spec, @@ -75,5 +91,55 @@ public GossipForkSubscriptionsElectra( syncCommitteeMessageOperationProcessor, signedBlsToExecutionChangeOperationProcessor, debugDataDumper); + this.executionProofOperationProcessor = executionProofOperationProcessor; + this.isExecutionProofTopicEnabled = isExecutionProofTopicEnabled; + } + + @Override + protected void addGossipManagers(final ForkInfo forkInfo, final Bytes4 forkDigest) { + super.addGossipManagers(forkInfo, forkDigest); + addExecutionProofGossipManager(forkInfo, forkDigest); + } + + private void addExecutionProofGossipManager(final ForkInfo forkInfo, final Bytes4 forkDigest) { + if (isExecutionProofTopicEnabled) { + LOG.debug("Creating ExecutionProofSubnetSubscriptions"); + ExecutionProofSubnetSubscriptions executionProofSubnetSubscriptions = + new ExecutionProofSubnetSubscriptions( + spec, + asyncRunner, + discoveryNetwork, + gossipEncoding, + recentChainData, + executionProofOperationProcessor, + debugDataDumper, + forkInfo, + forkDigest); + + executionProofGossipManager = + Optional.of(new ExecutionProofGossipManager(executionProofSubnetSubscriptions)); + addGossipManager(executionProofGossipManager.get()); + } else { + LOG.debug("Using ExecutionProofGossipManager.NOOP"); + executionProofGossipManager = Optional.empty(); + } + } + + @Override + public void publishExecutionProof(final ExecutionProof executionProof) { + executionProofGossipManager.ifPresent( + epGossipManager -> epGossipManager.publish(executionProof)); + } + + @Override + public void subscribeToExecutionProofSubnet(final int subnetId) { + executionProofGossipManager.ifPresent( + epGossipManager -> epGossipManager.subscribeToSubnetId(subnetId)); + } + + @Override + public void unsubscribeFromExecutionProofSubnet(final int subnetId) { + executionProofGossipManager.ifPresent( + epGossipManager -> epGossipManager.unsubscribeFromSubnetId(subnetId)); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFulu.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFulu.java index 01a68bf07c9..4b6bf7bc5cf 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFulu.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFulu.java @@ -27,6 +27,7 @@ import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -68,7 +69,9 @@ public GossipForkSubscriptionsFulu( signedBlsToExecutionChangeOperationProcessor, final OperationProcessor dataColumnSidecarOperationProcessor, final DebugDataDumper debugDataDumper, - final DasGossipLogger dasGossipLogger) { + final DasGossipLogger dasGossipLogger, + final OperationProcessor executionProofOperationProcessor, + final boolean isExecutionProofTopicEnabled) { super( fork, spec, @@ -87,7 +90,9 @@ public GossipForkSubscriptionsFulu( signedContributionAndProofOperationProcessor, syncCommitteeMessageOperationProcessor, signedBlsToExecutionChangeOperationProcessor, - debugDataDumper); + debugDataDumper, + executionProofOperationProcessor, + isExecutionProofTopicEnabled); this.dataColumnSidecarOperationProcessor = dataColumnSidecarOperationProcessor; this.dasGossipLogger = dasGossipLogger; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFuluBpo.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFuluBpo.java index f2f15e3f549..4061d62db09 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFuluBpo.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsFuluBpo.java @@ -24,6 +24,7 @@ import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -62,9 +63,11 @@ public GossipForkSubscriptionsFuluBpo( final OperationProcessor signedBlsToExecutionChangeOperationProcessor, final OperationProcessor dataColumnSidecarOperationProcessor, + final OperationProcessor executionProofOperationProcessor, final DebugDataDumper debugDataDumper, final DasGossipLogger dasGossipLogger, - final BlobParameters bpo) { + final BlobParameters bpo, + final boolean isExecutionProofTopicEnabled) { super( fork, spec, @@ -85,7 +88,9 @@ public GossipForkSubscriptionsFuluBpo( signedBlsToExecutionChangeOperationProcessor, dataColumnSidecarOperationProcessor, debugDataDumper, - dasGossipLogger); + dasGossipLogger, + executionProofOperationProcessor, + isExecutionProofTopicEnabled); this.bpo = bpo; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloas.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloas.java index 26cfd889034..1f9b592b14c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloas.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloas.java @@ -30,6 +30,7 @@ import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.PayloadAttestationMessage; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadBid; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadEnvelope; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -79,7 +80,9 @@ public GossipForkSubscriptionsGloas( payloadAttestationMessageOperationProcessor, final OperationProcessor executionPayloadBidOperationProcessor, final DebugDataDumper debugDataDumper, - final DasGossipLogger dasGossipLogger) { + final DasGossipLogger dasGossipLogger, + final OperationProcessor executionProcessorOperationProcessor, + final boolean isExecutionProofTopicEnabled) { super( fork, spec, @@ -100,7 +103,9 @@ public GossipForkSubscriptionsGloas( signedBlsToExecutionChangeOperationProcessor, dataColumnSidecarOperationProcessor, debugDataDumper, - dasGossipLogger); + dasGossipLogger, + executionProcessorOperationProcessor, + isExecutionProofTopicEnabled); this.executionPayloadProcessor = executionPayloadOperationProcessor; this.payloadAttestationMessageProcessor = payloadAttestationMessageOperationProcessor; this.executionPayloadBidProcessor = executionPayloadBidOperationProcessor; diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasBpo.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasBpo.java index 7a6649d7ae5..1588bfd8695 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasBpo.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasBpo.java @@ -27,6 +27,7 @@ import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.PayloadAttestationMessage; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadBid; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadEnvelope; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -65,13 +66,15 @@ public GossipForkSubscriptionsGloasBpo( final OperationProcessor signedBlsToExecutionChangeOperationProcessor, final OperationProcessor dataColumnSidecarOperationProcessor, + final OperationProcessor executionProofOperationProcessor, final OperationProcessor executionPayloadOperationProcessor, final OperationProcessor payloadAttestationMessageOperationProcessor, final OperationProcessor executionPayloadBidOperationProcessor, final DebugDataDumper debugDataDumper, final DasGossipLogger dasGossipLogger, - final BlobParameters bpo) { + final BlobParameters bpo, + final boolean isExecutionProofTopicEnabled) { super( fork, spec, @@ -95,7 +98,9 @@ public GossipForkSubscriptionsGloasBpo( payloadAttestationMessageOperationProcessor, executionPayloadBidOperationProcessor, debugDataDumper, - dasGossipLogger); + dasGossipLogger, + executionProofOperationProcessor, + isExecutionProofTopicEnabled); this.bpo = bpo; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/ExecutionProofSubnetSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/ExecutionProofSubnetSubscriptions.java new file mode 100644 index 00000000000..04e60c9ea81 --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/ExecutionProofSubnetSubscriptions.java @@ -0,0 +1,95 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.gossip.subnets; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.bytes.Bytes4; +import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; +import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName; +import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopics; +import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor; +import tech.pegasys.teku.networking.eth2.gossip.topics.topichandlers.Eth2TopicHandler; +import tech.pegasys.teku.networking.eth2.gossip.topics.topichandlers.ExecutionProofTopicHandler; +import tech.pegasys.teku.networking.p2p.gossip.GossipNetwork; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecVersion; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProofSchema; +import tech.pegasys.teku.spec.datastructures.state.ForkInfo; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsElectra; +import tech.pegasys.teku.statetransition.util.DebugDataDumper; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class ExecutionProofSubnetSubscriptions extends CommitteeSubnetSubscriptions { + + private final AsyncRunner asyncRunner; + private final RecentChainData recentChainData; + private final OperationProcessor processor; + private final ForkInfo forkInfo; + private final Bytes4 forkDigest; + private final ExecutionProofSchema executionProofSchema; + private final DebugDataDumper debugDataDumper; + + private static final Logger LOG = LogManager.getLogger(); + + public ExecutionProofSubnetSubscriptions( + final Spec spec, + final AsyncRunner asyncRunner, + final GossipNetwork gossipNetwork, + final GossipEncoding gossipEncoding, + final RecentChainData recentChainData, + final OperationProcessor processor, + final DebugDataDumper debugDataDumper, + final ForkInfo forkInfo, + final Bytes4 forkDigest) { + super(gossipNetwork, gossipEncoding); + this.asyncRunner = asyncRunner; + this.recentChainData = recentChainData; + this.processor = processor; + this.debugDataDumper = debugDataDumper; + this.forkInfo = forkInfo; + this.forkDigest = forkDigest; + final SpecVersion specVersion = + spec.forMilestone(spec.getForkSchedule().getHighestSupportedMilestone()); + this.executionProofSchema = + SchemaDefinitionsElectra.required(specVersion.getSchemaDefinitions()) + .getExecutionProofSchema(); + } + + public SafeFuture gossip(final ExecutionProof executionProof) { + int subnetId = executionProof.getSubnetId().get().intValue(); + final String topic = + GossipTopics.getExecutionProofSubnetTopic(forkDigest, subnetId, gossipEncoding); + return gossipNetwork.gossip(topic, gossipEncoding.encode(executionProof)); + } + + @Override + protected Eth2TopicHandler createTopicHandler(final int subnetId) { + LOG.debug("Creating ExecutionProof topic handler for subnet {}", subnetId); + final String topicName = GossipTopicName.getExecutionProofSubnetTopicName(subnetId); + return ExecutionProofTopicHandler.createHandler( + recentChainData, + asyncRunner, + processor, + gossipEncoding, + forkInfo, + forkDigest, + topicName, + executionProofSchema, + debugDataDumper); + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilter.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilter.java index 648e4ae4b68..c00f1047c5a 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilter.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilter.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.bytes.Bytes4; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.P2PConfig; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.p2p.libp2p.gossip.GossipTopicFilter; import tech.pegasys.teku.spec.Spec; @@ -33,11 +34,16 @@ public class Eth2GossipTopicFilter implements GossipTopicFilter { private static final Logger LOG = LogManager.getLogger(); private final Spec spec; + private final P2PConfig p2pConfig; private final Supplier> relevantTopics; public Eth2GossipTopicFilter( - final RecentChainData recentChainData, final GossipEncoding gossipEncoding, final Spec spec) { + final RecentChainData recentChainData, + final GossipEncoding gossipEncoding, + final Spec spec, + final P2PConfig p2pConfig) { this.spec = spec; + this.p2pConfig = p2pConfig; relevantTopics = Suppliers.memoize(() -> computeRelevantTopics(recentChainData, gossipEncoding)); } @@ -56,7 +62,7 @@ private Set computeRelevantTopics( final Bytes4 forkDigest = recentChainData.getCurrentForkDigest().orElseThrow(); final SpecMilestone milestone = recentChainData.getMilestoneByForkDigest(forkDigest).orElseThrow(); - final Set topics = getAllTopics(gossipEncoding, forkDigest, spec, milestone); + final Set topics = getAllTopics(gossipEncoding, forkDigest, spec, milestone, p2pConfig); final UInt64 forkEpoch = recentChainData .getBpoForkByForkDigest(forkDigest) @@ -70,7 +76,8 @@ private Set computeRelevantTopics( spec.getForkSchedule().getSpecMilestoneAtEpoch(futureFork.getEpoch()); final Bytes4 futureForkDigest = recentChainData.getForkDigestByMilestone(futureMilestone).orElseThrow(); - topics.addAll(getAllTopics(gossipEncoding, futureForkDigest, spec, futureMilestone)); + topics.addAll( + getAllTopics(gossipEncoding, futureForkDigest, spec, futureMilestone, p2pConfig)); }); // BPO spec.getBpoForks().stream() @@ -81,7 +88,8 @@ private Set computeRelevantTopics( spec.getForkSchedule().getSpecMilestoneAtEpoch(futureBpoFork.epoch()); final Bytes4 futureForkDigest = recentChainData.getForkDigestByBpoFork(futureBpoFork).orElseThrow(); - topics.addAll(getAllTopics(gossipEncoding, futureForkDigest, spec, futureMilestone)); + topics.addAll( + getAllTopics(gossipEncoding, futureForkDigest, spec, futureMilestone, p2pConfig)); }); return topics; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicName.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicName.java index faf108fe3b1..e2d77cb6285 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicName.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicName.java @@ -47,4 +47,8 @@ public static String getBlobSidecarSubnetTopicName(final int subnetId) { public static String getDataColumnSidecarSubnetTopicName(final int subnetId) { return "data_column_sidecar_" + subnetId; } + + public static String getExecutionProofSubnetTopicName(final int subnetId) { + return "execution_proof_" + subnetId; + } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopics.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopics.java index ac23dbc48bc..71debc16f7c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopics.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopics.java @@ -19,9 +19,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import tech.pegasys.teku.infrastructure.bytes.Bytes4; +import tech.pegasys.teku.networking.eth2.P2PConfig; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.config.Constants; import tech.pegasys.teku.spec.constants.NetworkConstants; /** @@ -73,6 +75,12 @@ public static String getDataColumnSidecarSubnetTopic( forkDigest, GossipTopicName.getDataColumnSidecarSubnetTopicName(subnetId), gossipEncoding); } + public static String getExecutionProofSubnetTopic( + final Bytes4 forkDigest, final int subnetId, final GossipEncoding gossipEncoding) { + return getTopic( + forkDigest, GossipTopicName.getExecutionProofSubnetTopicName(subnetId), gossipEncoding); + } + public static Set getAllDataColumnSidecarSubnetTopics( final GossipEncoding gossipEncoding, final Bytes4 forkDigest, final Spec spec) { @@ -89,7 +97,8 @@ public static Set getAllTopics( final GossipEncoding gossipEncoding, final Bytes4 forkDigest, final Spec spec, - final SpecMilestone specMilestone) { + final SpecMilestone specMilestone, + final P2PConfig p2pConfig) { final Set topics = new HashSet<>(); for (int i = 0; i < spec.getNetworkingConfig().getAttestationSubnetCount(); i++) { @@ -108,6 +117,12 @@ public static Set getAllTopics( topics.addAll(getAllDataColumnSidecarSubnetTopics(gossipEncoding, forkDigest, spec)); + if (p2pConfig.isExecutionProofTopicEnabled()) { + for (int i = 0; i < Constants.MAX_EXECUTION_PROOF_SUBNETS.intValue(); i++) { + topics.add(getExecutionProofSubnetTopic(forkDigest, i, gossipEncoding)); + } + } + for (GossipTopicName topicName : GossipTopicName.values()) { topics.add(GossipTopics.getTopic(forkDigest, topicName, gossipEncoding)); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/ExecutionProofTopicHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/ExecutionProofTopicHandler.java new file mode 100644 index 00000000000..04efc1cf1b6 --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/ExecutionProofTopicHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.gossip.topics.topichandlers; + +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.bytes.Bytes4; +import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; +import tech.pegasys.teku.networking.eth2.gossip.topics.OperationMilestoneValidator; +import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProofSchema; +import tech.pegasys.teku.spec.datastructures.state.ForkInfo; +import tech.pegasys.teku.statetransition.util.DebugDataDumper; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class ExecutionProofTopicHandler { + + public static Eth2TopicHandler createHandler( + final RecentChainData recentChainData, + final AsyncRunner asyncRunner, + final OperationProcessor operationProcessor, + final GossipEncoding gossipEncoding, + final ForkInfo forkInfo, + final Bytes4 forkDigest, + final String topicName, + final ExecutionProofSchema executionProofSchema, + final DebugDataDumper debugDataDumper) { + final Spec spec = recentChainData.getSpec(); + return new Eth2TopicHandler<>( + recentChainData, + asyncRunner, + operationProcessor, + gossipEncoding, + forkDigest, + topicName, + new OperationMilestoneValidator<>( + recentChainData.getSpec(), + forkInfo.getFork(), + __ -> spec.computeEpochAtSlot(recentChainData.getHeadSlot())), + executionProofSchema, + recentChainData.getSpec().getNetworkingConfig(), + debugDataDumper); + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java index 15579a5a28c..7ae510a76e8 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java @@ -60,6 +60,12 @@ public void subscribeToDataColumnSidecarSubnetId(final int subnetId) {} @Override public void unsubscribeFromDataColumnSidecarSubnetId(final int subnetId) {} + @Override + public void subscribeToExecutionProofSubnetId(final int subnetId) {} + + @Override + public void unsubscribeFromExecutionProofSubnetId(final int subnetId) {} + @Override public MetadataMessage getMetadata() { return spec.getGenesisSchemaDefinitions().getMetadataMessageSchema().createDefault(); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java index 516ed3691c3..54102c73c3f 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java @@ -80,6 +80,8 @@ public class ActiveEth2P2PNetworkTest { new SubnetSubscriptionService(); private final SubnetSubscriptionService dataColumnSidecarCommitteeSubnetService = new SubnetSubscriptionService(); + private final SubnetSubscriptionService executionProofCommitteeSubnetService = + new SubnetSubscriptionService(); private RecentChainData recentChainData; private final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY; private final GossipConfigurator gossipConfigurator = GossipConfigurator.NOOP; @@ -451,6 +453,7 @@ ActiveEth2P2PNetwork createNetwork() { attestationSubnetService, syncCommitteeSubnetService, dataColumnSidecarCommitteeSubnetService, + executionProofCommitteeSubnetService, gossipEncoding, gossipConfigurator, processedAttestationSubscriptionProvider, diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasTest.java index 040d1e352b9..c8520b700cd 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsGloasTest.java @@ -98,6 +98,8 @@ private GossipForkSubscriptionsGloas createGossipForkSubscriptionGloas() { noopOperationProcessor, noopOperationProcessor, DebugDataDumper.NOOP, - DasGossipLogger.NOOP); + DasGossipLogger.NOOP, + noopOperationProcessor, + false); } } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilterTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilterTest.java index eca9c14fa1b..887d42feb89 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilterTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2GossipTopicFilterTest.java @@ -21,6 +21,7 @@ import static tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding.SSZ_SNAPPY; import static tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName.getAttestationSubnetTopicName; import static tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName.getBlobSidecarSubnetTopicName; +import static tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName.getExecutionProofSubnetTopicName; import static tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName.getSyncCommitteeSubnetTopicName; import static tech.pegasys.teku.spec.SpecMilestone.DENEB; import static tech.pegasys.teku.spec.SpecMilestone.ELECTRA; @@ -34,12 +35,14 @@ import org.junit.jupiter.api.TestTemplate; import tech.pegasys.teku.infrastructure.bytes.Bytes4; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.P2PConfig; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.TestSpecContext; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext; import tech.pegasys.teku.spec.config.BlobScheduleEntry; +import tech.pegasys.teku.spec.config.Constants; import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.logic.versions.fulu.helpers.BlobParameters; @@ -58,6 +61,7 @@ class Eth2GossipTopicFilterTest { private Bytes4 currentForkDigest; private Eth2GossipTopicFilter filter; private Bytes4 nextForkDigest; + private P2PConfig p2pConfig; @BeforeEach void setUp(final SpecContext specContext) { @@ -66,6 +70,7 @@ void setUp(final SpecContext specContext) { // current milestone is actually the previous milestone currentSpecMilestone = specContext.getSpecMilestone().getPreviousMilestone(); nextSpecMilestone = specContext.getSpecMilestone(); + spec = switch (nextSpecMilestone) { case PHASE0, ALTAIR, BELLATRIX, CAPELLA -> @@ -93,12 +98,12 @@ void setUp(final SpecContext specContext) { new BlobScheduleEntry( bpoFork.epoch(), bpoFork.maxBlobsPerBlock()))))); }; - + p2pConfig = P2PConfig.builder().specProvider(spec).build(); final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec); storageSystem.chainUpdater().initializeGenesis(); recentChainData = spy(storageSystem.recentChainData()); - filter = new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec); + filter = new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec, p2pConfig); currentForkDigest = recentChainData.getCurrentForkDigest().orElseThrow(); nextForkDigest = @@ -222,7 +227,7 @@ void shouldAllowTopicsForBpoForkSameEpochAsFulu() { storageSystem.chainUpdater().initializeGenesis(); recentChainData = spy(storageSystem.recentChainData()); - filter = new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec); + filter = new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec, p2pConfig); currentForkDigest = recentChainData.getCurrentForkDigest().orElseThrow(); nextForkDigest = @@ -237,6 +242,27 @@ void shouldAllowTopicsForBpoForkSameEpochAsFulu() { assertThat(filter.isRelevantTopic(bpoTopic)).isTrue(); } + @TestTemplate + void shouldNotConsiderExecutionProofSubnetsRelevantByDefault() { + assumeThat(nextSpecMilestone).isEqualTo(ELECTRA); + for (int i = 0; i < Constants.MAX_EXECUTION_PROOF_SUBNETS.intValue(); i++) { + assertThat(filter.isRelevantTopic(getTopicName(getExecutionProofSubnetTopicName(i)))) + .isFalse(); + } + } + + @TestTemplate + void shouldConsiderExecutionProofSubnetsRelevantWhenEnabled() { + P2PConfig p2pConfigOverwritten = + P2PConfig.builder().specProvider(spec).executionProofTopicEnabled(true).build(); + filter = new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec, p2pConfigOverwritten); + assumeThat(nextSpecMilestone).isEqualTo(ELECTRA); + for (int i = 0; i < Constants.MAX_EXECUTION_PROOF_SUBNETS.intValue(); i++) { + assertThat(filter.isRelevantTopic(getTopicName(getExecutionProofSubnetTopicName(i)))) + .isTrue(); + } + } + private String getTopicName(final GossipTopicName name) { return GossipTopics.getTopic(currentForkDigest, name, SSZ_SNAPPY); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicsTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicsTest.java index 794b24b18d6..38ab3cb19fa 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicsTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/GossipTopicsTest.java @@ -20,6 +20,7 @@ import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.bytes.Bytes4; +import tech.pegasys.teku.networking.eth2.P2PConfig; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.p2p.libp2p.gossip.LibP2PGossipNetworkBuilder; import tech.pegasys.teku.spec.Spec; @@ -58,6 +59,7 @@ public void extractForkDigest_invalid() { public void maxSubscribedTopicsConstantIsLargeEnough() { final SpecMilestone latestMilestone = SpecMilestone.getHighestMilestone(); final Spec spec = TestSpecFactory.createMainnet(latestMilestone); + final P2PConfig p2pConfig = P2PConfig.builder().specProvider(spec).build(); final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec); storageSystem.chainUpdater().initializeGenesis(); @@ -76,7 +78,8 @@ public void maxSubscribedTopicsConstantIsLargeEnough() { spec.forMilestone(milestone) .miscHelpers() .computeForkDigest(fork.getCurrentVersion(), genesisValidatorsRoot); - return GossipTopics.getAllTopics(gossipEncoding, forkDigest, spec, milestone) + return GossipTopics.getAllTopics( + gossipEncoding, forkDigest, spec, milestone, p2pConfig) .size(); }) .sum(); diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java index 9b4beda0357..89e90eb5267 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java @@ -101,6 +101,7 @@ import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.PayloadAttestationMessage; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadBid; import tech.pegasys.teku.spec.datastructures.epbs.versions.gloas.SignedExecutionPayloadEnvelope; +import tech.pegasys.teku.spec.datastructures.execution.ExecutionProof; import tech.pegasys.teku.spec.datastructures.operations.Attestation; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; @@ -165,6 +166,7 @@ public class Eth2P2PNetworkBuilder { protected OperationProcessor syncCommitteeMessageProcessor; protected OperationProcessor signedBlsToExecutionChangeProcessor; protected OperationProcessor dataColumnSidecarOperationProcessor; + protected OperationProcessor executionProofOperationProcessor; protected OperationProcessor executionPayloadProcessor; protected OperationProcessor payloadAttestationMessageProcessor; protected OperationProcessor executionPayloadBidProcessor; @@ -226,6 +228,8 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { new SubnetSubscriptionService(); final SubnetSubscriptionService dataColumnSidecarSubnetService = new SubnetSubscriptionService(); + final SubnetSubscriptionService executionProofSubnetService = + new SubnetSubscriptionService(); final CombinedChainDataClient combinedChainDataClient = new CombinedChainDataClient( recentChainData, historicalChainData, spec, LateBlockReorgPreparationHandler.NOOP); @@ -287,7 +291,7 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { final SyncCommitteeSubnetTopicProvider syncCommitteeTopicProvider = new SyncCommitteeSubnetTopicProvider(recentChainData, gossipEncoding); final GossipTopicFilter gossipTopicsFilter = - new Eth2GossipTopicFilter(recentChainData, gossipEncoding, spec); + new Eth2GossipTopicFilter(recentChainData, gossipEncoding, spec, config); final KeyValueStore keyValueStore = new MemKeyValueStore<>(); final DiscoveryConfig discoConfig = config.getDiscoveryConfig(); final TargetPeerRange targetPeerRange = @@ -366,7 +370,11 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { .map( forkAndSpecMilestone -> createSubscriptions( - forkAndSpecMilestone, metricsSystem, network, gossipEncoding)) + forkAndSpecMilestone, + metricsSystem, + network, + gossipEncoding, + config.isExecutionProofTopicEnabled())) .forEach(gossipForkManagerBuilder::fork); final GossipForkManager gossipForkManager = gossipForkManagerBuilder.build(); @@ -382,6 +390,7 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { attestationSubnetService, syncCommitteeSubnetService, dataColumnSidecarSubnetService, + executionProofSubnetService, gossipEncoding, GossipConfigurator.NOOP, processedAttestationSubscriptionProvider, @@ -394,7 +403,8 @@ private GossipForkSubscriptions createSubscriptions( final ForkAndSpecMilestone forkAndSpecMilestone, final NoOpMetricsSystem metricsSystem, final DiscoveryNetwork network, - final GossipEncoding gossipEncoding) { + final GossipEncoding gossipEncoding, + final boolean isExecutionProofTopicEnabled) { return switch (forkAndSpecMilestone.getSpecMilestone()) { case PHASE0 -> new GossipForkSubscriptionsPhase0( @@ -506,7 +516,9 @@ private GossipForkSubscriptions createSubscriptions( signedContributionAndProofProcessor, syncCommitteeMessageProcessor, signedBlsToExecutionChangeProcessor, - debugDataDumper); + debugDataDumper, + executionProofOperationProcessor, + isExecutionProofTopicEnabled); case FULU -> new GossipForkSubscriptionsFulu( forkAndSpecMilestone.getFork(), @@ -528,7 +540,9 @@ private GossipForkSubscriptions createSubscriptions( signedBlsToExecutionChangeProcessor, dataColumnSidecarOperationProcessor, debugDataDumper, - DasGossipLogger.NOOP); + DasGossipLogger.NOOP, + executionProofOperationProcessor, + isExecutionProofTopicEnabled); case GLOAS -> new GossipForkSubscriptionsGloas( forkAndSpecMilestone.getFork(), @@ -553,7 +567,9 @@ private GossipForkSubscriptions createSubscriptions( payloadAttestationMessageProcessor, executionPayloadBidProcessor, debugDataDumper, - DasGossipLogger.NOOP); + DasGossipLogger.NOOP, + executionProofOperationProcessor, + isExecutionProofTopicEnabled); }; } @@ -648,6 +664,9 @@ private void setDefaults() { if (signedBlsToExecutionChangeProcessor == null) { signedBlsToExecutionChangeProcessor = OperationProcessor.noop(); } + if (executionProofOperationProcessor == null) { + executionProofOperationProcessor = OperationProcessor.noop(); + } if (executionPayloadProcessor == null) { executionPayloadProcessor = OperationProcessor.noop(); } @@ -786,6 +805,13 @@ public Eth2P2PNetworkBuilder gossipedDataColumnSidecarOperationProcessor( return this; } + public Eth2P2PNetworkBuilder gossipedExecutionProofOperationProcessor( + final OperationProcessor executionProofOperationProcessor) { + checkNotNull(executionProofOperationProcessor); + this.executionProofOperationProcessor = executionProofOperationProcessor; + return this; + } + public Eth2P2PNetworkBuilder gossipedExecutionPayloadProcessor( final OperationProcessor gossipedExecutionPayloadProcessor) { diff --git a/services/beaconchain/build.gradle b/services/beaconchain/build.gradle index fc903a0a4ff..c7a112330ca 100644 --- a/services/beaconchain/build.gradle +++ b/services/beaconchain/build.gradle @@ -27,6 +27,7 @@ dependencies { implementation project(':services:powchain') implementation project(':services:timer') implementation project(':services:executionlayer') + implementation project(':services:zkchain') implementation project(':infrastructure:ssz') implementation project(':storage') implementation project(':storage:api') diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainConfiguration.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainConfiguration.java index e0cb4f8dad2..28bf4cc6100 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainConfiguration.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainConfiguration.java @@ -19,6 +19,7 @@ import tech.pegasys.teku.networking.eth2.P2PConfig; import tech.pegasys.teku.networks.Eth2NetworkConfiguration; import tech.pegasys.teku.services.powchain.PowchainConfiguration; +import tech.pegasys.teku.services.zkchain.ZkChainConfiguration; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.storage.store.StoreConfig; import tech.pegasys.teku.validator.api.InteropConfig; @@ -36,6 +37,7 @@ public class BeaconChainConfiguration { private final StoreConfig storeConfig; private final PowchainConfiguration powchainConfiguration; private final Spec spec; + private final ZkChainConfiguration zkChainConfiguration; private final BeaconChainControllerFactory beaconChainControllerFactory; private final MetricsConfig metricsConfig; @@ -52,7 +54,8 @@ public BeaconChainConfiguration( final StoreConfig storeConfig, final Spec spec, final BeaconChainControllerFactory beaconChainControllerFactory, - final MetricsConfig metricsConfig) { + final MetricsConfig metricsConfig, + final ZkChainConfiguration zkChainConfiguration) { this.eth2NetworkConfiguration = eth2NetworkConfiguration; this.weakSubjectivityConfig = weakSubjectivityConfig; this.validatorConfig = validatorConfig; @@ -65,6 +68,7 @@ public BeaconChainConfiguration( this.spec = spec; this.beaconChainControllerFactory = beaconChainControllerFactory; this.metricsConfig = metricsConfig; + this.zkChainConfiguration = zkChainConfiguration; } public Spec getSpec() { @@ -114,4 +118,8 @@ public MetricsConfig getMetricsConfig() { public BeaconChainControllerFactory getBeaconChainControllerFactory() { return beaconChainControllerFactory; } + + public ZkChainConfiguration zkChainConfiguration() { + return zkChainConfiguration; + } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index c3c8b5e9a84..905fb876053 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -105,6 +105,7 @@ import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout; import tech.pegasys.teku.services.executionlayer.ExecutionLayerBlockManagerFactory; import tech.pegasys.teku.services.timer.TimerService; +import tech.pegasys.teku.services.zkchain.ZkChainConfiguration; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.SpecVersion; @@ -190,6 +191,8 @@ import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnSidecarRetriever; import tech.pegasys.teku.statetransition.datacolumns.retriever.RecoveringSidecarRetriever; import tech.pegasys.teku.statetransition.datacolumns.retriever.SimpleSidecarRetriever; +import tech.pegasys.teku.statetransition.executionproofs.ExecutionProofManager; +import tech.pegasys.teku.statetransition.executionproofs.ExecutionProofManagerImpl; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifierImpl; @@ -219,6 +222,7 @@ import tech.pegasys.teku.statetransition.validation.BlockGossipValidator; import tech.pegasys.teku.statetransition.validation.BlockValidator; import tech.pegasys.teku.statetransition.validation.DataColumnSidecarGossipValidator; +import tech.pegasys.teku.statetransition.validation.ExecutionProofGossipValidator; import tech.pegasys.teku.statetransition.validation.GossipValidationHelper; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; import tech.pegasys.teku.statetransition.validation.ProposerSlashingValidator; @@ -354,6 +358,7 @@ public class BeaconChainController extends Service implements BeaconChainControl protected volatile BlobSidecarManager blobSidecarManager; protected volatile BlobSidecarGossipValidator blobSidecarValidator; protected volatile DataColumnSidecarManager dataColumnSidecarManager; + protected volatile ExecutionProofManager executionProofManager; protected volatile Optional dasCustodySync = Optional.empty(); protected volatile Optional recoveringSidecarRetriever = Optional.empty(); @@ -459,6 +464,10 @@ protected void startServices() { blobSidecar -> recentBlobSidecarsFetcher.cancelRecentBlobSidecarRequest( new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()))); + executionProofManager.subscribeToValidExecutionProofs( + (executionProof, remoteOrigin) -> + // TODO add actual logic to handle valid execution proofs + LOG.debug("Received valid execution proof: {}", executionProof)); final Optional network = beaconConfig.eth2NetworkConfig().getEth2Network(); if (network.isPresent() && network.get() == Eth2Network.EPHEMERY) { @@ -611,6 +620,7 @@ public void initAll() { initBlobSidecarManager(); initDasSamplerManager(); initDataColumnSidecarManager(); + initZkChain(); initForkChoiceStateProvider(); initForkChoiceNotifier(); initMergeMonitors(); @@ -661,6 +671,25 @@ protected void initExecutionLayer() { executionLayer = eventChannels.getPublisher(ExecutionLayerChannel.class, beaconAsyncRunner); } + protected void initZkChain() { + LOG.debug("BeaconChainController.initZkChain()"); + final ZkChainConfiguration zkConfig = beaconConfig.zkChainConfiguration(); + // TODO: We will eventually need the Gossip in the EP Manager for publishing the proofs we + // produce? + // comment for now this will be used in the future + // final ExecutionProofGossipChannel executionProofGossipChannel = + // eventChannels.getPublisher(ExecutionProofGossipChannel.class, networkAsyncRunner); + if (zkConfig.isStatelessValidationEnabled()) { + final ExecutionProofGossipValidator executionProofGossipValidator = + ExecutionProofGossipValidator.create(); + + executionProofManager = new ExecutionProofManagerImpl(executionProofGossipValidator); + + } else { + executionProofManager = ExecutionProofManager.NOOP; + } + } + protected void initKzg() { if (spec.isMilestoneSupported(SpecMilestone.DENEB)) { kzg = KZG.getInstance(beaconConfig.eth2NetworkConfig().isRustKzgEnabled()); @@ -1597,6 +1626,7 @@ protected void initP2PNetwork() { .gossipedBlobSidecarProcessor(blobSidecarManager::validateAndPrepareForBlockImport) .gossipedDataColumnSidecarOperationProcessor( dataColumnSidecarManager::onDataColumnSidecarGossip) + .gossipedExecutionProofOperationProcessor(executionProofManager::onExecutionProofGossip) .gossipedAttestationProcessor(attestationManager::addAttestation) .gossipedAggregateProcessor(attestationManager::addAggregate) .gossipedAttesterSlashingProcessor(attesterSlashingPool::addRemote) diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java index ab744609be3..9a6096e7982 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java @@ -395,6 +395,16 @@ The network interface(s) on which the node listens for P2P communication. fallbackValue = "true") private boolean allTopicsFilterEnabled = P2PConfig.DEFAULT_PEER_ALL_TOPIC_FILTER_ENABLED; + @Option( + names = {"--Xexecution-proof-topics-enabled"}, + paramLabel = "", + showDefaultValue = Visibility.ALWAYS, + description = "Enable all execution proof topics", + arity = "0..1", + hidden = true, + fallbackValue = "true") + private boolean executionProofTopicEnabled = P2PConfig.DEFAULT_EXECUTION_PROOF_GOSSIP_ENABLED; + @Option( names = {"--Xpeer-request-limit"}, paramLabel = "", @@ -592,7 +602,8 @@ public void configure(final TekuConfiguration.Builder builder) { .gossipBlobsAfterBlockEnabled(gossipBlobsAfterBlockEnabled) .dasExtraCustodyGroupCount(dasExtraCustodyGroupCount) .historicalDataMaxConcurrentQueries(historicalDataMaxConcurrentQueries) - .historicalDataMaxQueryQueueSize(historicalDataMaxQueryQueueSize); + .historicalDataMaxQueryQueueSize(historicalDataMaxQueryQueueSize) + .executionProofTopicEnabled(executionProofTopicEnabled); batchVerifyQueueCapacity.ifPresent(b::batchVerifyQueueCapacity); }) .discovery( diff --git a/teku/src/main/java/tech/pegasys/teku/config/TekuConfiguration.java b/teku/src/main/java/tech/pegasys/teku/config/TekuConfiguration.java index 09d27c8fa8f..046ded0a600 100644 --- a/teku/src/main/java/tech/pegasys/teku/config/TekuConfiguration.java +++ b/teku/src/main/java/tech/pegasys/teku/config/TekuConfiguration.java @@ -98,7 +98,8 @@ private TekuConfiguration( storeConfig, spec, beaconChainControllerFactory, - metricsConfig); + metricsConfig, + zkChainConfiguration); this.validatorClientConfig = new ValidatorClientConfiguration( validatorConfig, interopConfig, validatorRestApiConfig, spec);