Skip to content
177 changes: 77 additions & 100 deletions relayer/relays/parachain/beefy-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package parachain

import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"

"github.com/snowfork/go-substrate-rpc-client/v4/types"
Expand All @@ -22,6 +22,7 @@ import (

type BeefyListener struct {
config *SourceConfig
scheduleConfig *ScheduleConfig
ethereumConn *ethereum.Connection
beefyClientContract *contracts.BeefyClient
relaychainConn *relaychain.Connection
Expand All @@ -33,13 +34,15 @@ type BeefyListener struct {

func NewBeefyListener(
config *SourceConfig,
scheduleConfig *ScheduleConfig,
ethereumConn *ethereum.Connection,
relaychainConn *relaychain.Connection,
parachainConnection *parachain.Connection,
tasks chan<- *Task,
) *BeefyListener {
return &BeefyListener{
config: config,
scheduleConfig: scheduleConfig,
ethereumConn: ethereumConn,
relaychainConn: relaychainConn,
parachainConnection: parachainConnection,
Expand Down Expand Up @@ -82,125 +85,63 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error {
eg.Go(func() error {
defer close(li.tasks)

// Subscribe NewMMRRoot event logs and fetch parachain message commitments
// since latest beefy block
beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx)
if err != nil {
return fmt.Errorf("fetch latest beefy block: %w", err)
}

err = li.doScan(ctx, beefyBlockNumber)
if err != nil {
return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err)
}

err = li.subscribeNewMMRRoots(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
for {
select {
case <-ctx.Done():
return nil
// Add some randomness here in case one relayer is down and other relayers won't compete for
// that failed message at same time.
case <-time.After(time.Second*90 + time.Duration(rand.Intn(30))*time.Second):
beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx)
if err != nil {
return fmt.Errorf("fetch latest beefy block: %w", err)
}

err = li.doScan(ctx, beefyBlockNumber)
if err != nil {
return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err)
}
}
return err
}

return nil
})

return nil
}

func (li *BeefyListener) subscribeNewMMRRoots(ctx context.Context) error {
headers := make(chan *gethTypes.Header, 5)

sub, err := li.ethereumConn.Client().SubscribeNewHead(ctx, headers)
func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) error {
tasks, err := li.scanner.Scan(ctx, beefyBlockNumber)
if err != nil {
return fmt.Errorf("creating ethereum header subscription: %w", err)
return err
}
defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-sub.Err():
return fmt.Errorf("header subscription: %w", err)
case gethheader := <-headers:
blockNumber := gethheader.Number.Uint64()
contractEvents, err := li.queryBeefyClientEvents(ctx, blockNumber, &blockNumber)
if len(tasks) > 0 {
task := tasks[0]
paraNonce := (*task.MessageProofs)[0].Message.Nonce
if paraNonce%li.scheduleConfig.Num == li.scheduleConfig.ID {
// Task self assigned
err = li.addTask(ctx, task)
if err != nil {
return fmt.Errorf("query NewMMRRoot event logs in block %v: %w", blockNumber, err)
return fmt.Errorf("add task for nonce %d: %w", paraNonce, err)
}

if len(contractEvents) > 0 {
log.Info(fmt.Sprintf("Found %d BeefyLightClient.NewMMRRoot events in block %d", len(contractEvents), blockNumber))
// Only process the last emitted event in the block
event := contractEvents[len(contractEvents)-1]
log.WithFields(log.Fields{
"beefyBlockNumber": event.BlockNumber,
"ethereumBlockNumber": event.Raw.BlockNumber,
"ethereumTxHash": event.Raw.TxHash.Hex(),
}).Info("Witnessed a new MMRRoot event")

err = li.doScan(ctx, event.BlockNumber)
log.Info(fmt.Sprintf("nonce %d self assigned to relay(%d)", paraNonce, li.scheduleConfig.ID))
} else {
// Task wait for picked up by another relayer, submit anyway if timeout
done, err := li.waitForTask(ctx, task)
if err != nil {
return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err)
}
if !done {
err = li.addTask(ctx, task)
if err != nil {
return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", event.BlockNumber, err)
return fmt.Errorf("add task for nonce %d: %w", paraNonce, err)
}
log.Info(fmt.Sprintf("nonce %d timeout but picked up by relay(%d)", paraNonce, li.scheduleConfig.ID))
}
}
}
}

func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) error {
tasks, err := li.scanner.Scan(ctx, beefyBlockNumber)
if err != nil {
return err
}

for _, task := range tasks {
// do final proof generation right before sending. The proof needs to be fresh.
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
}
}

return nil
}

// queryBeefyClientEvents queries ContractNewMMRRoot events from the BeefyClient contract
func (li *BeefyListener) queryBeefyClientEvents(
ctx context.Context, start uint64,
end *uint64,
) ([]*contracts.BeefyClientNewMMRRoot, error) {
var events []*contracts.BeefyClientNewMMRRoot
filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx}

iter, err := li.beefyClientContract.FilterNewMMRRoot(&filterOps)
if err != nil {
return nil, err
}

for {
more := iter.Next()
if !more {
err = iter.Error()
if err != nil {
return nil, err
}
break
}

events = append(events, iter.Event)
}

return events, nil
}

// Fetch the latest verified beefy block number and hash from Ethereum
func (li *BeefyListener) fetchLatestBeefyBlock(ctx context.Context) (uint64, types.Hash, error) {
number, err := li.beefyClientContract.LatestBeefyBlock(&bind.CallOpts{
Expand Down Expand Up @@ -285,3 +226,39 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h

return &output, nil
}

func (li *BeefyListener) addTask(ctx context.Context, task *Task) (err error) {
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
}
return nil
}

func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, error) {
paraNonce := (*task.MessageProofs)[0].Message.Nonce
log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce))
cnt := 0
for {
ethInboundNonce, err := li.scanner.findLatestNonce(ctx)
if err != nil {
return false, err
}
if ethInboundNonce >= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return true, nil
}
time.Sleep(5 * time.Second)
if cnt == 12 {
break
}
cnt++
}
return false, nil
}
31 changes: 29 additions & 2 deletions relayer/relays/parachain/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package parachain

import (
"errors"
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Schedule ScheduleConfig `mapstructure:"schedule"`
}

type SourceConfig struct {
Expand All @@ -32,6 +35,23 @@ type SinkContractsConfig struct {
Gateway string `mapstructure:"Gateway"`
}

type ScheduleConfig struct {
// ID of current relayer, starting from 0
ID uint64 `mapstructure:"id"`
// Number of total count of all relayers
Num uint64 `mapstructure:"num"`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments needs for these fields please

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

func (r ScheduleConfig) Validate() error {
if r.Num < 1 {
return errors.New("Number of relayer is not set")
}
if r.ID >= r.Num {
return errors.New("ID of the Number of relayer is not set")
}
return nil
}

type ChannelID [32]byte

func (c Config) Validate() error {
Expand Down Expand Up @@ -66,5 +86,12 @@ func (c Config) Validate() error {
if c.Sink.Contracts.Gateway == "" {
return fmt.Errorf("sink contracts setting [Gateway] is not set")
}

// Relay
err = c.Schedule.Validate()
if err != nil {
return fmt.Errorf("relay config: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions relayer/relays/parachain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {

beefyListener := NewBeefyListener(
&config.Source,
&config.Schedule,
ethereumConnBeefy,
relaychainConn,
parachainConn,
Expand Down Expand Up @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
return err
}

log.Info("Current relay's ID:", relay.config.Schedule.ID)

return nil
}
42 changes: 24 additions & 18 deletions relayer/relays/parachain/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"

"github.com/snowfork/go-substrate-rpc-client/v4/scale"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -71,30 +72,13 @@ func (s *Scanner) findTasks(
paraHash types.Hash,
) ([]*Task, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
ethInboundNonce, err := s.findLatestNonce(ctx)
log.WithFields(log.Fields{
"nonce": ethInboundNonce,
"channelID": s.config.ChannelID,
}).Info("Checked latest nonce delivered to ethereum gateway")

// Fetch latest nonce in parachain outbound queue

paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil)
if err != nil {
return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err)
Expand Down Expand Up @@ -457,3 +441,25 @@ func fetchMessageProof(

return MessageProof{Message: message, Proof: proof}, nil
}

func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
return ethInboundNonce, err
}
4 changes: 4 additions & 0 deletions web/packages/test/config/parachain-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@
"contracts": {
"Gateway": null
}
},
"schedule": {
"id": null,
"num": 3
}
}
Loading