From d115b91890179c7f3531979a666ee32cddbadf3e Mon Sep 17 00:00:00 2001 From: faisal-link Date: Mon, 3 Nov 2025 17:07:54 +0400 Subject: [PATCH 01/15] update event indexer and txn indexer to insert txDigest as a hex string --- relayer/chainreader/indexer/events_indexer.go | 19 +++++++++++++++++-- .../indexer/transactions_indexer.go | 19 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 9a6b3bdce..3bbe2d479 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "encoding/base64" "encoding/hex" "errors" "fmt" @@ -317,15 +318,29 @@ eventLoop: packageIdToInsert = *selector.InitialPackageId } + // Convert the txDigest to hex + txDigestHex := event.Id.TxDigest + if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil { + hexTxId := hex.EncodeToString(base64Bytes) + txDigestHex = "0x" + hexTxId + } + + blockHashBytes, err := base64.StdEncoding.DecodeString(block.TxDigest) + if err != nil { + eIndexer.logger.Errorw("Failed to decode block hash", "error", err) + // fallback + blockHashBytes = []byte(block.TxDigest) + } + // Convert event to database record record := database.EventRecord{ EventAccountAddress: packageIdToInsert, EventHandle: eventHandle, EventOffset: offset, - TxDigest: event.Id.TxDigest, + TxDigest: txDigestHex, BlockVersion: 0, BlockHeight: fmt.Sprintf("%d", block.Height), - BlockHash: []byte(block.TxDigest), + BlockHash: blockHashBytes, // Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers. BlockTimestamp: block.Timestamp / 1000, Data: normalizedData.(map[string]any), diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index f3ef64bc5..6dfff3dcc 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "encoding/base64" "encoding/hex" "errors" "fmt" @@ -431,13 +432,27 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con continue } + // Convert the txDigest to hex + txDigestHex := transactionRecord.Digest + if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil { + hexTxId := hex.EncodeToString(base64Bytes) + txDigestHex = "0x" + hexTxId + } + + blockHashBytes, err := base64.StdEncoding.DecodeString(checkpointResponse.Digest) + if err != nil { + tIndexer.logger.Errorw("Failed to decode block hash", "error", err) + // fallback + blockHashBytes = []byte(checkpointResponse.Digest) + } + record := database.EventRecord{ EventAccountAddress: eventAccountAddress, EventHandle: eventHandle, EventOffset: 0, - TxDigest: transactionRecord.Digest, + TxDigest: txDigestHex, BlockHeight: checkpointResponse.SequenceNumber, - BlockHash: []byte(checkpointResponse.Digest), + BlockHash: blockHashBytes, BlockTimestamp: blockTimestamp, Data: executionStateChanged, } From 561186bcf94e6ce847aa79d02cf5be8a4aa2366c Mon Sep 17 00:00:00 2001 From: faisal-link Date: Fri, 21 Nov 2025 16:52:56 +0400 Subject: [PATCH 02/15] fix bug related to events cursor txDigest --- relayer/chainreader/indexer/events_indexer.go | 56 ++++++++----------- 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index a338516c5..8442dbafa 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "errors" "fmt" - "reflect" "strconv" "strings" "sync" @@ -19,7 +18,6 @@ import ( "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database" "github.com/smartcontractkit/chainlink-sui/relayer/client" - "github.com/smartcontractkit/chainlink-sui/relayer/codec" ) type EventsIndexer struct { @@ -205,30 +203,6 @@ func convertMapKeysToCamelCaseWithPath(input any, path string) any { return input } -func convertBytesToHex(input any) any { - kind := reflect.ValueOf(input).Kind() - - switch kind { - case reflect.Map: - result := make(map[string]any) - for k, v := range input.(map[string]any) { - result[k] = convertBytesToHex(v) - } - - return result - - case reflect.Slice: - bytes, err := codec.AnySliceToBytes(input.([]any)) - if err != nil { - return input - } - - return "0x" + hex.EncodeToString(bytes) - } - - return input -} - func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error { if selector == nil { return fmt.Errorf("unspecified selector for SyncEvent call") @@ -253,15 +227,31 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E cursor := eIndexer.lastProcessedCursors[eventHandle] eIndexer.cursorMutex.RUnlock() var totalCount uint64 - var err error + if cursor == nil { // attempt to get the latest event sync of the given type and use its data to construct a cursor - cursor, totalCount, err = eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle) - if err != nil { - return err + dbOffsetCursor, dbTotalCount, offsetErr := eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle) + if offsetErr != nil { + eIndexer.logger.Errorw("syncEvent: failed to get latest offset", "error", offsetErr) + return offsetErr } - eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle, "cursor", cursor) + if dbOffsetCursor != nil { + txDigestBytes, err := hex.DecodeString(strings.TrimPrefix(dbOffsetCursor.TxDigest, "0x")) + if err != nil { + eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err) + return err + } + // convert the db offset cursor digest from hex (the format stored in the DB) to base64 (the format expected by the client) + cursor = &models.EventId{ + TxDigest: base64.StdEncoding.EncodeToString(txDigestBytes), + EventSeq: dbOffsetCursor.EventSeq, + } + + totalCount = dbTotalCount + } else { + eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle) + } } batchSize := uint(batchSizeRecords) @@ -332,7 +322,7 @@ eventLoop: // normalize the data, convert snake case to camel case normalizedData := convertMapKeysToCamelCase(event.ParsedJson) - // Convert the txDigest to hex + // Convert the txDigest to hex txDigestHex := event.Id.TxDigest if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil { hexTxId := hex.EncodeToString(base64Bytes) @@ -345,7 +335,7 @@ eventLoop: // fallback blockHashBytes = []byte(block.TxDigest) } - + // Convert event to database record record := database.EventRecord{ EventAccountAddress: selector.Package, From 077c1cddd218687a60c3753362394220672dec18 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Fri, 21 Nov 2025 17:21:22 +0400 Subject: [PATCH 03/15] update events indexer test --- relayer/chainreader/indexer/events_indexer.go | 9 +++--- .../indexer/events_indexer_test.go | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 8442dbafa..8ebed9905 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -12,6 +12,7 @@ import ( "time" "github.com/block-vision/sui-go-sdk/models" + "github.com/mr-tron/base58" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -242,9 +243,9 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err) return err } - // convert the db offset cursor digest from hex (the format stored in the DB) to base64 (the format expected by the client) + // convert the db offset cursor digest from hex (the format stored in the DB) to base58 (the format expected by the client) cursor = &models.EventId{ - TxDigest: base64.StdEncoding.EncodeToString(txDigestBytes), + TxDigest: base58.Encode(txDigestBytes), EventSeq: dbOffsetCursor.EventSeq, } @@ -324,8 +325,8 @@ eventLoop: // Convert the txDigest to hex txDigestHex := event.Id.TxDigest - if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil { - hexTxId := hex.EncodeToString(base64Bytes) + if base58Bytes, err := base58.Decode(txDigestHex); err == nil { + hexTxId := hex.EncodeToString(base58Bytes) txDigestHex = "0x" + hexTxId } diff --git a/relayer/chainreader/indexer/events_indexer_test.go b/relayer/chainreader/indexer/events_indexer_test.go index 7282dba65..aae0bbedb 100644 --- a/relayer/chainreader/indexer/events_indexer_test.go +++ b/relayer/chainreader/indexer/events_indexer_test.go @@ -520,6 +520,36 @@ func TestEventsIndexer(t *testing.T) { log.Infow("All concurrent access tests completed successfully") }) + t.Run("TestWithTimestamps", func(t *testing.T) { + log.Infow("Testing with timestamps") + + // Trigger some events + for i := 1; i <= 3; i++ { + createEvent(i) + } + + // Create a new event selector for timestamps + timestampEventSelector := &client.EventSelector{ + Package: packageId, + Module: "counter", + Event: "CounterIncremented", + } + + // Run sync to index events + err := indexer.SyncEvent(ctx, timestampEventSelector) + require.NoError(t, err) + + // Wait for events to be indexed + events := waitForEventCount(3, 60*time.Second) + require.GreaterOrEqual(t, len(events), 3) + + // Check that events are recorded with timestamps in seconds + for _, event := range events[:3] { + require.Greater(t, event.BlockTimestamp, uint64(0), "Event should have a timestamp") + require.Less(t, event.BlockTimestamp, uint64(time.Now().Unix()+1), "Event timestamp should be in the past") + } + }) + t.Run("TestRaceDetection", func(t *testing.T) { // Run with: go test -race -run TestEventsIndexer/TestRaceDetection log.Infow("Starting race detection test") From fbc7e545e50fd824ba1d473c24ec2bf8d87c081d Mon Sep 17 00:00:00 2001 From: faisal-link Date: Fri, 21 Nov 2025 18:16:59 +0400 Subject: [PATCH 04/15] tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index de2a306a7..56d83a25a 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-plugin v1.6.3 github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 + github.com/mr-tron/base58 v1.2.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pelletier/go-toml/v2 v2.2.4 github.com/pkg/errors v0.9.1 @@ -100,7 +101,6 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect From 562ce522b8957592566ec0577010e6701834b98a Mon Sep 17 00:00:00 2001 From: faisal-link Date: Fri, 21 Nov 2025 21:43:21 +0400 Subject: [PATCH 05/15] block hash base58 --- relayer/chainreader/indexer/events_indexer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 8ebed9905..4c9be1999 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -2,7 +2,6 @@ package indexer import ( "context" - "encoding/base64" "encoding/hex" "errors" "fmt" @@ -330,7 +329,7 @@ eventLoop: txDigestHex = "0x" + hexTxId } - blockHashBytes, err := base64.StdEncoding.DecodeString(block.TxDigest) + blockHashBytes, err := base58.Decode(block.TxDigest) if err != nil { eIndexer.logger.Errorw("Failed to decode block hash", "error", err) // fallback From 73507a119b3459fb0c7bf68c73d9029f69c1e34e Mon Sep 17 00:00:00 2001 From: Faisal Date: Wed, 26 Nov 2025 17:41:48 +0400 Subject: [PATCH 06/15] Misc fixes (#274) * allow maxRetries instead of maxRetries - 1 * avoid overwriting the module name in event selector when querying for events --- relayer/chainreader/reader/chainreader.go | 10 ++++-- .../reader/chainreader_testnet_test.go | 36 ++++++++++++++++--- relayer/txm/retry_manager.go | 2 +- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/relayer/chainreader/reader/chainreader.go b/relayer/chainreader/reader/chainreader.go index f95e0a9e8..ca9296a50 100644 --- a/relayer/chainreader/reader/chainreader.go +++ b/relayer/chainreader/reader/chainreader.go @@ -542,13 +542,17 @@ func (s *suiChainReader) updateEventConfigs(ctx context.Context, contract pkgtyp return nil, err } - if moduleConfig.Name != "" { + if moduleConfig.Name != "" && eventConfig.Name == "" { eventConfig.Name = moduleConfig.Name } else { // If the module config has no name, use the module name from the event config moduleConfig.Name = moduleConfig.Events[filter.Key].Module } + if eventConfig.EventSelector.Module == "" { + eventConfig.EventSelector.Module = moduleConfig.Name + } + // only write contract address, rest will be handled during chainreader config eventConfig.Package = contract.Address @@ -556,7 +560,7 @@ func (s *suiChainReader) updateEventConfigs(ctx context.Context, contract pkgtyp // create a selector for the initial package ID selector := client.EventSelector{ Package: contract.Address, - Module: moduleConfig.Name, + Module: eventConfig.EventSelector.Module, Event: eventConfig.EventType, } @@ -977,7 +981,7 @@ func (s *suiChainReader) getEventConfig(moduleConfig *config.ChainReaderModule, // queryEvents queries events from the database instead of the Sui blockchain func (s *suiChainReader) queryEvents(ctx context.Context, eventConfig *config.ChainReaderEvent, expressions []query.Expression, limitAndSort query.LimitAndSort) ([]database.EventRecord, error) { // Create the event handle for database lookup - eventHandle := fmt.Sprintf("%s::%s::%s", eventConfig.Package, eventConfig.Name, eventConfig.EventType) + eventHandle := fmt.Sprintf("%s::%s::%s", eventConfig.Package, eventConfig.EventSelector.Module, eventConfig.EventType) s.logger.Debugw("Querying events from database", "address", eventConfig.Package, diff --git a/relayer/chainreader/reader/chainreader_testnet_test.go b/relayer/chainreader/reader/chainreader_testnet_test.go index 2e0cfd4ae..4c3cb6792 100644 --- a/relayer/chainreader/reader/chainreader_testnet_test.go +++ b/relayer/chainreader/reader/chainreader_testnet_test.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/config" @@ -28,7 +29,7 @@ import ( func TestChainReaderTestnet(t *testing.T) { log := logger.Test(t) - rpcUrl := "https://sui-testnet-rpc.publicnode.com" // testutils.TestnetUrl + rpcUrl := testutils.TestnetUrl offrampContractName := "OffRamp" offrampPackageId := "0x50ff1c5a49f012f9360de2fa5065efe7185c22bbeee6254e68ef695b0b0d0f40" @@ -80,7 +81,7 @@ func TestChainReaderTestnet(t *testing.T) { Functions: map[string]*config.ChainReaderFunction{}, }, burnMintTokenPoolContractName: { - Name: "burn_mint_token_pool", + Name: "token_pool", Functions: map[string]*config.ChainReaderFunction{ "get_token": { Name: "get_token", @@ -180,12 +181,21 @@ func TestChainReaderTestnet(t *testing.T) { }, }, }, - Events: map[string]*config.ChainReaderEvent{}, + Events: map[string]*config.ChainReaderEvent{ + "released_or_minted": { + Name: "released_or_minted", + EventType: "ReleasedOrMinted", + EventSelector: client.EventFilterByMoveEventModule{ + Module: "token_pool", + Event: "ReleasedOrMinted", + }, + }, + }, }, }, } - db := sqltest.NewNoOpDataSource() + db := sqltest.NewDB(t, os.Getenv("TEST_DB_URL")) // Create the indexers txnIndexer := indexer.NewTransactionsIndexer( @@ -369,4 +379,22 @@ func TestChainReaderTestnet(t *testing.T) { log.Infof("Completed %d requests, %d errors", processedCount, errorCount) }) + + t.Run("token pool events", func(t *testing.T) { + var retReleasedOrMinted map[string]any + + sequences, err := chainReader.QueryKey(ctx, types.BoundContract{ + Name: burnMintTokenPoolContractName, + Address: burnMintTokenPoolPackageId, + }, query.KeyFilter{ + Key: "released_or_minted", + }, query.LimitAndSort{ + Limit: query.Limit{ + Count: 100, + }, + }, &retReleasedOrMinted) + + testutils.PrettyPrintDebug(log, sequences, "sequences") + require.NoError(t, err) + }) } diff --git a/relayer/txm/retry_manager.go b/relayer/txm/retry_manager.go index 5cc8dae05..76dd6233e 100644 --- a/relayer/txm/retry_manager.go +++ b/relayer/txm/retry_manager.go @@ -121,7 +121,7 @@ func defaultRetryStrategy(tx *SuiTx, txErrorMsg string, maxRetries int) (bool, R } // Check if the transaction has exceeded the number of retries allowed. - if tx.Attempt >= maxRetries { + if tx.Attempt > maxRetries { return false, NoRetry } From e6acbe66bc427ba08e1bbcfa9da1e17fdd2ae459 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Wed, 26 Nov 2025 18:09:42 +0400 Subject: [PATCH 07/15] update tests to skip if the DB url is not provided --- relayer/chainreader/reader/chainreader_testnet_test.go | 6 +++++- relayer/txm/retry_manager_test.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/relayer/chainreader/reader/chainreader_testnet_test.go b/relayer/chainreader/reader/chainreader_testnet_test.go index 4c3cb6792..e14a6ab2e 100644 --- a/relayer/chainreader/reader/chainreader_testnet_test.go +++ b/relayer/chainreader/reader/chainreader_testnet_test.go @@ -195,7 +195,11 @@ func TestChainReaderTestnet(t *testing.T) { }, } - db := sqltest.NewDB(t, os.Getenv("TEST_DB_URL")) + datastoreUrl := os.Getenv("TEST_DB_URL") + if datastoreUrl == "" { + t.Skip("Skipping persistent tests as TEST_DB_URL is not set in CI") + } + db := sqltest.NewDB(t, datastoreUrl) // Create the indexers txnIndexer := indexer.NewTransactionsIndexer( diff --git a/relayer/txm/retry_manager_test.go b/relayer/txm/retry_manager_test.go index 091f1b8a2..e9af5e28c 100644 --- a/relayer/txm/retry_manager_test.go +++ b/relayer/txm/retry_manager_test.go @@ -50,7 +50,7 @@ func TestCallbackRetryManager_IsRetryable_Scenarios(t *testing.T) { }, { name: "Exceeded max retries returns NoRetry", - txRetries: 3, + txRetries: 4, errMessage: "Transaction failed: GasPriceTooHigh", maxRetries: 3, expectedRetry: false, From da05211275261dfac25867bbd639cc9935f0ca1c Mon Sep 17 00:00:00 2001 From: faisal-link Date: Wed, 26 Nov 2025 18:49:51 +0400 Subject: [PATCH 08/15] update txm confirmer test --- relayer/txm/confirmer_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relayer/txm/confirmer_test.go b/relayer/txm/confirmer_test.go index 86c63eab6..0f10a7a08 100644 --- a/relayer/txm/confirmer_test.go +++ b/relayer/txm/confirmer_test.go @@ -112,7 +112,7 @@ func TestConfirmerRoutine_GasBump(t *testing.T) { Payload: "payload", Signatures: []string{"signature"}, RequestType: "WaitForEffectsCert", - Attempt: 1, + Attempt: 3, State: txm.StateSubmitted, Digest: "test-digest", LastUpdatedAt: txm.GetCurrentUnixTimestamp(), @@ -132,12 +132,12 @@ func TestConfirmerRoutine_GasBump(t *testing.T) { } return updatedTx.State == txm.StateFailed - }, 5*time.Second, 100*time.Millisecond, "Transaction did not retry as expected") + }, 5*time.Second, 1000*time.Millisecond, "Transaction did not reach Failed state") // Check that the transaction was retried and the gas limit was updated. updatedTx, err := store.GetTransaction(txID) require.NoError(t, err) - require.Equal(t, 3, updatedTx.Attempt) + require.Equal(t, 5, updatedTx.Attempt) // 3 retries + 1 initial attempt + 1 failed attempt require.Equal(t, suierrors.ErrGasBudgetTooHigh, updatedTx.TxError) txmInstance.Close() From ea31c7cbd68ba9bf64809e0b5fe23393ca0897ad Mon Sep 17 00:00:00 2001 From: faisal-link Date: Wed, 26 Nov 2025 22:34:08 +0400 Subject: [PATCH 09/15] edit error logs --- relayer/client/ptb_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/client/ptb_client.go b/relayer/client/ptb_client.go index df602d24b..3a71d7108 100644 --- a/relayer/client/ptb_client.go +++ b/relayer/client/ptb_client.go @@ -500,7 +500,7 @@ func (c *PTBClient) readFunctionInternal(ctx context.Context, signerAddress stri response, err := c.client.SuiDevInspectTransactionBlock(ctx, devInspectReq) if err != nil || response.Effects.Status.Status != "success" { - return nil, fmt.Errorf("failed to read function: %w", err) + return nil, fmt.Errorf("failed to read function: %w (%s)", err, response.Effects.Status.Error) } c.log.Debugw("ReadFunction RPC response", "RPC response", response, "functionTag", fmt.Sprintf("%s::%s::%s", packageId, module, function)) From 9aa50807f32f35178e0e6d8552f45c817b30321d Mon Sep 17 00:00:00 2001 From: Faisal Date: Tue, 9 Dec 2025 02:21:43 +0400 Subject: [PATCH 10/15] Enhancement/allow override event offset (#295) * firedrill contracts test * add firedrill sanity check * extend configs to allow overriding event offset * add an offset override fallback for events * remove unnecessary unlock --- relayer/chainreader/config/config.go | 4 + relayer/chainreader/indexer/events_indexer.go | 31 ++- relayer/chainreader/reader/chainreader.go | 17 ++ .../reader/chainreader_firedrill_test.go | 237 ++++++++++++++++++ relayer/client/models.go | 3 - 5 files changed, 284 insertions(+), 8 deletions(-) create mode 100644 relayer/chainreader/reader/chainreader_firedrill_test.go diff --git a/relayer/chainreader/config/config.go b/relayer/chainreader/config/config.go index 72368e740..c1c349dd1 100644 --- a/relayer/chainreader/config/config.go +++ b/relayer/chainreader/config/config.go @@ -58,6 +58,10 @@ type ChainReaderEvent struct { // The expected event type (optional). When not provided, the event type is used as-is. ExpectedEventType any + + // A fallback for events selectors with no offset recorded in the DB and a starting point + // earlier than the pruning cutoff of the RPC + EventSelectorDefaultOffset *client.EventId } type EventsIndexerConfig struct { diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 58326d2ed..665c7dc0f 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "encoding/hex" "errors" "fmt" "strconv" @@ -27,8 +28,9 @@ type EventsIndexer struct { syncTimeout time.Duration // Protected by configMutex - eventConfigurations []*client.EventSelector - configMutex sync.RWMutex + eventConfigurations []*client.EventSelector + eventOffsetOverrides map[string]client.EventId + configMutex sync.RWMutex // Protected by cursorMutex // a map of event handles to the last processed cursor @@ -41,6 +43,7 @@ type EventsIndexerApi interface { SyncAllEvents(ctx context.Context) error SyncEvent(ctx context.Context, selector *client.EventSelector) error AddEventSelector(ctx context.Context, selector *client.EventSelector) error + SetEventOffsetOverrides(ctx context.Context, offsetOverrides map[string]client.EventId) error Ready() error Close() error } @@ -209,6 +212,7 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E return fmt.Errorf("unspecified selector for SyncEvent call") } + eventKey := fmt.Sprintf("%s::%s", selector.Module, selector.Event) eventHandle := fmt.Sprintf("%s::%s::%s", selector.Package, selector.Module, selector.Event) // check if the event selector is already tracked, if not add it to the list @@ -233,7 +237,7 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E // attempt to get the latest event sync of the given type and use its data to construct a cursor dbOffsetCursor, dbTotalCount, offsetErr := eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle) if offsetErr != nil { - eIndexer.cursorMutex.RUnlock() + eIndexer.cursorMutex.RUnlock() eIndexer.logger.Errorw("syncEvent: failed to get latest offset", "error", offsetErr) return offsetErr } @@ -241,7 +245,8 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E if dbOffsetCursor != nil { txDigestBytes, err := hex.DecodeString(strings.TrimPrefix(dbOffsetCursor.TxDigest, "0x")) if err != nil { - eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err) + eIndexer.cursorMutex.RUnlock() + eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err, "txDigest", dbOffsetCursor.TxDigest) return err } // convert the db offset cursor digest from hex (the format stored in the DB) to base58 (the format expected by the client) @@ -252,7 +257,16 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E totalCount = dbTotalCount } else { - eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle) + eIndexer.configMutex.RLock() + if override, ok := eIndexer.eventOffsetOverrides[eventKey]; ok { + cursor = &models.EventId{ + TxDigest: override.TxDigest, + EventSeq: override.EventSeq, + } + } else { + eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle) + } + eIndexer.configMutex.RUnlock() } } @@ -440,6 +454,13 @@ func (eIndexer *EventsIndexer) AddEventSelector(ctx context.Context, selector *c return nil } +func (eIndexer *EventsIndexer) SetEventOffsetOverrides(ctx context.Context, offsetOverrides map[string]client.EventId) error { + eIndexer.configMutex.Lock() + defer eIndexer.configMutex.Unlock() + eIndexer.eventOffsetOverrides = offsetOverrides + return nil +} + // IsEventSelectorAdded checks if a specific event selector has already been included in the list of events to sync func (eIndexer *EventsIndexer) isEventSelectorAdded(eConfig client.EventSelector) bool { eIndexer.configMutex.RLock() diff --git a/relayer/chainreader/reader/chainreader.go b/relayer/chainreader/reader/chainreader.go index d313a6037..d6984f2b1 100644 --- a/relayer/chainreader/reader/chainreader.go +++ b/relayer/chainreader/reader/chainreader.go @@ -128,6 +128,23 @@ func (s *suiChainReader) HealthReport() map[string]error { func (s *suiChainReader) Start(ctx context.Context) error { return s.starter.StartOnce(s.Name(), func() error { + // set the event offset overrides for the event indexer if any + offsetOverrides := make(map[string]client.EventId) + + for _, moduleConfig := range s.config.Modules { + for _, eventConfig := range moduleConfig.Events { + if eventConfig.EventSelectorDefaultOffset != nil { + key := fmt.Sprintf("%s::%s", eventConfig.EventSelector.Module, eventConfig.EventSelector.Event) + offsetOverrides[key] = *eventConfig.EventSelectorDefaultOffset + } + } + } + + if len(offsetOverrides) > 0 { + // ignore this error to avoid blocking the start of the chain reader + _ = s.indexer.GetEventIndexer().SetEventOffsetOverrides(ctx, offsetOverrides) + } + return nil }) } diff --git a/relayer/chainreader/reader/chainreader_firedrill_test.go b/relayer/chainreader/reader/chainreader_firedrill_test.go new file mode 100644 index 000000000..1ca322d60 --- /dev/null +++ b/relayer/chainreader/reader/chainreader_firedrill_test.go @@ -0,0 +1,237 @@ +//go:build integration + +package reader + +import ( + "context" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + + "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/config" + "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/indexer" + "github.com/smartcontractkit/chainlink-sui/relayer/client" + "github.com/smartcontractkit/chainlink-sui/relayer/codec" + "github.com/smartcontractkit/chainlink-sui/relayer/testutils" +) + +func TestChainReaderFiredrill(t *testing.T) { + log := logger.Test(t) + rpcUrl := "https://sui-testnet-rpc.publicnode.com" // testutils.TestnetUrl + + offrampContractName := "OffRamp" + offrampPackageId := "0xe2d83f15195acd57b798610d167dc241fcb30b5cc3808af497c33d97512b7970" + + onrampContractName := "OnRamp" + onrampPackageId := "0x30e087460af8a8aacccbc218aa358cdcde8d43faf61ec0638d71108e276e2f1d" + + rmnRemoteContractName := "RMNRemote" + ccipPackageAddress := "0x324c505732fadfa5ac2877cdca28a6be28910009e100de8e6e16eb33ed1218dc" + + t.Helper() + ctx := context.Background() + + keystoreInstance := testutils.NewTestKeystore(t) + accountAddress, _ := testutils.GetAccountAndKeyFromSui(keystoreInstance) + + clientMaxConcurrentRequests := int64(10) + if envClientMaxConcurrentRequests := os.Getenv("MAX_CONCURRENT_REQUESTS"); envClientMaxConcurrentRequests != "" { + if parsed, err := strconv.Atoi(envClientMaxConcurrentRequests); err == nil { + clientMaxConcurrentRequests = int64(parsed) + } + } + relayerClient, clientErr := client.NewPTBClient(log, rpcUrl, nil, 120*time.Second, keystoreInstance, clientMaxConcurrentRequests, "WaitForLocalExecution") + require.NoError(t, clientErr) + + type SourceChainConfig struct { + Router string + IsEnabled bool + MinSeqNr uint64 + IsRMNVerificationDisabled bool + OnRamp string + } + + type SourceChainConfigSetEvent struct { + SourceChainSelector uint64 + SourceChainConfig SourceChainConfig + } + + ccipObjectRefStatePointer := &codec.PointerTag{ + Module: "state_object", + PointerName: "CCIPObjectRefPointer", + FieldName: "ccip_object_id", + DerivationKey: "CCIPObjectRef", + } + + chainReaderConfig := config.ChainReaderConfig{ + IsLoopPlugin: false, + EventsIndexer: config.EventsIndexerConfig{ + PollingInterval: 10 * time.Second, + SyncTimeout: 10 * time.Second, + }, + TransactionsIndexer: config.TransactionsIndexerConfig{ + PollingInterval: 10 * time.Second, + SyncTimeout: 10 * time.Second, + }, + Modules: map[string]*config.ChainReaderModule{ + offrampContractName: { + Name: "offramp", + Functions: map[string]*config.ChainReaderFunction{}, + Events: map[string]*config.ChainReaderEvent{ + "SourceChainConfigSet": { + Name: "SourceChainConfigSet", + EventType: "SourceChainConfigSet", + EventSelector: client.EventSelector{ + Module: "offramp", + Event: "SourceChainConfigSet", + }, + ExpectedEventType: &SourceChainConfigSetEvent{}, + }, + }, + }, + rmnRemoteContractName: { + Name: "rmn_remote", + Functions: map[string]*config.ChainReaderFunction{ + "GetVersionedConfig": { + Name: "get_versioned_config", + SignerAddress: accountAddress, + Params: []codec.SuiFunctionParam{ + { + Name: "object_ref_id", + Type: "object_id", + PointerTag: ccipObjectRefStatePointer, + Required: true, + }, + }, + ResultTupleToStruct: []string{"version", "config"}, + }, + }, + Events: map[string]*config.ChainReaderEvent{}, + }, + onrampContractName: { + Name: "onramp", + Functions: map[string]*config.ChainReaderFunction{}, + Events: map[string]*config.ChainReaderEvent{ + "CCIPMessageSent": { + Name: "CCIPMessageSent", + EventType: "CCIPMessageSent", + EventSelector: client.EventSelector{ + Module: "onramp", + Event: "CCIPMessageSent", + }, + EventSelectorDefaultOffset: &client.EventId{ + TxDigest: "CpFQ8JsaHwTEuNLCfeJQopu3eM3ipViowkWmg23k4fNk", + EventSeq: "0", + }, + }, + }, + }, + }, + } + + datastoreUrl := os.Getenv("TEST_DB_URL") + if datastoreUrl == "" { + t.Skip("Skipping persistent tests as TEST_DB_URL is not set in CI") + } + db := sqltest.NewDB(t, datastoreUrl) + + // attempt to connect + _, err := db.Connx(ctx) + require.NoError(t, err) + + // Create the indexers + txnIndexer := indexer.NewTransactionsIndexer( + db, + log, + relayerClient, + chainReaderConfig.TransactionsIndexer.PollingInterval, + chainReaderConfig.TransactionsIndexer.SyncTimeout, + // start without any configs, they will be set when ChainReader is initialized and gets a reference + // to the transaction indexer to avoid having to reading ChainReader configs here as well + map[string]*config.ChainReaderEvent{}, + ) + evIndexer := indexer.NewEventIndexer( + db, + log, + relayerClient, + // start without any selectors, they will be added during .Bind() calls on ChainReader + []*client.EventSelector{}, + chainReaderConfig.EventsIndexer.PollingInterval, + chainReaderConfig.EventsIndexer.SyncTimeout, + ) + indexerInstance := indexer.NewIndexer( + log, + evIndexer, + txnIndexer, + ) + + // ChainReader in non-loop mode + chainReader, err := NewChainReader(ctx, log, relayerClient, chainReaderConfig, db, indexerInstance) + require.NoError(t, err) + + err = chainReader.Bind(context.Background(), []types.BoundContract{{ + Name: offrampContractName, + Address: offrampPackageId, + }, { + Name: rmnRemoteContractName, + Address: ccipPackageAddress, + }, { + Name: onrampContractName, + Address: onrampPackageId, + }}) + require.NoError(t, err) + + err = chainReader.Start(ctx) + require.NoError(t, err) + defer chainReader.Close() + + err = indexerInstance.Start(ctx) + require.NoError(t, err) + defer indexerInstance.Close() + + t.Run("sanity check for SourceChainConfigSet event", func(t *testing.T) { + t.Skip("skipping SourceChainConfigSet event test") + var seqType any + events, err := chainReader.QueryKey(ctx, types.BoundContract{ + Name: offrampContractName, + Address: offrampPackageId, + }, query.KeyFilter{Key: "SourceChainConfigSet"}, query.LimitAndSort{}, &seqType) + + require.NoError(t, err) + testutils.PrettyPrintDebug(log, events, "events") + }) + + t.Run("sanity check for GetVersionedConfig function", func(t *testing.T) { + t.Skip("skipping GetVersionedConfig function test") + var expectedVersionedConfig map[string]any + err := chainReader.GetLatestValue( + context.Background(), + strings.Join([]string{ccipPackageAddress, rmnRemoteContractName, "GetVersionedConfig"}, "-"), + primitives.Finalized, + map[string]any{}, + &expectedVersionedConfig, + ) + require.NoError(t, err) + testutils.PrettyPrintDebug(log, expectedVersionedConfig, "expectedVersionedConfig") + }) + + t.Run("sanity check for CCIPMessageSent event with offset override from configs", func(t *testing.T) { + var ccipMessageSent any + sequences, err := chainReader.QueryKey(ctx, types.BoundContract{ + Name: onrampContractName, + Address: onrampPackageId, + }, query.KeyFilter{Key: "CCIPMessageSent"}, query.LimitAndSort{}, &ccipMessageSent) + require.NoError(t, err) + testutils.PrettyPrintDebug(log, sequences, "sequences") + }) +} diff --git a/relayer/client/models.go b/relayer/client/models.go index 58cc5ae90..ebe16d441 100644 --- a/relayer/client/models.go +++ b/relayer/client/models.go @@ -73,9 +73,6 @@ type EventFilterByMoveEventModule struct { Package string `json:"package"` Module string `json:"module"` Event string `json:"event"` - // this is used to insert the event using the initial package ID event if the - // indexer is polling events based on the latest package ID after upgrades - InitialPackageId *string `json:"initialPackageId"` } // EventSelector is an alias for EventFilterByMoveEventModule From 2099af0b1370f04383e3c41863d7922931020bfa Mon Sep 17 00:00:00 2001 From: faisal-link Date: Wed, 10 Dec 2025 19:41:30 +0400 Subject: [PATCH 11/15] ensure base58 instead of hex when parsing block and tx digest in transaction indexer --- .../indexer/transactions_indexer.go | 16 ++++++++++--- .../reader/chainreader_testnet_test.go | 23 +++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index 4d54f391e..fafc5ba29 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -2,7 +2,6 @@ package indexer import ( "context" - "encoding/base64" "encoding/hex" "errors" "fmt" @@ -12,6 +11,7 @@ import ( "time" "github.com/block-vision/sui-go-sdk/models" + "github.com/mr-tron/base58" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types/query" @@ -231,6 +231,10 @@ func (tIndexer *TransactionsIndexer) SyncAllTransmittersTransactions(ctx context return nil } + println("\n\n------------------------------------------------------------------------------\n\n") + tIndexer.logger.Debugw("syncTransmittersTransactions start", "transmitters", transmitters) + println("------------------------------------------------------------------------------\n\n") + var batchSize uint64 = 50 var totalProcessed int @@ -267,6 +271,11 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con eventKey = tIndexer.executionEventKey ) + println("\n\n------------------------------------------------------------------------------\n\n") + println("syncTransmitterTransactions start") + println("transmitter", transmitter) + println("------------------------------------------------------------------------------\n\n") + cursor := tIndexer.transmitters[transmitter] totalProcessed := 0 @@ -359,6 +368,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con "Expected PTB command (_::offramp::init_execute) not found in commands of failed PTB originating from known transmitter", "transmitter", transmitter, "digest", transactionRecord.Digest, + "transactionRecord", transactionRecord, ) continue } @@ -476,12 +486,12 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con // Convert the txDigest to hex txDigestHex := transactionRecord.Digest - if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil { + if base64Bytes, err := base58.Decode(txDigestHex); err == nil { hexTxId := hex.EncodeToString(base64Bytes) txDigestHex = "0x" + hexTxId } - blockHashBytes, err := base64.StdEncoding.DecodeString(checkpointResponse.Digest) + blockHashBytes, err := base58.Decode(checkpointResponse.Digest) if err != nil { tIndexer.logger.Errorw("Failed to decode block hash", "error", err) // fallback diff --git a/relayer/chainreader/reader/chainreader_testnet_test.go b/relayer/chainreader/reader/chainreader_testnet_test.go index e14a6ab2e..26fd6552c 100644 --- a/relayer/chainreader/reader/chainreader_testnet_test.go +++ b/relayer/chainreader/reader/chainreader_testnet_test.go @@ -384,6 +384,29 @@ func TestChainReaderTestnet(t *testing.T) { log.Infof("Completed %d requests, %d errors", processedCount, errorCount) }) + t.Run("TransactionIndexer_ExecutionStateChanged_event", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 1200*time.Second) + defer cancel() + + assert.Eventually(t, func() bool { + println("\n\n------------------------------------------------------------------------------\n\n") + + // events, err := chainReader.QueryKey(ctx, types.BoundContract{ + // Name: offrampContractName, + // Address: offrampPackageId, + // }, query.KeyFilter{Key: offrampExecutionStateChangedEventKey}, query.LimitAndSort{}, &map[string]any{}) + // require.NoError(t, err) + + events, err := dbStore.QueryEvents(ctx, offrampPackageId, offrampExecutionStateChangedEventKey, nil, query.LimitAndSort{}) + require.NoError(t, err) + + // testutils.PrettyPrintDebug(log, events, "found execution state changed events from DB") + // return len(events) > 0 + + return len(events) > 0 + }, 1200*time.Second, 60*time.Second) + }) + t.Run("token pool events", func(t *testing.T) { var retReleasedOrMinted map[string]any From 6c576ec66e4c63a08adaa7f97c6a5e5281b5c437 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Wed, 10 Dec 2025 22:43:33 +0400 Subject: [PATCH 12/15] cleanup logs --- .../indexer/transactions_indexer.go | 7 +- .../reader/chainreader_testnet_test.go | 68 +++++++++++++------ 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index fafc5ba29..8658dbff5 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -231,9 +231,7 @@ func (tIndexer *TransactionsIndexer) SyncAllTransmittersTransactions(ctx context return nil } - println("\n\n------------------------------------------------------------------------------\n\n") tIndexer.logger.Debugw("syncTransmittersTransactions start", "transmitters", transmitters) - println("------------------------------------------------------------------------------\n\n") var batchSize uint64 = 50 var totalProcessed int @@ -271,10 +269,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con eventKey = tIndexer.executionEventKey ) - println("\n\n------------------------------------------------------------------------------\n\n") - println("syncTransmitterTransactions start") - println("transmitter", transmitter) - println("------------------------------------------------------------------------------\n\n") + tIndexer.logger.Debugw("syncTransmitterTransactions start", "transmitter", transmitter) cursor := tIndexer.transmitters[transmitter] totalProcessed := 0 diff --git a/relayer/chainreader/reader/chainreader_testnet_test.go b/relayer/chainreader/reader/chainreader_testnet_test.go index 26fd6552c..cdba6e319 100644 --- a/relayer/chainreader/reader/chainreader_testnet_test.go +++ b/relayer/chainreader/reader/chainreader_testnet_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -21,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/config" + "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database" "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/indexer" "github.com/smartcontractkit/chainlink-sui/relayer/client" "github.com/smartcontractkit/chainlink-sui/relayer/codec" @@ -32,7 +34,7 @@ func TestChainReaderTestnet(t *testing.T) { rpcUrl := testutils.TestnetUrl offrampContractName := "OffRamp" - offrampPackageId := "0x50ff1c5a49f012f9360de2fa5065efe7185c22bbeee6254e68ef695b0b0d0f40" + offrampPackageId := "0x01a0a22b2abacbd48e9a026c1661189a8ec5ce4942cba07017b63eaad0a205a4" tokenAdminRegistryContractName := "TokenAdminRegistry" tokenAdminRegistryPackageId := "0x9de8a33d158e26f0b51f199da8be1a22e9755510b705cfb88230b257187da733" @@ -64,12 +66,12 @@ func TestChainReaderTestnet(t *testing.T) { chainReaderConfig := config.ChainReaderConfig{ IsLoopPlugin: false, EventsIndexer: config.EventsIndexerConfig{ - PollingInterval: 10 * time.Second, - SyncTimeout: 10 * time.Second, + PollingInterval: 15 * time.Second, + SyncTimeout: 60 * time.Second, }, TransactionsIndexer: config.TransactionsIndexerConfig{ - PollingInterval: 10 * time.Second, - SyncTimeout: 10 * time.Second, + PollingInterval: 15 * time.Second, + SyncTimeout: 60 * time.Second, }, Modules: map[string]*config.ChainReaderModule{ tokenAdminRegistryContractName: { @@ -79,6 +81,20 @@ func TestChainReaderTestnet(t *testing.T) { offrampContractName: { Name: "offramp", Functions: map[string]*config.ChainReaderFunction{}, + Events: map[string]*config.ChainReaderEvent{ + "execution_state_changed": { + Name: "execution_state_changed", + EventType: "ExecutionStateChanged", + EventSelector: client.EventFilterByMoveEventModule{ + Module: "offramp", + Event: "ExecutionStateChanged", + }, + EventSelectorDefaultOffset: &client.EventId{ + TxDigest: "7KyXWWmJnX4u5aKr1ofvdh5Af5Vix6rnHnonk33CiDtV", + EventSeq: "0", + }, + }, + }, }, burnMintTokenPoolContractName: { Name: "token_pool", @@ -200,22 +216,29 @@ func TestChainReaderTestnet(t *testing.T) { t.Skip("Skipping persistent tests as TEST_DB_URL is not set in CI") } db := sqltest.NewDB(t, datastoreUrl) + dbStore := database.NewDBStore(db, log) + require.NoError(t, dbStore.EnsureSchema(ctx)) + indexerClient, clientErr := client.NewPTBClient(log, rpcUrl, nil, 120*time.Second, keystoreInstance, clientMaxConcurrentRequests, "WaitForLocalExecution") + require.NoError(t, clientErr) // Create the indexers txnIndexer := indexer.NewTransactionsIndexer( db, log, - relayerClient, + indexerClient, chainReaderConfig.TransactionsIndexer.PollingInterval, chainReaderConfig.TransactionsIndexer.SyncTimeout, // start without any configs, they will be set when ChainReader is initialized and gets a reference // to the transaction indexer to avoid having to reading ChainReader configs here as well map[string]*config.ChainReaderEvent{}, ) + + eventIndexerClient, clientErr := client.NewPTBClient(log, rpcUrl, nil, 120*time.Second, keystoreInstance, clientMaxConcurrentRequests, "WaitForLocalExecution") + require.NoError(t, clientErr) evIndexer := indexer.NewEventIndexer( db, log, - relayerClient, + eventIndexerClient, // start without any selectors, they will be added during .Bind() calls on ChainReader []*client.EventSelector{}, chainReaderConfig.EventsIndexer.PollingInterval, @@ -385,26 +408,31 @@ func TestChainReaderTestnet(t *testing.T) { }) t.Run("TransactionIndexer_ExecutionStateChanged_event", func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, 1200*time.Second) - defer cancel() + t.Skip("Skipping TransactionIndexer_ExecutionStateChanged_event test") + indexerInstance.Start(ctx) + defer indexerInstance.Close() assert.Eventually(t, func() bool { println("\n\n------------------------------------------------------------------------------\n\n") - // events, err := chainReader.QueryKey(ctx, types.BoundContract{ - // Name: offrampContractName, - // Address: offrampPackageId, - // }, query.KeyFilter{Key: offrampExecutionStateChangedEventKey}, query.LimitAndSort{}, &map[string]any{}) - // require.NoError(t, err) - - events, err := dbStore.QueryEvents(ctx, offrampPackageId, offrampExecutionStateChangedEventKey, nil, query.LimitAndSort{}) + eventHandle := offrampPackageId + "::offramp::ExecutionStateChanged" + events, err := dbStore.QueryEvents(ctx, offrampPackageId, eventHandle, nil, query.LimitAndSort{ + Limit: query.Limit{ + Count: 100, + }, + }) require.NoError(t, err) - // testutils.PrettyPrintDebug(log, events, "found execution state changed events from DB") - // return len(events) > 0 + foundExecutionStateChangedDummyReceiver := false + for _, event := range events { + if event.Data["state"] == 3 { + foundExecutionStateChangedDummyReceiver = true + break + } + } - return len(events) > 0 - }, 1200*time.Second, 60*time.Second) + return foundExecutionStateChangedDummyReceiver + }, 45*60*time.Second, 60*time.Second) }) t.Run("token pool events", func(t *testing.T) { From 954a2fcd6801a30a75e9baee53d959eb50f6f492 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Thu, 11 Dec 2025 19:22:04 +0400 Subject: [PATCH 13/15] search SourceChainConfigSet using strings instead of u64 for chain selectors --- relayer/chainreader/indexer/transactions_indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index 8658dbff5..a4cf47fbc 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -616,7 +616,7 @@ func (tIndexer *TransactionsIndexer) getSourceChainConfig(ctx context.Context, s filter := []query.Expression{ query.Comparator(selector, - primitives.ValueComparator{Value: sourceChainSelector, Operator: primitives.Eq}, + primitives.ValueComparator{Value: strconv.FormatUint(sourceChainSelector, 10), Operator: primitives.Eq}, ), } From 97e49fe634cb83cb746a850c03546b470ae36316 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Fri, 12 Dec 2025 18:33:40 +0400 Subject: [PATCH 14/15] add a database table to store transactions cursors when replaying failed transactions from transmitters --- relayer/chainreader/database/database.go | 25 ++++++++++++++++ .../chainreader/database/database_queries.go | 18 +++++++++++ relayer/chainreader/indexer/indexer.go | 2 +- .../indexer/transactions_indexer.go | 30 ++++++++++++++++--- 4 files changed, 70 insertions(+), 5 deletions(-) diff --git a/relayer/chainreader/database/database.go b/relayer/chainreader/database/database.go index 6aaa5a961..0804e19aa 100644 --- a/relayer/chainreader/database/database.go +++ b/relayer/chainreader/database/database.go @@ -49,6 +49,11 @@ func (store *DBStore) EnsureSchema(ctx context.Context) error { return fmt.Errorf("failed to create sui indexes: %w", err) } + _, err = store.ds.ExecContext(ctx, CreateTransmitterCursorsTable) + if err != nil { + return fmt.Errorf("failed to create sui.transmitter_cursors table: %w", err) + } + return nil } @@ -235,3 +240,23 @@ func operatorSQL(op primitives.ComparisonOperator) string { return "=" } } + +func (store *DBStore) GetTransmitterCursor(ctx context.Context, transmitter models.SuiAddress) (string, error) { + var cursor string + err := store.ds.QueryRowxContext(ctx, GetTransmitterCursor, transmitter).Scan(&cursor) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return "", nil + } + return "", fmt.Errorf("failed to get transmitter cursor: %w", err) + } + return cursor, nil +} + +func (store *DBStore) UpdateTransmitterCursor(ctx context.Context, transmitter models.SuiAddress, cursor string) error { + _, err := store.ds.ExecContext(ctx, UpdateTransmitterCursor, transmitter, cursor) + if err != nil { + return fmt.Errorf("failed to update transmitter cursor: %w", err) + } + return nil +} diff --git a/relayer/chainreader/database/database_queries.go b/relayer/chainreader/database/database_queries.go index 2cff80ce4..9d5bbc0a7 100644 --- a/relayer/chainreader/database/database_queries.go +++ b/relayer/chainreader/database/database_queries.go @@ -21,6 +21,13 @@ const ( ); ` + CreateTransmitterCursorsTable = ` + CREATE TABLE IF NOT EXISTS sui.transmitter_cursors ( + transmitter TEXT PRIMARY KEY, + cursor TEXT NOT NULL + ); + ` + CreateIndices = ` CREATE INDEX IF NOT EXISTS idx_events_account_handle_timestamp ON sui.events(event_account_address, event_handle, block_timestamp DESC); CREATE INDEX IF NOT EXISTS idx_events_offset ON sui.events(event_account_address, event_handle, event_offset); @@ -67,4 +74,15 @@ const ( FROM sui.events WHERE id = $1 ` + + GetTransmitterCursor = ` + SELECT cursor + FROM sui.transmitter_cursors + WHERE transmitter = $1 + ` + + UpdateTransmitterCursor = ` + INSERT INTO sui.transmitter_cursors (transmitter, cursor) VALUES ($1, $2) + ON CONFLICT (transmitter) DO UPDATE SET cursor = $2; + ` ) diff --git a/relayer/chainreader/indexer/indexer.go b/relayer/chainreader/indexer/indexer.go index 3b6818f9a..280a02308 100644 --- a/relayer/chainreader/indexer/indexer.go +++ b/relayer/chainreader/indexer/indexer.go @@ -40,7 +40,7 @@ func NewIndexer( transactionIndexer TransactionsIndexerApi, ) *Indexer { return &Indexer{ - log: logger.Named(l, "Indexers"), + log: logger.Named(l, "SuiIndexers"), eventsIndexer: eventsIndexer, eventsIndexerCancel: nil, transactionIndexer: transactionIndexer, diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index a4cf47fbc..67a3b7158 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -66,12 +66,13 @@ func NewTransactionsIndexer( syncTimeout time.Duration, eventConfigs map[string]*config.ChainReaderEvent, ) TransactionsIndexerApi { - dataStore := database.NewDBStore(db, lggr) + logInstance := logger.Named(lggr, "SuiTransactionsIndexer") + dataStore := database.NewDBStore(db, logInstance) return &TransactionsIndexer{ db: dataStore, client: sdkClient, - logger: lggr, + logger: logInstance, pollingInterval: pollingInterval, syncTimeout: syncTimeout, transmitters: make(map[models.SuiAddress]string), @@ -269,11 +270,11 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con eventKey = tIndexer.executionEventKey ) - tIndexer.logger.Debugw("syncTransmitterTransactions start", "transmitter", transmitter) - cursor := tIndexer.transmitters[transmitter] totalProcessed := 0 + tIndexer.logger.Debugw("syncTransmitterTransactions start", "transmitter", transmitter, "cursor", cursor) + eventAccountAddress, latestOfframpPackageId, err := tIndexer.getEventPackageIdFromConfig() if err != nil { return 0, fmt.Errorf("failed to get ExecutionStateChanged event config: %w", err) @@ -284,6 +285,21 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con case <-ctx.Done(): return totalProcessed, ctx.Err() default: + if cursor == "" { + // Get the cursor from the DB store + transmitterCursorFromDB, err := tIndexer.db.GetTransmitterCursor(ctx, transmitter) + if err != nil { + tIndexer.logger.Warnw("Failed to get transmitter cursor from DB store", "error", err) + } + // Attempt to check if a cursor exists in the DB store + if transmitterCursorFromDB != "" { + tIndexer.logger.Debugw("Found transmitter cursor in DB store", "transmitter", transmitter, "cursor", transmitterCursorFromDB) + cursor = transmitterCursorFromDB + } else { + tIndexer.logger.Debugw("No transmitter cursor found in DB store, starting fresh sync", "transmitter", transmitter) + } + } + queryResponse, err := tIndexer.client.QueryTransactions(ctx, string(transmitter), &cursor, &batchSize) if err != nil { return totalProcessed, fmt.Errorf("failed to fetch transactions for transmitter %s: %w", transmitter, err) @@ -297,6 +313,12 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con defer func() { // Update the cursor to the last transaction digest regardless of the code path below tIndexer.transmitters[transmitter] = lastDigest + + // Update the cursor in the DB store + err := tIndexer.db.UpdateTransmitterCursor(ctx, transmitter, lastDigest) + if err != nil { + tIndexer.logger.Errorw("Failed to update transmitter cursor in DB store", "error", err) + } }() var records []database.EventRecord From 7234f6035d03a67ed308e8962b394b9b71fd8196 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Wed, 17 Dec 2025 21:29:17 +0400 Subject: [PATCH 15/15] disable firedrill sanity test --- relayer/chainreader/reader/chainreader_firedrill_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/relayer/chainreader/reader/chainreader_firedrill_test.go b/relayer/chainreader/reader/chainreader_firedrill_test.go index 1ca322d60..dce3038bb 100644 --- a/relayer/chainreader/reader/chainreader_firedrill_test.go +++ b/relayer/chainreader/reader/chainreader_firedrill_test.go @@ -26,6 +26,8 @@ import ( ) func TestChainReaderFiredrill(t *testing.T) { + t.Skip("skipping ChainReaderFiredrill test, this is used as a sanity check only and not to be included in CI") + log := logger.Test(t) rpcUrl := "https://sui-testnet-rpc.publicnode.com" // testutils.TestnetUrl