diff --git a/relayer/cmd/run/parachain-v2/command.go b/relayer/cmd/run/parachain-v2/command.go index 204e66561..c551daa23 100644 --- a/relayer/cmd/run/parachain-v2/command.go +++ b/relayer/cmd/run/parachain-v2/command.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "github.com/snowfork/snowbridge/relayer/chain/ethereum" + "github.com/snowfork/snowbridge/relayer/relays/beefy" "github.com/snowfork/snowbridge/relayer/relays/parachain-v2" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -17,10 +18,12 @@ import ( ) var ( - configFile string - privateKey string - privateKeyFile string - privateKeyID string + configFile string + privateKey string + privateKeyFile string + privateKeyID string + beefyConfigFile string + instantVerification bool ) func Command() *cobra.Command { @@ -38,6 +41,9 @@ func Command() *cobra.Command { cmd.Flags().StringVar(&privateKeyFile, "ethereum.private-key-file", "", "The file from which to read the private key") cmd.Flags().StringVar(&privateKeyID, "ethereum.private-key-id", "", "The secret id to lookup the private key in AWS Secrets Manager") + cmd.Flags().StringVar(&beefyConfigFile, "beefy.config", "", "Path to beefy configuration file") + cmd.Flags().BoolVarP(&instantVerification, "instant-verification", "", false, "Enable instant verification of parachain messages") + return cmd } @@ -92,7 +98,26 @@ func run(_ *cobra.Command, _ []string) error { return nil }) - err = relay.Start(ctx, eg) + if !instantVerification { + err = relay.Start(ctx, eg) + } else { + viper.SetConfigFile(beefyConfigFile) + if err := viper.ReadInConfig(); err != nil { + return err + } + + var beefyConfig beefy.Config + err = viper.UnmarshalExact(&beefyConfig) + if err != nil { + return err + } + var instantRelay *parachain.InstantRelay + instantRelay, err = parachain.NewInstantRelay(&config, &beefyConfig, keypair) + if err != nil { + return err + } + err = instantRelay.Start(ctx, eg) + } if err != nil { logrus.WithError(err).Fatal("Unhandled error") cancel() diff --git a/relayer/relays/beefy/ethereum-writer.go b/relayer/relays/beefy/ethereum-writer.go index b022c3c97..55c0a6d89 100644 --- a/relayer/relays/beefy/ethereum-writer.go +++ b/relayer/relays/beefy/ethereum-writer.go @@ -174,6 +174,20 @@ func (wr *EthereumWriter) submit(ctx context.Context, task *Request) error { } // Commit PrevRandao which will be used as seed to randomly select subset of validators // https://github.com/Snowfork/snowbridge/blob/75a475cbf8fc8e13577ad6b773ac452b2bf82fbb/contracts/contracts/BeefyClient.sol#L446-L447 + state, err := wr.queryBeefyClientState(ctx) + if err != nil { + return fmt.Errorf("query beefy client state: %w", err) + } + + // Ignore beefy block already synced + if uint64(task.SignedCommitment.Commitment.BlockNumber) <= state.LatestBeefyBlock { + log.WithFields(log.Fields{ + "validatorSetID": state.CurrentValidatorSetID, + "beefyBlock": state.LatestBeefyBlock, + "relayBlock": task.SignedCommitment.Commitment.BlockNumber, + }).Info("Beefy block already synced, just ignore") + return nil + } tx, err = wr.contract.CommitPrevRandao( wr.conn.MakeTxOpts(ctx), *commitmentHash, @@ -210,6 +224,20 @@ func (wr *EthereumWriter) submit(ctx context.Context, task *Request) error { return nil } // Final submission + state, err = wr.queryBeefyClientState(ctx) + if err != nil { + return fmt.Errorf("query beefy client state: %w", err) + } + + // Ignore beefy block already synced + if uint64(task.SignedCommitment.Commitment.BlockNumber) <= state.LatestBeefyBlock { + log.WithFields(log.Fields{ + "validatorSetID": state.CurrentValidatorSetID, + "beefyBlock": state.LatestBeefyBlock, + "relayBlock": task.SignedCommitment.Commitment.BlockNumber, + }).Info("Beefy block already synced, just ignore") + return nil + } tx, err = wr.doSubmitFinal(ctx, *commitmentHash, initialBitfield, task) if err != nil { if isDuplicateBeefyError(err) { diff --git a/relayer/relays/beefy/on-demand-sync.go b/relayer/relays/beefy/on-demand-sync.go index 83c35fce7..850715816 100644 --- a/relayer/relays/beefy/on-demand-sync.go +++ b/relayer/relays/beefy/on-demand-sync.go @@ -720,3 +720,7 @@ func (relay *OnDemandRelay) queueAll(ctx context.Context) error { } return nil } + +func (relay *OnDemandRelay) GetConfig() *Config { + return relay.config +} diff --git a/relayer/relays/parachain-v2/beefy-instant-syncer.go b/relayer/relays/parachain-v2/beefy-instant-syncer.go new file mode 100644 index 000000000..8795743aa --- /dev/null +++ b/relayer/relays/parachain-v2/beefy-instant-syncer.go @@ -0,0 +1,149 @@ +package parachain + +import ( + "context" + "fmt" + "math/big" + "time" + + log "github.com/sirupsen/logrus" + "github.com/snowfork/snowbridge/relayer/relays/beefy" + "golang.org/x/sync/errgroup" +) + +type BeefyInstantSyncer struct { + config *Config + beefyListener *BeefyListener + beefyOnDemandRelay *beefy.OnDemandRelay +} + +func NewBeefyInstantSyncer( + config *Config, + beefyListener *BeefyListener, + beefyOnDemandRelay *beefy.OnDemandRelay, +) *BeefyInstantSyncer { + return &BeefyInstantSyncer{ + config: config, + beefyListener: beefyListener, + beefyOnDemandRelay: beefyOnDemandRelay, + } +} + +// Todo: consider using subscription to listen for new finalized beefy headers +func (li *BeefyInstantSyncer) Start(ctx context.Context, eg *errgroup.Group) error { + // Initialize the beefy listener to setup the scanner + err := li.beefyListener.initialize(ctx) + if err != nil { + return fmt.Errorf("initialize beefy listener: %w", err) + } + var fetchInterval time.Duration + if li.config.FetchInterval == 0 { + fetchInterval = 180 * time.Second + } else { + fetchInterval = time.Duration(li.config.FetchInterval) * time.Second + } + + ticker := time.NewTicker(fetchInterval) + + eg.Go(func() error { + + for { + finalizedBeefyBlockHash, err := li.beefyListener.relaychainConn.API().RPC.Beefy.GetFinalizedHead() + if err != nil { + return fmt.Errorf("fetch beefy finalized head: %w", err) + } + finalizedBeefyBlockHeader, err := li.beefyListener.relaychainConn.API().RPC.Chain.GetHeader(finalizedBeefyBlockHash) + if err != nil { + return fmt.Errorf("fetch block header: %w", err) + } + latestBeefyBlockNumber := uint64(finalizedBeefyBlockHeader.Number) + err = li.doScanAndUpdate(ctx, latestBeefyBlockNumber) + if err != nil { + return fmt.Errorf("scan for sync tasks: %w", err) + } + + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + continue + } + } + }) + + return nil +} + +func (li *BeefyInstantSyncer) isRelayConsensusProfitable(ctx context.Context, tasks []*Task) (bool, error) { + totalFee := new(big.Int) + for _, task := range tasks { + if task == nil || task.MessageProofs == nil || len(*task.MessageProofs) == 0 { + continue + } + for _, messageProof := range *task.MessageProofs { + totalFee.Add(totalFee, &messageProof.Message.Fee) + } + } + gasPrice, err := li.beefyListener.ethereumConn.Client().SuggestGasPrice(ctx) + if err != nil { + return false, fmt.Errorf("suggest gas price: %w", err) + } + var requireFee *big.Int + if li.beefyOnDemandRelay.GetConfig().Sink.EnableFiatShamir { + requireFee = new(big.Int).Mul(gasPrice, new(big.Int).SetUint64(li.config.Sink.Fees.BaseBeefyFiatShamirGas)) + } else { + requireFee = new(big.Int).Mul(gasPrice, new(big.Int).SetUint64(li.config.Sink.Fees.BaseBeefyTwoPhaseCommitGas)) + } + isProfitable := totalFee.Cmp(requireFee) >= 0 + log.WithFields(log.Fields{ + "totalFee": totalFee.String(), + "requireFee": requireFee.String(), + "isProfitable": isProfitable, + }).Info("isProfitable") + return isProfitable, nil +} + +func (li *BeefyInstantSyncer) doScanAndUpdate(ctx context.Context, beefyBlockNumber uint64) error { + // Scan for undelivered orders using the latest BEEFY block number on the relay chain. + tasks, err := li.beefyListener.scanner.Scan(ctx, beefyBlockNumber) + if err != nil { + return fmt.Errorf("scan for sync tasks: %w", err) + } + if len(tasks) == 0 { + log.Info("No tasks found, skipping") + return nil + } + // Check if the relay consensus is profitable + isProfitable, err := li.isRelayConsensusProfitable(ctx, tasks) + if err != nil { + return fmt.Errorf("check is relay consensus profitable: %w", err) + } + if !isProfitable { + log.Info("Relay consensus is not profitable, skipping") + return nil + } + // Oneshot sync with FiatShamir and ensure light client is synced to the BEEFY block number + // before submitting any messages to the parachain + // This is to ensure the light client has the necessary BEEFY proofs + // to verify the parachain headers being submitted + log.Info(fmt.Sprintf("Syncing light client to BEEFY block number %d\n", beefyBlockNumber)) + err = li.beefyOnDemandRelay.OneShotStart(ctx, beefyBlockNumber) + if err != nil { + return fmt.Errorf("sync beefy update on demand: %w", err) + } + beefyBlockSynced, _, err := li.beefyListener.fetchLatestBeefyBlock(ctx) + if err != nil { + return fmt.Errorf("fetch latest beefy block: %w", err) + } + if beefyBlockSynced < beefyBlockNumber { + return fmt.Errorf("beefy block %d not synced to light client, recent synced %d", beefyBlockNumber, beefyBlockSynced) + } + for _, task := range tasks { + err = li.beefyListener.sendTask(ctx, task) + if err != nil { + return fmt.Errorf("send task: %w", err) + } + } + + return nil +} diff --git a/relayer/relays/parachain-v2/beefy-listener.go b/relayer/relays/parachain-v2/beefy-listener.go index 6806cfd9d..ea13cf363 100644 --- a/relayer/relays/parachain-v2/beefy-listener.go +++ b/relayer/relays/parachain-v2/beefy-listener.go @@ -360,3 +360,38 @@ func (li *BeefyListener) sendTask(ctx context.Context, task *Task) error { } return nil } + +func (li *BeefyListener) initialize(ctx context.Context) error { + // Set up light client bridge contract + address := common.HexToAddress(li.config.Contracts.BeefyClient) + beefyClientContract, err := contracts.NewBeefyClient(address, li.ethereumConn.Client()) + if err != nil { + return err + } + li.beefyClientContract = beefyClientContract + + // fetch ParaId + paraIDKey, err := types.CreateStorageKey(li.parachainConnection.Metadata(), "ParachainInfo", "ParachainId", nil, nil) + if err != nil { + return err + } + var paraID uint32 + ok, err := li.parachainConnection.API().RPC.State.GetStorageLatest(paraIDKey, ¶ID) + if err != nil { + return fmt.Errorf("fetch parachain id: %w", err) + } + if !ok { + return fmt.Errorf("parachain id missing") + } + li.paraID = paraID + + li.scanner = &Scanner{ + config: li.config, + ethConn: li.ethereumConn, + relayConn: li.relaychainConn, + paraConn: li.parachainConnection, + paraID: paraID, + ofac: li.ofac, + } + return nil +} diff --git a/relayer/relays/parachain-v2/config.go b/relayer/relays/parachain-v2/config.go index a68bcfc7b..2fc0e5af2 100644 --- a/relayer/relays/parachain-v2/config.go +++ b/relayer/relays/parachain-v2/config.go @@ -12,6 +12,7 @@ type Config struct { Sink SinkConfig `mapstructure:"sink"` RewardAddress string `mapstructure:"reward-address"` OFAC config.OFACConfig `mapstructure:"ofac"` + FetchInterval uint32 `mapstructure:"fetch-interval"` } type SourceConfig struct { @@ -37,6 +38,10 @@ type SinkContractsConfig struct { } type FeeConfig struct { + // The gas cost of two phase commit + BaseBeefyTwoPhaseCommitGas uint64 `mapstructure:"base-beefy-two-phase-commit-gas"` + // The gas cost of fiat shamir commit + BaseBeefyFiatShamirGas uint64 `mapstructure:"base-beefy-fiat-shamir-gas"` // The gas cost of v2_submit excludes command execution, mainly covers the verification BaseDeliveryGas uint64 `mapstructure:"base-delivery-gas"` // The gas cost of unlock ERC20 token diff --git a/relayer/relays/parachain-v2/instant-sync.go b/relayer/relays/parachain-v2/instant-sync.go new file mode 100644 index 000000000..312455f5a --- /dev/null +++ b/relayer/relays/parachain-v2/instant-sync.go @@ -0,0 +1,123 @@ +package parachain + +import ( + "context" + "fmt" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/snowfork/snowbridge/relayer/chain/ethereum" + "github.com/snowfork/snowbridge/relayer/chain/parachain" + "github.com/snowfork/snowbridge/relayer/chain/relaychain" + "github.com/snowfork/snowbridge/relayer/crypto/secp256k1" + "github.com/snowfork/snowbridge/relayer/relays/beefy" + + "github.com/snowfork/snowbridge/relayer/ofac" + + log "github.com/sirupsen/logrus" +) + +type InstantRelay struct { + config *Config + parachainConn *parachain.Connection + relaychainConn *relaychain.Connection + ethereumConnWriter *ethereum.Connection + ethereumConnBeefy *ethereum.Connection + ethereumChannelWriter *EthereumWriter + beefyListener *BeefyListener + beefyInstantSyncer *BeefyInstantSyncer +} + +func NewInstantRelay(config *Config, beefyConfig *beefy.Config, keypair *secp256k1.Keypair) (*InstantRelay, error) { + log.Info("Creating worker") + + parachainConn := parachain.NewConnection(config.Source.Parachain.Endpoint, nil) + relaychainConn := relaychain.NewConnection(config.Source.Polkadot.Endpoint, nil) + + ethereumConnWriter := ethereum.NewConnection(&config.Sink.Ethereum, keypair) + ethereumConnBeefy := ethereum.NewConnection(&config.Source.Ethereum, keypair) + + ofacClient := ofac.New(config.OFAC.Enabled, config.OFAC.ApiKey) + + // channel for messages from beefy listener to ethereum writer + var tasks = make(chan *Task, 1) + + ethereumChannelWriter, err := NewEthereumWriter( + &config.Sink, + ethereumConnWriter, + ethereumConnBeefy, + tasks, + config, + ) + if err != nil { + return nil, err + } + + beefyListener := NewBeefyListener( + &config.Source, + ethereumConnBeefy, + relaychainConn, + parachainConn, + ofacClient, + tasks, + ) + + beefyOnDemandRelay, err := beefy.NewOnDemandRelay(beefyConfig, keypair) + if err != nil { + return nil, err + } + + beefyInstantSyncer := NewBeefyInstantSyncer( + config, + beefyListener, + beefyOnDemandRelay, + ) + + return &InstantRelay{ + config: config, + parachainConn: parachainConn, + relaychainConn: relaychainConn, + ethereumConnWriter: ethereumConnWriter, + ethereumConnBeefy: ethereumConnBeefy, + ethereumChannelWriter: ethereumChannelWriter, + beefyListener: beefyListener, + beefyInstantSyncer: beefyInstantSyncer, + }, nil +} + +func (relay *InstantRelay) Start(ctx context.Context, eg *errgroup.Group) error { + err := relay.parachainConn.ConnectWithHeartBeat(ctx, eg, time.Second*time.Duration(relay.config.Source.Parachain.HeartbeatSecs)) + if err != nil { + return err + } + + err = relay.ethereumConnWriter.ConnectWithHeartBeat(ctx, eg, time.Second*time.Duration(relay.config.Sink.Ethereum.HeartbeatSecs)) + if err != nil { + return fmt.Errorf("unable to connect to ethereum: writer: %w", err) + } + + err = relay.ethereumConnBeefy.ConnectWithHeartBeat(ctx, eg, time.Second*time.Duration(relay.config.Source.Ethereum.HeartbeatSecs)) + if err != nil { + return fmt.Errorf("unable to connect to ethereum: beefy: %w", err) + } + + err = relay.relaychainConn.ConnectWithHeartBeat(ctx, eg, time.Second*time.Duration(relay.config.Source.Polkadot.HeartbeatSecs)) + if err != nil { + return err + } + + log.Info("Starting beefy instant listener") + err = relay.beefyInstantSyncer.Start(ctx, eg) + if err != nil { + return err + } + + log.Info("Starting ethereum writer") + err = relay.ethereumChannelWriter.Start(ctx, eg) + if err != nil { + return err + } + + return nil +}