Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d115b91
update event indexer and txn indexer to insert txDigest as a hex string
faisal-chainlink Nov 3, 2025
501a3e8
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 4, 2025
2190478
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 4, 2025
6d6ea4f
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 5, 2025
c040282
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 6, 2025
849dd4b
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 10, 2025
201a9d0
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 10, 2025
fe4a097
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 13, 2025
fbb15a6
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 14, 2025
eb1146e
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 20, 2025
e1fa58c
Merge branch 'develop' into fix/tx-digest-hex
stackman27 Nov 20, 2025
94a7e43
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 21, 2025
561186b
fix bug related to events cursor txDigest
faisal-chainlink Nov 21, 2025
077c1cd
update events indexer test
faisal-chainlink Nov 21, 2025
fbc7e54
tidy
faisal-chainlink Nov 21, 2025
fe389f9
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 21, 2025
562ce52
block hash base58
faisal-chainlink Nov 21, 2025
faf6724
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 22, 2025
e604b89
Merge branch 'develop' into fix/tx-digest-hex
FelixFan1992 Nov 24, 2025
85f92cc
Merge branch 'develop' into fix/tx-digest-hex
FelixFan1992 Nov 24, 2025
b8610ee
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 26, 2025
73507a1
Misc fixes (#274)
faisal-chainlink Nov 26, 2025
e6acbe6
update tests to skip if the DB url is not provided
faisal-chainlink Nov 26, 2025
3da4dc1
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 26, 2025
da05211
update txm confirmer test
faisal-chainlink Nov 26, 2025
ea31c7c
edit error logs
faisal-chainlink Nov 26, 2025
7332ce3
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Nov 27, 2025
ce66b45
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Dec 3, 2025
f22aad5
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Dec 4, 2025
593ebb1
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Dec 5, 2025
d299b7f
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Dec 8, 2025
235d4e4
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Dec 8, 2025
9aa5080
Enhancement/allow override event offset (#295)
faisal-chainlink Dec 8, 2025
4b71a25
Merge branch 'develop' into fix/tx-digest-hex
faisal-chainlink Dec 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.3
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/mr-tron/base58 v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml/v2 v2.2.4
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
Expand Down
71 changes: 38 additions & 33 deletions relayer/chainreader/indexer/events_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import (
"encoding/hex"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/block-vision/sui-go-sdk/models"
"github.com/mr-tron/base58"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
"github.com/smartcontractkit/chainlink-sui/relayer/client"
"github.com/smartcontractkit/chainlink-sui/relayer/codec"
)

type EventsIndexer struct {
Expand Down Expand Up @@ -204,30 +203,6 @@ func convertMapKeysToCamelCaseWithPath(input any, path string) any {
return input
}

func convertBytesToHex(input any) any {
kind := reflect.ValueOf(input).Kind()

switch kind {
case reflect.Map:
result := make(map[string]any)
for k, v := range input.(map[string]any) {
result[k] = convertBytesToHex(v)
}

return result

case reflect.Slice:
bytes, err := codec.AnySliceToBytes(input.([]any))
if err != nil {
return input
}

return "0x" + hex.EncodeToString(bytes)
}

return input
}

func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error {
if selector == nil {
return fmt.Errorf("unspecified selector for SyncEvent call")
Expand All @@ -252,15 +227,31 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E
cursor := eIndexer.lastProcessedCursors[eventHandle]
eIndexer.cursorMutex.RUnlock()
var totalCount uint64
var err error

if cursor == nil {
// attempt to get the latest event sync of the given type and use its data to construct a cursor
cursor, totalCount, err = eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle)
if err != nil {
return err
dbOffsetCursor, dbTotalCount, offsetErr := eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle)
if offsetErr != nil {
eIndexer.logger.Errorw("syncEvent: failed to get latest offset", "error", offsetErr)
return offsetErr
}

eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle, "cursor", cursor)
if dbOffsetCursor != nil {
txDigestBytes, err := hex.DecodeString(strings.TrimPrefix(dbOffsetCursor.TxDigest, "0x"))
if err != nil {
eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err)
return err
}
// convert the db offset cursor digest from hex (the format stored in the DB) to base58 (the format expected by the client)
cursor = &models.EventId{
TxDigest: base58.Encode(txDigestBytes),
EventSeq: dbOffsetCursor.EventSeq,
}

totalCount = dbTotalCount
} else {
eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle)
}
}

batchSize := uint(batchSizeRecords)
Expand Down Expand Up @@ -331,15 +322,29 @@ eventLoop:
// normalize the data, convert snake case to camel case
normalizedData := convertMapKeysToCamelCase(event.ParsedJson)

// Convert the txDigest to hex
txDigestHex := event.Id.TxDigest
if base58Bytes, err := base58.Decode(txDigestHex); err == nil {
hexTxId := hex.EncodeToString(base58Bytes)
txDigestHex = "0x" + hexTxId
}

blockHashBytes, err := base58.Decode(block.TxDigest)
if err != nil {
eIndexer.logger.Errorw("Failed to decode block hash", "error", err)
// fallback
blockHashBytes = []byte(block.TxDigest)
}

// Convert event to database record
record := database.EventRecord{
EventAccountAddress: selector.Package,
EventHandle: eventHandle,
EventOffset: offset,
TxDigest: event.Id.TxDigest,
TxDigest: txDigestHex,
BlockVersion: 0,
BlockHeight: fmt.Sprintf("%d", block.Height),
BlockHash: []byte(block.TxDigest),
BlockHash: blockHashBytes,
// Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers.
BlockTimestamp: block.Timestamp / 1000,
Data: normalizedData.(map[string]any),
Expand Down
30 changes: 30 additions & 0 deletions relayer/chainreader/indexer/events_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,36 @@ func TestEventsIndexer(t *testing.T) {
log.Infow("All concurrent access tests completed successfully")
})

t.Run("TestWithTimestamps", func(t *testing.T) {
log.Infow("Testing with timestamps")

// Trigger some events
for i := 1; i <= 3; i++ {
createEvent(i)
}

// Create a new event selector for timestamps
timestampEventSelector := &client.EventSelector{
Package: packageId,
Module: "counter",
Event: "CounterIncremented",
}

// Run sync to index events
err := indexer.SyncEvent(ctx, timestampEventSelector)
require.NoError(t, err)

// Wait for events to be indexed
events := waitForEventCount(3, 60*time.Second)
require.GreaterOrEqual(t, len(events), 3)

// Check that events are recorded with timestamps in seconds
for _, event := range events[:3] {
require.Greater(t, event.BlockTimestamp, uint64(0), "Event should have a timestamp")
require.Less(t, event.BlockTimestamp, uint64(time.Now().Unix()+1), "Event timestamp should be in the past")
}
})

t.Run("TestRaceDetection", func(t *testing.T) {
// Run with: go test -race -run TestEventsIndexer/TestRaceDetection
log.Infow("Starting race detection test")
Expand Down
19 changes: 17 additions & 2 deletions relayer/chainreader/indexer/transactions_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -449,13 +450,27 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
continue
}

// Convert the txDigest to hex
txDigestHex := transactionRecord.Digest
if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil {
hexTxId := hex.EncodeToString(base64Bytes)
txDigestHex = "0x" + hexTxId
}

blockHashBytes, err := base64.StdEncoding.DecodeString(checkpointResponse.Digest)
if err != nil {
tIndexer.logger.Errorw("Failed to decode block hash", "error", err)
// fallback
blockHashBytes = []byte(checkpointResponse.Digest)
}

record := database.EventRecord{
EventAccountAddress: eventAccountAddress,
EventHandle: eventHandle,
EventOffset: 0,
TxDigest: transactionRecord.Digest,
TxDigest: txDigestHex,
BlockHeight: checkpointResponse.SequenceNumber,
BlockHash: []byte(checkpointResponse.Digest),
BlockHash: blockHashBytes,
BlockTimestamp: blockTimestamp,
Data: executionStateChanged,
}
Expand Down
Loading