diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index dd9bc0f98..5ca86203b 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -2,12 +2,12 @@ package parachain import ( "context" - "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - gethTypes "github.com/ethereum/go-ethereum/core/types" + "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" "github.com/snowfork/go-substrate-rpc-client/v4/types" @@ -22,6 +22,7 @@ import ( type BeefyListener struct { config *SourceConfig + scheduleConfig *ScheduleConfig ethereumConn *ethereum.Connection beefyClientContract *contracts.BeefyClient relaychainConn *relaychain.Connection @@ -33,6 +34,7 @@ type BeefyListener struct { func NewBeefyListener( config *SourceConfig, + scheduleConfig *ScheduleConfig, ethereumConn *ethereum.Connection, relaychainConn *relaychain.Connection, parachainConnection *parachain.Connection, @@ -40,6 +42,7 @@ func NewBeefyListener( ) *BeefyListener { return &BeefyListener{ config: config, + scheduleConfig: scheduleConfig, ethereumConn: ethereumConn, relaychainConn: relaychainConn, parachainConnection: parachainConnection, @@ -82,125 +85,63 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { eg.Go(func() error { defer close(li.tasks) - // Subscribe NewMMRRoot event logs and fetch parachain message commitments - // since latest beefy block - beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) - if err != nil { - return fmt.Errorf("fetch latest beefy block: %w", err) - } - - err = li.doScan(ctx, beefyBlockNumber) - if err != nil { - return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err) - } - - err = li.subscribeNewMMRRoots(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { + for { + select { + case <-ctx.Done(): return nil + // Add some randomness here in case one relayer is down and other relayers won't compete for + // that failed message at same time. + case <-time.After(time.Second*90 + time.Duration(rand.Intn(30))*time.Second): + beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) + if err != nil { + return fmt.Errorf("fetch latest beefy block: %w", err) + } + + err = li.doScan(ctx, beefyBlockNumber) + if err != nil { + return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err) + } } - return err } - - return nil }) return nil } -func (li *BeefyListener) subscribeNewMMRRoots(ctx context.Context) error { - headers := make(chan *gethTypes.Header, 5) - - sub, err := li.ethereumConn.Client().SubscribeNewHead(ctx, headers) +func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) error { + tasks, err := li.scanner.Scan(ctx, beefyBlockNumber) if err != nil { - return fmt.Errorf("creating ethereum header subscription: %w", err) + return err } - defer sub.Unsubscribe() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-sub.Err(): - return fmt.Errorf("header subscription: %w", err) - case gethheader := <-headers: - blockNumber := gethheader.Number.Uint64() - contractEvents, err := li.queryBeefyClientEvents(ctx, blockNumber, &blockNumber) + if len(tasks) > 0 { + task := tasks[0] + paraNonce := (*task.MessageProofs)[0].Message.Nonce + if paraNonce%li.scheduleConfig.Num == li.scheduleConfig.ID { + // Task self assigned + err = li.addTask(ctx, task) if err != nil { - return fmt.Errorf("query NewMMRRoot event logs in block %v: %w", blockNumber, err) + return fmt.Errorf("add task for nonce %d: %w", paraNonce, err) } - - if len(contractEvents) > 0 { - log.Info(fmt.Sprintf("Found %d BeefyLightClient.NewMMRRoot events in block %d", len(contractEvents), blockNumber)) - // Only process the last emitted event in the block - event := contractEvents[len(contractEvents)-1] - log.WithFields(log.Fields{ - "beefyBlockNumber": event.BlockNumber, - "ethereumBlockNumber": event.Raw.BlockNumber, - "ethereumTxHash": event.Raw.TxHash.Hex(), - }).Info("Witnessed a new MMRRoot event") - - err = li.doScan(ctx, event.BlockNumber) + log.Info(fmt.Sprintf("nonce %d self assigned to relay(%d)", paraNonce, li.scheduleConfig.ID)) + } else { + // Task wait for picked up by another relayer, submit anyway if timeout + done, err := li.waitForTask(ctx, task) + if err != nil { + return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err) + } + if !done { + err = li.addTask(ctx, task) if err != nil { - return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", event.BlockNumber, err) + return fmt.Errorf("add task for nonce %d: %w", paraNonce, err) } + log.Info(fmt.Sprintf("nonce %d timeout but picked up by relay(%d)", paraNonce, li.scheduleConfig.ID)) } } } -} - -func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) error { - tasks, err := li.scanner.Scan(ctx, beefyBlockNumber) - if err != nil { - return err - } - - for _, task := range tasks { - // do final proof generation right before sending. The proof needs to be fresh. - task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - case li.tasks <- task: - log.Info("Beefy Listener emitted new task") - } - } return nil } -// queryBeefyClientEvents queries ContractNewMMRRoot events from the BeefyClient contract -func (li *BeefyListener) queryBeefyClientEvents( - ctx context.Context, start uint64, - end *uint64, -) ([]*contracts.BeefyClientNewMMRRoot, error) { - var events []*contracts.BeefyClientNewMMRRoot - filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx} - - iter, err := li.beefyClientContract.FilterNewMMRRoot(&filterOps) - if err != nil { - return nil, err - } - - for { - more := iter.Next() - if !more { - err = iter.Error() - if err != nil { - return nil, err - } - break - } - - events = append(events, iter.Event) - } - - return events, nil -} - // Fetch the latest verified beefy block number and hash from Ethereum func (li *BeefyListener) fetchLatestBeefyBlock(ctx context.Context) (uint64, types.Hash, error) { number, err := li.beefyClientContract.LatestBeefyBlock(&bind.CallOpts{ @@ -285,3 +226,39 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h return &output, nil } + +func (li *BeefyListener) addTask(ctx context.Context, task *Task) (err error) { + task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case li.tasks <- task: + log.Info("Beefy Listener emitted new task") + } + return nil +} + +func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, error) { + paraNonce := (*task.MessageProofs)[0].Message.Nonce + log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce)) + cnt := 0 + for { + ethInboundNonce, err := li.scanner.findLatestNonce(ctx) + if err != nil { + return false, err + } + if ethInboundNonce >= paraNonce { + log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce)) + return true, nil + } + time.Sleep(5 * time.Second) + if cnt == 12 { + break + } + cnt++ + } + return false, nil +} diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index fcd8a32e4..a0f881831 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -1,13 +1,16 @@ package parachain import ( + "errors" "fmt" + "github.com/snowfork/snowbridge/relayer/config" ) type Config struct { - Source SourceConfig `mapstructure:"source"` - Sink SinkConfig `mapstructure:"sink"` + Source SourceConfig `mapstructure:"source"` + Sink SinkConfig `mapstructure:"sink"` + Schedule ScheduleConfig `mapstructure:"schedule"` } type SourceConfig struct { @@ -32,6 +35,23 @@ type SinkContractsConfig struct { Gateway string `mapstructure:"Gateway"` } +type ScheduleConfig struct { + // ID of current relayer, starting from 0 + ID uint64 `mapstructure:"id"` + // Number of total count of all relayers + Num uint64 `mapstructure:"num"` +} + +func (r ScheduleConfig) Validate() error { + if r.Num < 1 { + return errors.New("Number of relayer is not set") + } + if r.ID >= r.Num { + return errors.New("ID of the Number of relayer is not set") + } + return nil +} + type ChannelID [32]byte func (c Config) Validate() error { @@ -66,5 +86,12 @@ func (c Config) Validate() error { if c.Sink.Contracts.Gateway == "" { return fmt.Errorf("sink contracts setting [Gateway] is not set") } + + // Relay + err = c.Schedule.Validate() + if err != nil { + return fmt.Errorf("relay config: %w", err) + } + return nil } diff --git a/relayer/relays/parachain/main.go b/relayer/relays/parachain/main.go index 50bb95dcd..5ce1c1b9c 100644 --- a/relayer/relays/parachain/main.go +++ b/relayer/relays/parachain/main.go @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { beefyListener := NewBeefyListener( &config.Source, + &config.Schedule, ethereumConnBeefy, relaychainConn, parachainConn, @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error { return err } + log.Info("Current relay's ID:", relay.config.Schedule.ID) + return nil } diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index b71687309..b884ed7d0 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/snowfork/go-substrate-rpc-client/v4/scale" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -71,30 +72,13 @@ func (s *Scanner) findTasks( paraHash types.Hash, ) ([]*Task, error) { // Fetch latest nonce in ethereum gateway - gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) - gatewayContract, err := contracts.NewGateway( - gatewayAddress, - s.ethConn.Client(), - ) - if err != nil { - return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err) - } - - options := bind.CallOpts{ - Pending: true, - Context: ctx, - } - ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID) - if err != nil { - return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err) - } + ethInboundNonce, err := s.findLatestNonce(ctx) log.WithFields(log.Fields{ "nonce": ethInboundNonce, "channelID": s.config.ChannelID, }).Info("Checked latest nonce delivered to ethereum gateway") // Fetch latest nonce in parachain outbound queue - paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil) if err != nil { return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err) @@ -457,3 +441,25 @@ func fetchMessageProof( return MessageProof{Message: message, Proof: proof}, nil } + +func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) { + // Fetch latest nonce in ethereum gateway + gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) + gatewayContract, err := contracts.NewGateway( + gatewayAddress, + s.ethConn.Client(), + ) + if err != nil { + return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err) + } + + options := bind.CallOpts{ + Pending: true, + Context: ctx, + } + ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID) + if err != nil { + return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err) + } + return ethInboundNonce, err +} diff --git a/web/packages/test/config/parachain-relay.json b/web/packages/test/config/parachain-relay.json index 8af630607..8598265f3 100644 --- a/web/packages/test/config/parachain-relay.json +++ b/web/packages/test/config/parachain-relay.json @@ -24,5 +24,9 @@ "contracts": { "Gateway": null } + }, + "schedule": { + "id": null, + "num": 3 } } diff --git a/web/packages/test/scripts/start-relayer.sh b/web/packages/test/scripts/start-relayer.sh index 5977de9f5..c4a24876c 100755 --- a/web/packages/test/scripts/start-relayer.sh +++ b/web/packages/test/scripts/start-relayer.sh @@ -54,7 +54,7 @@ config_relayer() { ' \ config/parachain-relay.json >$output_dir/parachain-relay-bridge-hub-02.json - # Configure parachain relay (asset hub) + # Configure parachain relay (asset hub)-0 jq \ --arg k1 "$(address_for GatewayProxy)" \ --arg k2 "$(address_for BeefyClient)" \ @@ -70,8 +70,49 @@ config_relayer() { | .sink.ethereum.endpoint = $eth_writer_endpoint | .sink.ethereum."gas-limit" = $eth_gas_limit | .source."channel-id" = $channelID + | .schedule.id = 0 ' \ - config/parachain-relay.json >$output_dir/parachain-relay-asset-hub.json + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json + + # Configure parachain relay (asset hub)-1 + jq \ + --arg k1 "$(address_for GatewayProxy)" \ + --arg k2 "$(address_for BeefyClient)" \ + --arg eth_endpoint_ws $eth_endpoint_ws \ + --arg eth_writer_endpoint $eth_writer_endpoint \ + --arg channelID $ASSET_HUB_CHANNEL_ID \ + --arg eth_gas_limit $eth_gas_limit \ + ' + .source.contracts.Gateway = $k1 + | .source.contracts.BeefyClient = $k2 + | .sink.contracts.Gateway = $k1 + | .source.ethereum.endpoint = $eth_endpoint_ws + | .sink.ethereum.endpoint = $eth_writer_endpoint + | .sink.ethereum."gas-limit" = $eth_gas_limit + | .source."channel-id" = $channelID + | .schedule.id = 1 + ' \ + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json + + # Configure parachain relay (asset hub)-2 + jq \ + --arg k1 "$(address_for GatewayProxy)" \ + --arg k2 "$(address_for BeefyClient)" \ + --arg eth_endpoint_ws $eth_endpoint_ws \ + --arg eth_writer_endpoint $eth_writer_endpoint \ + --arg channelID $ASSET_HUB_CHANNEL_ID \ + --arg eth_gas_limit $eth_gas_limit \ + ' + .source.contracts.Gateway = $k1 + | .source.contracts.BeefyClient = $k2 + | .sink.contracts.Gateway = $k1 + | .source.ethereum.endpoint = $eth_endpoint_ws + | .sink.ethereum.endpoint = $eth_writer_endpoint + | .sink.ethereum."gas-limit" = $eth_gas_limit + | .source."channel-id" = $channelID + | .schedule.id = 2 + ' \ + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json # Configure parachain relay (penpal) jq \ @@ -172,15 +213,41 @@ start_relayer() { done ) & - # Launch parachain relay for assethub + # Launch parachain relay 0 for assethub ( - : >"$output_dir"/parachain-relay-asset-hub.log + : >"$output_dir"/parachain-relay-asset-hub-0.log while :; do echo "Starting parachain relay (asset-hub) at $(date)" "${relay_bin}" run parachain \ - --config "$output_dir/parachain-relay-asset-hub.json" \ + --config "$output_dir/parachain-relay-asset-hub-0.json" \ --ethereum.private-key $parachain_relay_assethub_eth_key \ - >>"$output_dir"/parachain-relay-asset-hub.log 2>&1 || true + >>"$output_dir"/parachain-relay-asset-hub-0.log 2>&1 || true + sleep 20 + done + ) & + + # Launch parachain relay 1 for assethub + ( + : >"$output_dir"/parachain-relay-asset-hub-1.log + while :; do + echo "Starting parachain relay (asset-hub) at $(date)" + "${relay_bin}" run parachain \ + --config "$output_dir/parachain-relay-asset-hub-1.json" \ + --ethereum.private-key $parachain_relay_primary_gov_eth_key \ + >>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true + sleep 20 + done + ) & + + # Launch parachain relay 2 for assethub + ( + : >"$output_dir"/parachain-relay-asset-hub-2.log + while :; do + echo "Starting parachain relay (asset-hub) at $(date)" + "${relay_bin}" run parachain \ + --config "$output_dir/parachain-relay-asset-hub-2.json" \ + --ethereum.private-key $parachain_relay_secondary_gov_eth_key \ + >>"$output_dir"/parachain-relay-asset-hub-2.log 2>&1 || true sleep 20 done ) &