diff --git a/.github/actions/run-load-test/action.yaml b/.github/actions/run-load-test/action.yaml index 0d3039f63..84f0a098f 100644 --- a/.github/actions/run-load-test/action.yaml +++ b/.github/actions/run-load-test/action.yaml @@ -91,12 +91,15 @@ runs: JD_IMAGE: ${{ inputs.jd-image }} run: | cd cmd/ccv && go install . && cd - - ccv u ${{ inputs.env_name }} && ccv obs up + ccv u ${{ inputs.env_name }}.toml && ccv obs up --mode full - name: Run load tests id: load_test shell: bash working-directory: build/devenv/tests/e2e + env: + LOAD_TEST_OUT_FILE: ../../${{ inputs.env_name }}-out.toml + PROM_URL: http://localhost:9099 run: | set -o pipefail LOAD_TEST_OUT_FILE=../../${{ inputs.env_name_out }} go test -v -timeout ${{ inputs.timeout }} -count=1 -run 'TestE2ELoad/${{ inputs.subtest }}' diff --git a/.github/workflows/test-load-nightly.yaml b/.github/workflows/test-load-nightly.yaml index 0012d8bae..fa328121f 100644 --- a/.github/workflows/test-load-nightly.yaml +++ b/.github/workflows/test-load-nightly.yaml @@ -21,6 +21,10 @@ jobs: test: - subtest: rpc_latency timeout: 1h10m + - subtest: gas + timeout: 10m + - subtest: burst + timeout: 10m steps: - name: Enable S3 Cache for Self-Hosted Runners uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # v2.0.3 diff --git a/aggregator/migrations/postgres/00008_add_source_chain_block_timestamp.sql b/aggregator/migrations/postgres/00008_add_source_chain_block_timestamp.sql new file mode 100644 index 000000000..194f287d4 --- /dev/null +++ b/aggregator/migrations/postgres/00008_add_source_chain_block_timestamp.sql @@ -0,0 +1,18 @@ +-- +goose Up +-- +goose StatementBegin + +-- Add source_chain_block_timestamp column to commit_verification_records +-- This represents the timestamp when the message was included in a source chain block +-- Default to current time in milliseconds for existing rows +ALTER TABLE commit_verification_records + ADD COLUMN source_chain_block_timestamp TIMESTAMP NOT NULL DEFAULT now(); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +ALTER TABLE commit_verification_records + DROP COLUMN source_chain_block_timestamp; + +-- +goose StatementEnd diff --git a/aggregator/pkg/model/commit_aggregated_report.go b/aggregator/pkg/model/commit_aggregated_report.go index 6965f9ac5..2fbca2385 100644 --- a/aggregator/pkg/model/commit_aggregated_report.go +++ b/aggregator/pkg/model/commit_aggregated_report.go @@ -65,6 +65,10 @@ func (c *CommitAggregatedReport) GetMessageExecutorAddress() protocol.UnknownAdd return c.Verifications[0].MessageExecutorAddress } +func (c *CommitAggregatedReport) GetSourceChainBlockTimestamp() time.Time { + return c.Verifications[0].SourceChainBlockTimestamp +} + // It is assumed that all verifications in the report have the same message since otherwise the message ID would not match. func (c *CommitAggregatedReport) GetProtoMessage() *pb.Message { if len(c.Verifications) > 0 && c.Verifications[0].Message != nil { diff --git a/aggregator/pkg/model/commit_verification_record.go b/aggregator/pkg/model/commit_verification_record.go index a09a9579b..caa7e200d 100644 --- a/aggregator/pkg/model/commit_verification_record.go +++ b/aggregator/pkg/model/commit_verification_record.go @@ -32,14 +32,15 @@ func (c CommitVerificationRecordIdentifier) ToIdentifier() string { // CommitVerificationRecord represents a record of a commit verification. 
type CommitVerificationRecord struct { - MessageID MessageID - Message *protocol.Message - CCVVersion []byte - Signature []byte - MessageCCVAddresses []protocol.UnknownAddress - MessageExecutorAddress protocol.UnknownAddress - IdentifierSigner *IdentifierSigner - createdAt time.Time // Internal field for tracking creation time from DB + MessageID MessageID + Message *protocol.Message + CCVVersion []byte + Signature []byte + MessageCCVAddresses []protocol.UnknownAddress + MessageExecutorAddress protocol.UnknownAddress + SourceChainBlockTimestamp time.Time // Timestamp when message was included in source chain block (milliseconds) + IdentifierSigner *IdentifierSigner + createdAt time.Time // Internal field for tracking creation time from DB } // GetID retrieves the unique identifier for the commit verification record. diff --git a/aggregator/pkg/model/helpers.go b/aggregator/pkg/model/helpers.go index 093c52aae..4c573de79 100644 --- a/aggregator/pkg/model/helpers.go +++ b/aggregator/pkg/model/helpers.go @@ -97,9 +97,10 @@ func MapAggregatedReportToCCVDataProto(report *CommitAggregatedReport, c *Commit MessageExecutorAddress: []byte(report.GetMessageExecutorAddress()), CcvData: ccvData, Metadata: &pb.VerifierResultMetadata{ - Timestamp: timeToTimestampMillis(report.WrittenAt), - VerifierSourceAddress: quorumConfig.GetSourceVerifierAddressBytes(), - VerifierDestAddress: quorumConfig.GetDestVerifierAddressBytes(), + Timestamp: timeToTimestampMillis(report.WrittenAt), + VerifierSourceAddress: quorumConfig.GetSourceVerifierAddressBytes(), + VerifierDestAddress: quorumConfig.GetDestVerifierAddressBytes(), + SourceChainBlockTimestamp: report.GetSourceChainBlockTimestamp().UnixMilli(), }, }, nil } @@ -118,10 +119,11 @@ func CommitVerificationRecordFromProto(proto *pb.CommitteeVerifierNodeResult) (* } record := &CommitVerificationRecord{ - CCVVersion: proto.CcvVersion, - Signature: proto.Signature, - MessageCCVAddresses: ccvAddresses, - MessageExecutorAddress: protocol.UnknownAddress(proto.ExecutorAddress), + CCVVersion: proto.CcvVersion, + Signature: proto.Signature, + MessageCCVAddresses: ccvAddresses, + MessageExecutorAddress: protocol.UnknownAddress(proto.ExecutorAddress), + SourceChainBlockTimestamp: time.UnixMilli(proto.SourceChainBlockTimestamp), } record.SetTimestampFromMillis(time.Now().UnixMilli()) @@ -152,10 +154,11 @@ func CommitVerificationRecordToProto(record *CommitVerificationRecord) *pb.Commi } proto := &pb.CommitteeVerifierNodeResult{ - CcvVersion: record.CCVVersion, - Signature: record.Signature, - CcvAddresses: ccvAddresses, - ExecutorAddress: []byte(record.MessageExecutorAddress), + CcvVersion: record.CCVVersion, + Signature: record.Signature, + CcvAddresses: ccvAddresses, + ExecutorAddress: []byte(record.MessageExecutorAddress), + SourceChainBlockTimestamp: record.SourceChainBlockTimestamp.UnixMilli(), } if record.Message != nil { diff --git a/aggregator/pkg/storage/postgres/database_storage.go b/aggregator/pkg/storage/postgres/database_storage.go index 2c346b78b..6c1a867aa 100644 --- a/aggregator/pkg/storage/postgres/database_storage.go +++ b/aggregator/pkg/storage/postgres/database_storage.go @@ -107,8 +107,9 @@ func (d *DatabaseStorage) SaveCommitVerification(ctx context.Context, record *mo stmt := `INSERT INTO commit_verification_records (message_id, signer_address, signature_r, signature_s, aggregation_key, - ccv_version, signature, message_ccv_addresses, message_executor_address, message_data) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ccv_version, signature, 
message_ccv_addresses, message_executor_address, message_data, + source_chain_block_timestamp) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (message_id, signer_address, aggregation_key) DO NOTHING RETURNING id` @@ -125,6 +126,7 @@ func (d *DatabaseStorage) SaveCommitVerification(ctx context.Context, record *mo params["message_ccv_addresses"], params["message_executor_address"], params["message_data"], + params["source_chain_block_timestamp"], ) if err != nil { if err == sql.ErrNoRows { @@ -241,6 +243,7 @@ func (d *DatabaseStorage) QueryAggregatedReports(ctx context.Context, sinceSeque &verRow.MessageExecutorAddress, &verRow.MessageData, &verRow.ID, + &verRow.SourceChainBlockTimestamp, &verRow.CreatedAt, ) if err != nil { @@ -353,6 +356,7 @@ func (d *DatabaseStorage) GetCCVData(ctx context.Context, messageID model.Messag &verRow.MessageExecutorAddress, &verRow.MessageData, &verRow.ID, + &verRow.SourceChainBlockTimestamp, &verRow.CreatedAt, ) if err != nil { @@ -454,6 +458,7 @@ func (d *DatabaseStorage) GetBatchCCVData(ctx context.Context, messageIDs []mode &verRow.MessageExecutorAddress, &verRow.MessageData, &verRow.ID, + &verRow.SourceChainBlockTimestamp, &verRow.CreatedAt, ) if err != nil { diff --git a/aggregator/pkg/storage/postgres/database_storage_mapper.go b/aggregator/pkg/storage/postgres/database_storage_mapper.go index e6b16db14..5a3f8d796 100644 --- a/aggregator/pkg/storage/postgres/database_storage_mapper.go +++ b/aggregator/pkg/storage/postgres/database_storage_mapper.go @@ -13,19 +13,20 @@ import ( ) type commitVerificationRecordRow struct { - ID int64 `db:"id"` - SeqNum int64 `db:"seq_num"` - MessageID string `db:"message_id"` - SignerAddress string `db:"signer_address"` - SignatureR []byte `db:"signature_r"` - SignatureS []byte `db:"signature_s"` - AggregationKey string `db:"aggregation_key"` - CCVVersion []byte `db:"ccv_version"` - Signature []byte `db:"signature"` - MessageCCVAddresses pq.StringArray `db:"message_ccv_addresses"` - MessageExecutorAddress string `db:"message_executor_address"` - MessageData []byte `db:"message_data"` - CreatedAt time.Time `db:"created_at"` + ID int64 `db:"id"` + SeqNum int64 `db:"seq_num"` + MessageID string `db:"message_id"` + SignerAddress string `db:"signer_address"` + SignatureR []byte `db:"signature_r"` + SignatureS []byte `db:"signature_s"` + AggregationKey string `db:"aggregation_key"` + CCVVersion []byte `db:"ccv_version"` + Signature []byte `db:"signature"` + MessageCCVAddresses pq.StringArray `db:"message_ccv_addresses"` + MessageExecutorAddress string `db:"message_executor_address"` + MessageData []byte `db:"message_data"` + SourceChainBlockTimestamp time.Time `db:"source_chain_block_timestamp"` + CreatedAt time.Time `db:"created_at"` } func rowToCommitVerificationRecord(row *commitVerificationRecordRow) (*model.CommitVerificationRecord, error) { @@ -68,13 +69,14 @@ func rowToCommitVerificationRecord(row *commitVerificationRecordRow) (*model.Com } record := &model.CommitVerificationRecord{ - MessageID: messageID, - Message: &message, - CCVVersion: row.CCVVersion, - Signature: row.Signature, - MessageCCVAddresses: messageCCVAddresses, - MessageExecutorAddress: messageExecutorAddress, - IdentifierSigner: identifierSigner, + MessageID: messageID, + Message: &message, + CCVVersion: row.CCVVersion, + Signature: row.Signature, + MessageCCVAddresses: messageCCVAddresses, + MessageExecutorAddress: messageExecutorAddress, + SourceChainBlockTimestamp: row.SourceChainBlockTimestamp, + IdentifierSigner: 
identifierSigner, } record.SetTimestampFromMillis(row.CreatedAt.UnixMilli()) return record, nil @@ -109,16 +111,17 @@ func recordToInsertParams(record *model.CommitVerificationRecord, aggregationKey messageExecutorAddressHex := record.MessageExecutorAddress.String() params := map[string]any{ - "message_id": messageIDHex, - "signer_address": signerAddressHex, - "signature_r": record.IdentifierSigner.SignatureR[:], - "signature_s": record.IdentifierSigner.SignatureS[:], - "aggregation_key": aggregationKey, - "ccv_version": record.CCVVersion, - "signature": record.Signature, - "message_ccv_addresses": pq.Array(messageCCVAddressesHex), - "message_executor_address": messageExecutorAddressHex, - "message_data": messageDataJSON, + "message_id": messageIDHex, + "signer_address": signerAddressHex, + "signature_r": record.IdentifierSigner.SignatureR[:], + "signature_s": record.IdentifierSigner.SignatureS[:], + "aggregation_key": aggregationKey, + "ccv_version": record.CCVVersion, + "signature": record.Signature, + "message_ccv_addresses": pq.Array(messageCCVAddressesHex), + "message_executor_address": messageExecutorAddressHex, + "message_data": messageDataJSON, + "source_chain_block_timestamp": record.SourceChainBlockTimestamp, } return params, nil @@ -126,11 +129,11 @@ func recordToInsertParams(record *model.CommitVerificationRecord, aggregationKey const allVerificationRecordColumns = `message_id, signer_address, signature_r, signature_s, aggregation_key, - ccv_version, signature, message_ccv_addresses, message_executor_address, message_data, id, created_at` + ccv_version, signature, message_ccv_addresses, message_executor_address, message_data, id, source_chain_block_timestamp, created_at` const allVerificationRecordColumnsQualified = `cvr.message_id, cvr.signer_address, cvr.signature_r, cvr.signature_s, cvr.aggregation_key, - cvr.ccv_version, cvr.signature, cvr.message_ccv_addresses, cvr.message_executor_address, cvr.message_data, cvr.id, cvr.created_at` + cvr.ccv_version, cvr.signature, cvr.message_ccv_addresses, cvr.message_executor_address, cvr.message_data, cvr.id, cvr.source_chain_block_timestamp, cvr.created_at` func mustParseUint64(s string) uint64 { var result uint64 diff --git a/build/devenv/config.go b/build/devenv/config.go index 34fb50654..4aa1e4182 100644 --- a/build/devenv/config.go +++ b/build/devenv/config.go @@ -34,6 +34,7 @@ const ( DefaultAnvilKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" DefaultLokiURL = "http://localhost:3030/loki/api/v1/push" DefaultTempoURL = "http://localhost:4318/v1/traces" + DefaultPromURL = "http://localhost:9099" ) var L = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}).Level(zerolog.InfoLevel) @@ -101,23 +102,24 @@ func LoadOutput[T any](outputPath string) (*T, error) { } // Load addresses into the datastore so that tests can query them appropriately. 
- if c, ok := any(config).(*Cfg); ok { - if len(c.CLDF.Addresses) > 0 { - ds := datastore.NewMemoryDataStore() - for _, addrRefJSON := range c.CLDF.Addresses { - var addrs []datastore.AddressRef - if err := json.Unmarshal([]byte(addrRefJSON), &addrs); err != nil { - return nil, fmt.Errorf("failed to unmarshal addresses from config: %w", err) - } - for _, addr := range addrs { - if err := ds.Addresses().Add(addr); err != nil { - return nil, fmt.Errorf("failed to set address in datastore: %w", err) - } - } + c, ok := any(config).(*Cfg) + if !ok || len(c.CLDF.Addresses) == 0 { + return config, nil + } + + ds := datastore.NewMemoryDataStore() + for _, addrRefJSON := range c.CLDF.Addresses { + var addrs []datastore.AddressRef + if err := json.Unmarshal([]byte(addrRefJSON), &addrs); err != nil { + return nil, fmt.Errorf("failed to unmarshal addresses from config: %w", err) + } + for _, addr := range addrs { + if err := ds.Addresses().Add(addr); err != nil { + return nil, fmt.Errorf("failed to set address in datastore: %w", err) } - c.CLDF.DataStore = ds.Seal() } } + c.CLDF.DataStore = ds.Seal() return config, nil } diff --git a/build/devenv/go.mod b/build/devenv/go.mod index 1e2ae8cdd..450f13699 100644 --- a/build/devenv/go.mod +++ b/build/devenv/go.mod @@ -33,11 +33,12 @@ require ( github.com/go-resty/resty/v2 v2.16.5 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 + github.com/prometheus/common v1.20.99 github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm v0.0.0-20251127040717-30244f57ea7a github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/deployment v0.0.0-20251127040717-30244f57ea7a github.com/smartcontractkit/chainlink-ccv v0.0.0-00010101000000-000000000000 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3 - github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb + github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a github.com/smartcontractkit/chainlink-testing-framework/wasp v1.51.1 google.golang.org/grpc v1.76.0 ) @@ -302,7 +303,6 @@ require ( github.com/pressly/goose/v3 v3.26.0 // indirect github.com/prometheus/alertmanager v0.28.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v1.20.99 // indirect github.com/prometheus/exporter-toolkit v0.13.2 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/prometheus/prometheus v0.302.0 // indirect diff --git a/build/devenv/go.sum b/build/devenv/go.sum index 1f072d228..d11772fc2 100644 --- a/build/devenv/go.sum +++ b/build/devenv/go.sum @@ -1071,8 +1071,8 @@ github.com/smartcontractkit/chainlink-deployments-framework v0.66.0 h1:tJvjPiQsC github.com/smartcontractkit/chainlink-deployments-framework v0.66.0/go.mod h1:8EXTqTr/5T5WLZpWg6npDvmcR3/wLl1A8eculNCn5GA= github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3 h1:mJP6yJq2woOZchX0KvhLiKxDPaS0Vy4vTDFH4nnFkXs= github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3/go.mod h1:3Lsp38qxen9PABVF+O5eocveQev+hyo9HLAgRodBD4Q= 
-github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb h1:vGLgImXYmzK8eow7kShHGZO948818NAI3FPihEH1v7c= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a h1:vGPF2Tlg1SBcCBPUP51m/PTO9IokKBuSaTjRV3CKyQ0= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0 h1:xHPmFDhff7QpeFxKsZfk+24j4AlnQiFjjRh5O87Peu4= diff --git a/build/devenv/tests/e2e/load_test.go b/build/devenv/tests/e2e/load_test.go index 4080a7f17..6cf07fdde 100644 --- a/build/devenv/tests/e2e/load_test.go +++ b/build/devenv/tests/e2e/load_test.go @@ -12,11 +12,11 @@ import ( "github.com/Masterminds/semver/v3" "github.com/ethereum/go-ethereum/common" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/deployment/v1_7_0/operations/committee_verifier" "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/deployment/v1_7_0/operations/mock_receiver" - "github.com/smartcontractkit/chainlink-ccv/devenv/tests/e2e/metrics" "github.com/smartcontractkit/chainlink-ccv/protocol" cldfevm "github.com/smartcontractkit/chainlink-deployments-framework/chain/evm" "github.com/smartcontractkit/chainlink-deployments-framework/datastore" @@ -181,115 +181,6 @@ func (m *EVMTXGun) Call(_ *wasp.Generator) *wasp.Response { return &wasp.Response{Data: "ok"} } -func assertMessagesAsync(tc TestingContext, gun *EVMTXGun) func() ([]metrics.MessageMetrics, metrics.MessageTotals) { - fromSelector := gun.src.Selector - toSelector := gun.dest.Selector - - metricsChan := make(chan metrics.MessageMetrics, 100) - var wg sync.WaitGroup - var totalSent, totalReceived int - var countMu sync.Mutex - - // Track specific messages for detailed reporting - sentMessages := make(map[uint64]string) - receivedMessages := make(map[uint64]string) - - // Create a context with timeout for verification - verifyCtx, cancelVerify := context.WithTimeout(tc.Ctx, tc.Timeout) - - go func() { - defer close(metricsChan) - defer cancelVerify() - - for sentMsg := range gun.sentMsgCh { - msgIDHex := common.BytesToHash(sentMsg.MessageID[:]).Hex() - countMu.Lock() - totalSent++ - sentMessages[sentMsg.SeqNo] = msgIDHex - countMu.Unlock() - - // Launch a goroutine for each message to verify it - wg.Add(1) - go func(msg SentMessage) { - defer wg.Done() - - msgIDHex := common.BytesToHash(msg.MessageID[:]).Hex() - execEvent, err := tc.Impl.WaitOneExecEventBySeqNo(verifyCtx, fromSelector, toSelector, msg.SeqNo, tc.Timeout) - - if verifyCtx.Err() != nil { - tc.T.Logf("Message %d verification timed out", msg.SeqNo) - return 
- } - - if err != nil { - tc.T.Logf("Failed to get execution event for sequence number %d: %v", msg.SeqNo, err) - return - } - - if execEvent.State != cciptestinterfaces.ExecutionStateSuccess { - tc.T.Logf("Message with sequence number %d was not successfully executed, state: %d", msg.SeqNo, execEvent.State) - return - } - - executedTime := time.Now() - latency := executedTime.Sub(msg.SentTime) - - tc.T.Logf("Message with sequence number %d successfully executed (latency: %v)", msg.SeqNo, latency) - - countMu.Lock() - totalReceived++ - receivedMessages[msg.SeqNo] = msgIDHex - countMu.Unlock() - - metricsChan <- metrics.MessageMetrics{ - SeqNo: msg.SeqNo, - MessageID: msgIDHex, - SentTime: msg.SentTime, - ExecutedTime: executedTime, - LatencyDuration: latency, - } - }(sentMsg) - } - - tc.T.Logf("All messages sent, waiting for assertion to complete") - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - tc.T.Logf("All verification goroutines completed") - case <-verifyCtx.Done(): - tc.T.Logf("Verification timeout reached, some goroutines may still be running") - } - }() - - return func() ([]metrics.MessageMetrics, metrics.MessageTotals) { - datum := make([]metrics.MessageMetrics, 0, 100) - for metric := range metricsChan { - datum = append(datum, metric) - } - - countMu.Lock() - totals := metrics.MessageTotals{ - Sent: totalSent, - Received: totalReceived, - SentMessages: sentMessages, - ReceivedMessages: receivedMessages, - } - countMu.Unlock() - - notVerified := totals.Sent - totals.Received - tc.T.Logf("Verification complete - Sent: %d, ReachedVerifier: %d, Verified: %d, Aggregated: %d, Indexed: %d, ReachedExecutor: %d, SentToChain: %d, Received: %d, Not Received: %d", - totals.Sent, totals.ReachedVerifier, totals.Verified, totals.Aggregated, totals.Indexed, totals.ReachedExecutor, totals.SentToChainInExecutor, totals.Received, notVerified) - - return datum, totals - } -} - func gasControlFunc(t *testing.T, r *rpc.RPCClient, blockPace time.Duration) { startGasPrice := big.NewInt(2e9) // ramp @@ -351,6 +242,9 @@ func TestE2ELoad(t *testing.T) { if os.Getenv("LOKI_URL") == "" { _ = os.Setenv("LOKI_URL", ccv.DefaultLokiURL) } + if os.Getenv("PROM_URL") == "" { + _ = os.Setenv("PROM_URL", ccv.DefaultPromURL) + } srcRPCURL := in.Blockchains[0].Out.Nodes[0].ExternalHTTPUrl dstRPCURL := in.Blockchains[1].Out.Nodes[0].ExternalHTTPUrl @@ -374,51 +268,36 @@ func TestE2ELoad(t *testing.T) { impl, err := evm.NewCCIP17EVM(ctx, *l, e, chainIDs, wsURLs) require.NoError(t, err) - indexerURL := fmt.Sprintf("http://127.0.0.1:%d", in.Indexer.Port) - defaultAggregatorAddr := fmt.Sprintf("127.0.0.1:%d", defaultAggregatorPort(in)) - - defaultAggregatorClient, err := ccv.NewAggregatorClient( - zerolog.Ctx(ctx).With().Str("component", "aggregator-client").Logger(), - defaultAggregatorAddr) + // Initialize Prometheus helper + promHelper, err := NewPrometheusHelper(os.Getenv("PROM_URL"), *zerolog.Ctx(ctx)) require.NoError(t, err) - require.NotNil(t, defaultAggregatorClient) - t.Cleanup(func() { - defaultAggregatorClient.Close() - }) - - indexerClient := ccv.NewIndexerClient( - zerolog.Ctx(ctx).With().Str("component", "indexer-client").Logger(), - indexerURL) - require.NotNil(t, indexerClient) - + PrintRunningContainers(t) t.Run("clean", func(t *testing.T) { - // just a clean load test to measure performance - rps := int64(5) + rps := int64(1) testDuration := 30 * time.Second - tc := NewTestingContext(t, ctx, impl, defaultAggregatorClient, indexerClient) - 
tc.Timeout = 2 * time.Minute - p, gun := createLoadProfile(in, rps, testDuration, e, selectors, impl, srcChain, dstChain) - waitForMetrics := assertMessagesAsync(tc, gun) _, err = p.Run(true) require.NoError(t, err) + p.Wait() - // Close the channel to signal no more messages will be sent - gun.CloseSentChannel() + waitForAllMessagesToBeExecuted(t, ctx, promHelper, len(gun.msgIDs)) + assertP90LatencyBelowThreshold(t, ctx, promHelper, 5*time.Second) + }) - // Wait for all messages to be verified and collect metrics - metrics_datum, totals := waitForMetrics() + t.Run("burst", func(t *testing.T) { + rps := int64(5) + testDuration := 30 * time.Second - // Enrich metrics with log data collected during test - tc.enrichMetrics(metrics_datum) + p, gun := createLoadProfile(in, rps, testDuration, e, selectors, impl, srcChain, dstChain) - summary := metrics.CalculateMetricsSummary(metrics_datum, totals) - metrics.PrintMetricsSummary(t, summary) + _, err = p.Run(true) + require.NoError(t, err) + p.Wait() - require.Equal(t, summary.TotalSent, summary.TotalReceived) - require.LessOrEqual(t, summary.P90Latency, 8*time.Second) + waitForAllMessagesToBeExecuted(t, ctx, promHelper, len(gun.msgIDs)) + assertP90LatencyBelowThreshold(t, ctx, promHelper, 5*time.Second) }) t.Run("rpc latency", func(t *testing.T) { @@ -432,40 +311,21 @@ func TestE2ELoad(t *testing.T) { rps := int64(1) - tc := NewTestingContext(t, ctx, impl, defaultAggregatorClient, indexerClient) - tc.Timeout = timeoutDuration - p, gun := createLoadProfile(in, rps, testDuration, e, selectors, impl, srcChain, dstChain) - waitForMetrics := assertMessagesAsync(tc, gun) _, err = p.Run(true) require.NoError(t, err) + p.Wait() - // Close the channel to signal no more messages will be sent - gun.CloseSentChannel() - - // Wait for all messages to be verified and collect metrics - metrics_datum, totals := waitForMetrics() - - // Enrich metrics with log data collected during test - tc.enrichMetrics(metrics_datum) - - summary := metrics.CalculateMetricsSummary(metrics_datum, totals) - metrics.PrintMetricsSummary(t, summary) - - require.Equal(t, summary.TotalSent, summary.TotalReceived) - require.LessOrEqual(t, summary.P90Latency, expectedP90Latency) + waitForAllMessagesToBeExecuted(t, ctx, promHelper, len(gun.msgIDs)) + assertP90LatencyBelowThreshold(t, ctx, promHelper, 5*time.Second) }) t.Run("gas", func(t *testing.T) { rps := int64(1) testDuration := 5 * time.Minute - tc := NewTestingContext(t, ctx, impl, defaultAggregatorClient, indexerClient) - tc.Timeout = 10 * time.Minute - p, gun := createLoadProfile(in, rps, testDuration, e, selectors, impl, srcChain, dstChain) - waitForMetrics := assertMessagesAsync(tc, gun) _, err = p.Run(false) require.NoError(t, err) @@ -518,27 +378,15 @@ func TestE2ELoad(t *testing.T) { } p.Wait() - // Close the channel to signal no more messages will be sent - gun.CloseSentChannel() - - // Wait for all messages to be verified and collect metrics - metrics_datum, totals := waitForMetrics() - - // Enrich metrics with log data collected during test - tc.enrichMetrics(metrics_datum) - - summary := metrics.CalculateMetricsSummary(metrics_datum, totals) - metrics.PrintMetricsSummary(t, summary) + waitForAllMessagesToBeExecuted(t, ctx, promHelper, len(gun.msgIDs)) + assertP90LatencyBelowThreshold(t, ctx, promHelper, 10*time.Second) }) t.Run("reorgs", func(t *testing.T) { rps := int64(1) testDuration := 5 * time.Minute - tc := NewTestingContext(t, ctx, impl, defaultAggregatorClient, indexerClient) - p, gun := createLoadProfile(in, rps, 
testDuration, e, selectors, impl, srcChain, dstChain) - waitForMetrics := assertMessagesAsync(tc, gun) _, err = p.Run(false) require.NoError(t, err) @@ -601,17 +449,8 @@ func TestE2ELoad(t *testing.T) { } p.Wait() - // Close the channel to signal no more messages will be sent - gun.CloseSentChannel() - - // Wait for all messages to be verified and collect metrics - metrics_datum, totals := waitForMetrics() - - // Enrich metrics with log data collected during test - tc.enrichMetrics(metrics_datum) - - summary := metrics.CalculateMetricsSummary(metrics_datum, totals) - metrics.PrintMetricsSummary(t, summary) + waitForAllMessagesToBeExecuted(t, ctx, promHelper, len(gun.msgIDs)) + assertP90LatencyBelowThreshold(t, ctx, promHelper, 10*time.Second) }) t.Run("services_chaos", func(t *testing.T) { @@ -702,10 +541,7 @@ func TestE2ELoad(t *testing.T) { }, } - tc := NewTestingContext(t, ctx, impl, defaultAggregatorClient, indexerClient) - p, gun := createLoadProfile(in, rps, testDuration, e, selectors, impl, srcChain, dstChain) - waitForMetrics := assertMessagesAsync(tc, gun) _, err = p.Run(false) require.NoError(t, err) @@ -721,16 +557,26 @@ func TestE2ELoad(t *testing.T) { } p.Wait() - // Close the channel to signal no more messages will be sent - gun.CloseSentChannel() - - // Wait for all messages to be verified and collect metrics - metrics_datum, totals := waitForMetrics() + waitForAllMessagesToBeExecuted(t, ctx, promHelper, len(gun.msgIDs)) + assertP90LatencyBelowThreshold(t, ctx, promHelper, 10*time.Second) + }) +} - // Enrich metrics with log data collected during test - tc.enrichMetrics(metrics_datum) +// This is not perfect there are case where we could reprocess message and the metric would reach the expected value +// but for load test is not completed yet. It is also not going to work unless we create a new environment for each test. 
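// A sketch of how the check below could be decoupled from earlier runs without creating a
// fresh environment: snapshot the counter before the load profile starts and assert on the
// delta rather than the absolute value (uses the GetCurrentCounter helper added in
// test_helpers.go; illustrative only, not part of this change):
//
//	before, err := promHelper.GetCurrentCounter(ctx, "executor_message_e2e_duration_seconds_count")
//	require.NoError(t, err)
//	// ... run the load profile ...
//	require.EventuallyWithT(t, func(c *assert.CollectT) {
//		after, err := promHelper.GetCurrentCounter(ctx, "executor_message_e2e_duration_seconds_count")
//		require.NoError(c, err)
//		require.GreaterOrEqual(c, after-before, totalMessageExpected)
//	}, 2*time.Minute, 10*time.Second)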
+func waitForAllMessagesToBeExecuted(t *testing.T, ctx context.Context, promHelper *PrometheusHelper, totalMessageExpected int) {
+	require.EventuallyWithT(t, func(collect *assert.CollectT) {
+		totalMessageProcessed, err := promHelper.GetCurrentCounter(ctx, "executor_message_e2e_duration_seconds_count")
+		require.NoError(collect, err)
+		t.Logf("Progress: %d/%d", totalMessageProcessed, totalMessageExpected)
+		require.GreaterOrEqual(collect, totalMessageProcessed, totalMessageExpected,
+			"Total processed messages should be at least total sent messages")
+	}, 2*time.Minute, 10*time.Second)
+}
-
+func assertP90LatencyBelowThreshold(t *testing.T, ctx context.Context, promHelper *PrometheusHelper, threshold time.Duration) {
+	p90E2ELatency, err := promHelper.GetPercentile(ctx, "executor_message_e2e_duration_seconds_bucket", 0.90)
+	require.NoError(t, err)
+	require.Less(t, p90E2ELatency, threshold.Seconds(),
+		fmt.Sprintf("P90 of executor_message_e2e_duration_seconds should be less than %.2f seconds", threshold.Seconds()))
 }
diff --git a/build/devenv/tests/e2e/test_helpers.go b/build/devenv/tests/e2e/test_helpers.go
index acf8f97c9..bfaacb602 100644
--- a/build/devenv/tests/e2e/test_helpers.go
+++ b/build/devenv/tests/e2e/test_helpers.go
@@ -5,14 +5,17 @@ import (
 	"encoding/hex"
 	"fmt"
 	"os"
+	"os/exec"
 	"testing"
 	"time"
 
 	"github.com/ethereum/go-ethereum/ethclient"
+	promapi "github.com/prometheus/client_golang/api"
+	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
 	"github.com/rs/zerolog"
 
 	"github.com/smartcontractkit/chainlink-ccv/devenv/tests/e2e/logasserter"
-	"github.com/smartcontractkit/chainlink-ccv/devenv/tests/e2e/metrics"
 	ccv "github.com/smartcontractkit/chainlink-ccv/devenv"
 	"github.com/smartcontractkit/chainlink-ccv/devenv/evm"
@@ -67,10 +70,6 @@ func NewTestingContext(t *testing.T, ctx context.Context, impl *evm.CCIP17EVM, a
 	return tc
 }
 
-func (tc *TestingContext) enrichMetrics(metrics []metrics.MessageMetrics) {
-	tc.LogAsserter.EnrichMetrics(metrics)
-}
-
 type AssertionResult struct {
 	AggregatorFound bool
 	VerifierReached bool
@@ -243,3 +242,151 @@ func (a *AnvilRPCHelper) GetAutomine(ctx context.Context) (bool, error) {
 	a.logger.Info().Bool("automine", automine).Msg("Got automine status")
 	return automine, nil
 }
+
+// PrometheusHelper provides utilities for querying Prometheus metrics.
+type PrometheusHelper struct {
+	api    promv1.API
+	logger zerolog.Logger
+}
+
+// NewPrometheusHelper creates a new helper for Prometheus operations.
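// A minimal usage sketch for the helper below, assuming the devenv default endpoint
// (DefaultPromURL, http://localhost:9099); names other than the helper methods and the
// metric names from this change are illustrative only:
//
//	promHelper, err := NewPrometheusHelper(os.Getenv("PROM_URL"), logger)
//	if err != nil {
//		t.Fatalf("prometheus not reachable: %v", err)
//	}
//	processed, err := promHelper.GetCurrentCounter(ctx, "executor_messages_processed_total")
//	p90, err := promHelper.GetPercentile(ctx, "executor_message_e2e_duration_seconds_bucket", 0.90)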
+func NewPrometheusHelper(prometheusURL string, logger zerolog.Logger) (*PrometheusHelper, error) { + client, err := promapi.NewClient(promapi.Config{ + Address: prometheusURL, + }) + if err != nil { + return nil, fmt.Errorf("failed to create Prometheus client: %w", err) + } + + api := promv1.NewAPI(client) + _, err = api.Config(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to connect to Prometheus at %s: %w", prometheusURL, err) + } + + return &PrometheusHelper{ + api: api, + logger: logger, + }, nil +} + +func (p *PrometheusHelper) GetPercentile( + ctx context.Context, + metric string, + percentile float64, +) (float64, error) { + if percentile < 0 || percentile > 1 { + return 0, fmt.Errorf("percentile must be between 0 and 1, got %f", percentile) + } + + // We assume CI/test runs start from zeroed histogram buckets for the given selector. + // Therefore, the bucket values at endTime represent the full histogram over the run. + // + // Example query: + // histogram_quantile(0.90, sum by (le) (http_request_duration_seconds_bucket{test_id="loadtest-123"})) + query := fmt.Sprintf( + "histogram_quantile(%.4f, sum by (le) (%s))", + percentile, + metric, + ) + + p.logger.Info(). + Str("metric", metric). + Str("query", query). + Float64("percentile", percentile). + Msg("Prometheus percentile (instant) query") + + // Single evaluation at endTime. + result, warnings, err := p.api.Query(ctx, query, time.Now()) + if err != nil { + return 0, fmt.Errorf("failed to query Prometheus: %w", err) + } + + if len(warnings) > 0 { + p.logger.Warn(). + Strs("warnings", warnings). + Str("query", query). + Msg("Prometheus percentile query returned warnings") + } + + vector, ok := result.(model.Vector) + if !ok { + return 0, fmt.Errorf("unexpected result type: %T (expected model.Vector)", result) + } + + if len(vector) == 0 { + return 0, fmt.Errorf("no data found for metric %s", metric) + } + + // sum by (le) + histogram_quantile should yield exactly one sample; take the first. + value := float64(vector[0].Value) + + p.logger.Info(). + Str("metric", metric). + Float64("percentile", percentile). + Float64("value", value). + Str("query", query). + Msg("Retrieved percentile metric (instant)") + + return value, nil +} + +func (p *PrometheusHelper) GetCurrentCounter( + ctx context.Context, + metric string, +) (int, error) { + // Wrap metric in sum() so we get one aggregated value. + query := fmt.Sprintf("sum(%s)", metric) + + p.logger.Info(). + Str("metric", metric). + Str("query", query). + Msg("Prometheus current-counter query") + + result, warnings, err := p.api.Query(ctx, query, time.Now()) + if err != nil { + return 0, fmt.Errorf("failed to query Prometheus: %w", err) + } + + if len(warnings) > 0 { + p.logger.Warn(). + Strs("warnings", warnings). + Str("query", query). + Msg("Prometheus query returned warnings") + } + + vector, ok := result.(model.Vector) + if !ok { + return 0, fmt.Errorf("unexpected result type: %T (expected model.Vector)", result) + } + + if len(vector) == 0 { + return 0, fmt.Errorf("no data found for metric %s", metric) + } + + // sum(...) should give exactly one sample; we take the first. + value := int(vector[0].Value) + + p.logger.Info(). + Str("metric", metric). + Int("value", value). + Str("query", query). + Msg("Retrieved current counter metric value") + + return value, nil +} + +// PrintRunningContainers prints all running Docker containers for debugging purposes. +// This is useful for CI debugging to understand the container state. 
+func PrintRunningContainers(t *testing.T) { + t.Helper() + + cmd := exec.Command("docker", "ps", "--format", "table {{.Names}}\t{{.Status}}\t{{.Ports}}") + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: failed to list Docker containers: %v", err) + return + } + + t.Logf("Docker containers currently running:\n%s", string(output)) +} diff --git a/executor/interfaces.go b/executor/interfaces.go index ca1cb4559..debc1e33c 100644 --- a/executor/interfaces.go +++ b/executor/interfaces.go @@ -90,6 +90,8 @@ type MetricLabeler interface { With(keyValues ...string) MetricLabeler // RecordMessageExecutionLatency records the duration of the full ExecuteMessage operation. RecordMessageExecutionLatency(ctx context.Context, duration time.Duration) + // RecordMessageE2ELatency records the duration from first seen to execution for a message. + RecordMessageE2ELatency(ctx context.Context, duration time.Duration) // IncrementMessagesProcessed increments the counter for successfully processed messages. IncrementMessagesProcessed(ctx context.Context) // IncrementMessagesProcessingFailed increments the counter for failed message executions. diff --git a/executor/pkg/executor/cl_executor.go b/executor/pkg/executor/cl_executor.go index e1537cca6..f8131b23e 100644 --- a/executor/pkg/executor/cl_executor.go +++ b/executor/pkg/executor/cl_executor.go @@ -131,6 +131,24 @@ func (cle *ChainlinkExecutor) AttemptExecuteMessage(ctx context.Context, message "ccvDatasDestVerifiers", ccvDataDestVerifiers(ccvData), "ccvDatasSourceVerifiers", ccvDataSourceVerifiers(ccvData), ) + + var sourceBlockTimestamp *time.Time + for _, data := range ccvData { + if sourceBlockTimestamp == nil || data.SourceChainBlockTimestamp.Before(*sourceBlockTimestamp) { + if data.SourceChainBlockTimestamp.IsZero() { + continue + } + if sourceBlockTimestamp != nil && !data.SourceChainBlockTimestamp.Equal(*sourceBlockTimestamp) { + cle.lggr.Warnw("mismatched source chain block timestamps among required CCVs. Using earliest timestamp.", + "existingTimestamp", sourceBlockTimestamp, + "newTimestamp", data.SourceChainBlockTimestamp, + ) + } + + sourceBlockTimestamp = &data.SourceChainBlockTimestamp + } + } + orderedCCVData, orderedCCVOfframps, latestCCVTimestamp, err := orderCCVData(ccvData, ccvInfo) if err != nil { return fmt.Errorf("failed to order CCV Offramp data for message %s: %w", messageID.String(), err) @@ -155,7 +173,12 @@ func (cle *ChainlinkExecutor) AttemptExecuteMessage(ctx context.Context, message // Record the message execution latency. 
cle.monitoring.Metrics().RecordMessageExecutionLatency(ctx, time.Since(time.Unix(latestCCVTimestamp, 0))) - + if sourceBlockTimestamp != nil { + cle.monitoring.Metrics().RecordMessageE2ELatency(ctx, time.Since(*sourceBlockTimestamp)) + cle.lggr.Info("recorded source block latency", "messageID", messageID, "latency", time.Since(*sourceBlockTimestamp)) + } else { + cle.lggr.Warnw("source block timestamp is nil, cannot record source block latency", "messageID", messageID) + } return nil } diff --git a/executor/pkg/monitoring/metrics.go b/executor/pkg/monitoring/metrics.go index b6130c07e..b5c754b31 100644 --- a/executor/pkg/monitoring/metrics.go +++ b/executor/pkg/monitoring/metrics.go @@ -18,6 +18,7 @@ import ( type ExecutorMetrics struct { // Latency messageExecutionLatency metric.Float64Histogram + messageE2ELatency metric.Float64Histogram // Message Processing Counters messagesProcessedCounter metric.Int64Counter @@ -39,6 +40,15 @@ func InitMetrics() (*ExecutorMetrics, error) { return nil, fmt.Errorf("failed to register message e2e latency histogram: %w", err) } + vm.messageE2ELatency, err = beholder.GetMeter().Float64Histogram( + "executor_message_e2e_duration_seconds", + metric.WithDescription("End-to-end message latency from source chain inclusion to execution"), + metric.WithUnit("seconds"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register message e2e latency histogram: %w", err) + } + // Message Processing Counters vm.messagesProcessedCounter, err = beholder.GetMeter().Int64Counter( "executor_messages_processed_total", @@ -96,6 +106,11 @@ func (v *ExecutorMetricLabeler) RecordMessageExecutionLatency(ctx context.Contex v.vm.messageExecutionLatency.Record(ctx, duration.Seconds(), metric.WithAttributes(otelLabels...)) } +func (v *ExecutorMetricLabeler) RecordMessageE2ELatency(ctx context.Context, duration time.Duration) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.messageE2ELatency.Record(ctx, duration.Seconds(), metric.WithAttributes(otelLabels...)) +} + func (v *ExecutorMetricLabeler) IncrementMessagesProcessed(ctx context.Context) { otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() v.vm.messagesProcessedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) diff --git a/executor/pkg/monitoring/monitoring.go b/executor/pkg/monitoring/monitoring.go index 4a2f05bcc..e08219e3f 100644 --- a/executor/pkg/monitoring/monitoring.go +++ b/executor/pkg/monitoring/monitoring.go @@ -99,6 +99,9 @@ func (n *NoopExecutorMetricLabeler) With(keyValues ...string) executor.MetricLab func (n *NoopExecutorMetricLabeler) RecordMessageExecutionLatency(ctx context.Context, duration time.Duration) { } +func (n *NoopExecutorMetricLabeler) RecordMessageE2ELatency(ctx context.Context, duration time.Duration) { +} + func (n *NoopExecutorMetricLabeler) IncrementMessagesProcessed(ctx context.Context) {} func (n *NoopExecutorMetricLabeler) IncrementMessagesProcessingFailed(ctx context.Context) {} diff --git a/go.mod b/go.mod index 3398e3e61..5aab807de 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/smartcontractkit/chainlink-common v0.9.6-0.20250929154511-1f5fbda7ae76 github.com/smartcontractkit/chainlink-evm v0.3.3 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3 // indirect - github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb + 
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.39.0 github.com/ulule/limiter/v3 v3.11.2 diff --git a/go.sum b/go.sum index 74cab2b81..f7b6a01a3 100644 --- a/go.sum +++ b/go.sum @@ -598,8 +598,8 @@ github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-23 github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250729142306-508e798f6a5d h1:pTYIcsWHTMG5fAcbRUA8Qk5yscXKdSpopQ0DUEOjPik= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250729142306-508e798f6a5d/go.mod h1:2JTBNp3FlRdO/nHc4dsc9bfxxMClMO1Qt8sLJgtreBY= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb h1:vGLgImXYmzK8eow7kShHGZO948818NAI3FPihEH1v7c= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a h1:vGPF2Tlg1SBcCBPUP51m/PTO9IokKBuSaTjRV3CKyQ0= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= github.com/smartcontractkit/chainlink-protos/svr v1.1.0 h1:79Z9N9dMbMVRGaLoDPAQ+vOwbM+Hnx8tIN2xCPG8H4o= diff --git a/indexer/migrations/20251126000000_add_source_chain_block_timestamp.sql b/indexer/migrations/20251126000000_add_source_chain_block_timestamp.sql new file mode 100644 index 000000000..a889b3407 --- /dev/null +++ b/indexer/migrations/20251126000000_add_source_chain_block_timestamp.sql @@ -0,0 +1,18 @@ +-- +goose Up +-- +goose StatementBegin + +-- Add source_chain_block_timestamp column to indexer.verifier_results +-- This represents the timestamp when the message was included in a source chain block +-- Default to current time in milliseconds for existing rows +ALTER TABLE indexer.verifier_results + ADD COLUMN source_chain_block_timestamp TIMESTAMP NOT NULL DEFAULT now(); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +ALTER TABLE indexer.verifier_results + DROP COLUMN source_chain_block_timestamp; + +-- +goose StatementEnd diff --git a/indexer/pkg/common/metadata.go b/indexer/pkg/common/metadata.go index 9d0d0ef27..d4531bc36 100644 --- a/indexer/pkg/common/metadata.go +++ b/indexer/pkg/common/metadata.go @@ -12,9 +12,10 @@ type VerifierResultWithMetadata struct { } type VerifierResultMetadata struct { - VerifierName string `json:"verifierName"` - AttestationTimestamp time.Time `json:"attestationTimestamp"` - IngestionTimestamp time.Time `json:"ingestionTimestamp"` + VerifierName string `json:"verifierName"` + AttestationTimestamp time.Time `json:"attestationTimestamp"` + IngestionTimestamp time.Time 
`json:"ingestionTimestamp"` + SourceChainBlockTimestamp time.Time `json:"sourceChainBlockTimestamp"` } type MessageWithMetadata struct { diff --git a/indexer/pkg/discovery/message_discovery.go b/indexer/pkg/discovery/message_discovery.go index 00003fcd3..6398bd312 100644 --- a/indexer/pkg/discovery/message_discovery.go +++ b/indexer/pkg/discovery/message_discovery.go @@ -254,9 +254,10 @@ func (a *AggregatorMessageDiscovery) callReader(ctx context.Context) (bool, erro verifierResultWithMetadata := common.VerifierResultWithMetadata{ VerifierResult: response.Data, Metadata: common.VerifierResultMetadata{ - IngestionTimestamp: ingestionTimestamp, - AttestationTimestamp: response.Data.Timestamp, - VerifierName: a.registry.GetVerifierNameFromAddress(response.Data.VerifierSourceAddress), + IngestionTimestamp: ingestionTimestamp, + AttestationTimestamp: response.Data.Timestamp, + VerifierName: a.registry.GetVerifierNameFromAddress(response.Data.VerifierSourceAddress), + SourceChainBlockTimestamp: response.Data.SourceChainBlockTimestamp, }, } diff --git a/indexer/pkg/storage/postgres.go b/indexer/pkg/storage/postgres.go index 1cf99fae6..6d3eb6b62 100644 --- a/indexer/pkg/storage/postgres.go +++ b/indexer/pkg/storage/postgres.go @@ -57,7 +57,8 @@ func (d *PostgresStorage) GetCCVData(ctx context.Context, messageID protocol.Byt ccv_data, message, message_ccv_addresses, - message_executor_address + message_executor_address, + source_chain_block_timestamp FROM indexer.verifier_results WHERE message_id = $1 ` @@ -118,7 +119,8 @@ func (d *PostgresStorage) QueryCCVData( ccv_data, message, message_ccv_addresses, - message_executor_address + message_executor_address, + source_chain_block_timestamp FROM indexer.verifier_results WHERE ingestion_timestamp >= $1 AND ingestion_timestamp <= $2 ` @@ -208,8 +210,9 @@ func (d *PostgresStorage) InsertCCVData(ctx context.Context, ccvData common.Veri ccv_data, message, message_ccv_addresses, - message_executor_address - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + message_executor_address, + source_chain_block_timestamp + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (message_id, verifier_source_address, verifier_dest_address) DO NOTHING ` @@ -225,6 +228,7 @@ func (d *PostgresStorage) InsertCCVData(ctx context.Context, ccvData common.Veri messageJSON, pq.Array(ccvAddressesHex), ccvData.VerifierResult.MessageExecutorAddress.String(), + ccvData.VerifierResult.SourceChainBlockTimestamp, ) if err != nil { d.lggr.Errorw("Failed to insert CCV data", "error", err, "messageID", ccvData.VerifierResult.MessageID.String()) @@ -282,11 +286,12 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] ccv_data, message, message_ccv_addresses, - message_executor_address + message_executor_address, + source_chain_block_timestamp ) VALUES ` - args := make([]any, 0, len(ccvDataList)*11) + args := make([]any, 0, len(ccvDataList)*12) valueClauses := make([]string, 0, len(ccvDataList)) for i, ccvData := range ccvDataList { @@ -304,10 +309,10 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] } // Calculate parameter positions for this row - baseIdx := i * 11 - valueClause := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", + baseIdx := i * 12 + valueClause := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", baseIdx+1, baseIdx+2, baseIdx+3, baseIdx+4, baseIdx+5, baseIdx+6, - baseIdx+7, baseIdx+8, baseIdx+9, baseIdx+10, baseIdx+11) + baseIdx+7, 
baseIdx+8, baseIdx+9, baseIdx+10, baseIdx+11, baseIdx+12) valueClauses = append(valueClauses, valueClause) // Add arguments for this row @@ -323,6 +328,7 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] messageJSON, pq.Array(ccvAddressesHex), ccvData.VerifierResult.MessageExecutorAddress.String(), + ccvData.VerifierResult.SourceChainBlockTimestamp, ) } @@ -779,17 +785,18 @@ func (d *PostgresStorage) scanCCVData(row interface { }, ) (common.VerifierResultWithMetadata, error) { var ( - messageIDStr string - sourceVerifierAddrStr string - destVerifierAddrStr string - attestationTimestamp time.Time - ingestionTimestamp time.Time - sourceChainSelector uint64 - destChainSelector uint64 - ccvDataBytes []byte - messageJSON []byte - messageCCVAddressesArray []string - messageExecutorAddrStr string + messageIDStr string + sourceVerifierAddrStr string + destVerifierAddrStr string + attestationTimestamp time.Time + ingestionTimestamp time.Time + sourceChainSelector uint64 + destChainSelector uint64 + ccvDataBytes []byte + messageJSON []byte + messageCCVAddressesArray []string + messageExecutorAddrStr string + sourceChainBlockTimestamp time.Time ) err := row.Scan( @@ -804,6 +811,7 @@ func (d *PostgresStorage) scanCCVData(row interface { &messageJSON, pq.Array(&messageCCVAddressesArray), &messageExecutorAddrStr, + &sourceChainBlockTimestamp, ) if err != nil { return common.VerifierResultWithMetadata{}, fmt.Errorf("failed to scan row: %w", err) @@ -851,14 +859,15 @@ func (d *PostgresStorage) scanCCVData(row interface { return common.VerifierResultWithMetadata{ VerifierResult: protocol.VerifierResult{ - MessageID: messageID, - Message: message, - MessageCCVAddresses: messageCCVAddresses, - MessageExecutorAddress: messageExecutorAddress, - CCVData: ccvDataBytes, - Timestamp: attestationTimestamp, - VerifierSourceAddress: verifierSourceAddress, - VerifierDestAddress: verifierDestAddress, + MessageID: messageID, + Message: message, + MessageCCVAddresses: messageCCVAddresses, + MessageExecutorAddress: messageExecutorAddress, + CCVData: ccvDataBytes, + Timestamp: attestationTimestamp, + VerifierSourceAddress: verifierSourceAddress, + VerifierDestAddress: verifierDestAddress, + SourceChainBlockTimestamp: sourceChainBlockTimestamp, }, Metadata: common.VerifierResultMetadata{ AttestationTimestamp: attestationTimestamp, diff --git a/integration/pkg/sourcereader/evm_source_reader.go b/integration/pkg/sourcereader/evm_source_reader.go index 70d6c5158..4b590a947 100644 --- a/integration/pkg/sourcereader/evm_source_reader.go +++ b/integration/pkg/sourcereader/evm_source_reader.go @@ -5,7 +5,9 @@ import ( "encoding/binary" "errors" "fmt" + "math" "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -234,6 +236,15 @@ func (r *EVMSourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, continue // to next message } + // Safe conversion of BlockTimestamp from uint64 to int64 + blockTimestamp := int64(log.BlockTimestamp) + if log.BlockTimestamp > math.MaxInt64 { + r.lggr.Errorw("BlockTimestamp overflow", + "blockTimestamp", log.BlockTimestamp, + "blockNumber", log.BlockNumber) + continue // to next message + } + results = append(results, protocol.MessageSentEvent{ DestChainSelector: protocol.ChainSelector(event.DestChainSelector), SequenceNumber: event.SequenceNumber, @@ -241,6 +252,7 @@ func (r *EVMSourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, Message: *decodedMsg, Receipts: 
allReceipts, // Keep original order from OnRamp event BlockNumber: log.BlockNumber, + BlockTimestamp: time.Unix(blockTimestamp, 0), TxHash: protocol.ByteSlice(log.TxHash.Bytes()), }) } diff --git a/integration/storageaccess/aggregator_adapter.go b/integration/storageaccess/aggregator_adapter.go index 8fe8099d6..24aa3188b 100644 --- a/integration/storageaccess/aggregator_adapter.go +++ b/integration/storageaccess/aggregator_adapter.go @@ -32,10 +32,11 @@ func mapCCVDataToCCVNodeDataProto(ccvData protocol.VerifierNodeResult) (*pb.Writ return &pb.WriteCommitteeVerifierNodeResultRequest{ CommitteeVerifierNodeResult: &pb.CommitteeVerifierNodeResult{ - CcvVersion: ccvData.CCVVersion, - CcvAddresses: ccvAddresses, - ExecutorAddress: ccvData.ExecutorAddress[:], - Signature: ccvData.Signature[:], + CcvVersion: ccvData.CCVVersion, + CcvAddresses: ccvAddresses, + ExecutorAddress: ccvData.ExecutorAddress[:], + Signature: ccvData.Signature[:], + SourceChainBlockTimestamp: ccvData.SourceChainBlockTimestamp.UnixMilli(), Message: &pb.Message{ Version: uint32(ccvData.Message.Version), SourceChainSelector: uint64(ccvData.Message.SourceChainSelector), @@ -298,11 +299,24 @@ func mapMessage(msg *pb.Message) (protocol.Message, error) { if msg.Version > math.MaxUint8 { return protocol.Message{}, fmt.Errorf("field Version %d exceeds uint8 max", msg.Version) } + result.Version = uint8(msg.Version) if msg.OnRampAddressLength > math.MaxUint8 { return protocol.Message{}, fmt.Errorf("field OnRampAddressLength %d exceeds uint8 max", msg.OnRampAddressLength) } + + // Decode TokenTransfer if present + if msg.TokenTransferLength > 0 && len(msg.TokenTransfer) > 0 { + tt, err := protocol.DecodeTokenTransfer(msg.TokenTransfer) + if err != nil { + return protocol.Message{}, fmt.Errorf("failed to decode token transfer: %w", err) + } + result.TokenTransfer = tt + } else { + result.TokenTransfer = nil + } + result.OnRampAddressLength = uint8(msg.OnRampAddressLength) if msg.OffRampAddressLength > math.MaxUint8 { return protocol.Message{}, fmt.Errorf("field OffRampAddressLength %d exceeds uint8 max", @@ -388,23 +402,26 @@ func (a *AggregatorReader) ReadCCVData(ctx context.Context) ([]protocol.QueryRes var timestamp time.Time var verifierDestAddress protocol.UnknownAddress var verifierSourceAddress protocol.UnknownAddress + var sourceChainBlockTimestamp time.Time if result.Metadata != nil { timestamp = time.UnixMilli(result.Metadata.Timestamp) verifierDestAddress = protocol.UnknownAddress(result.Metadata.VerifierDestAddress) verifierSourceAddress = protocol.UnknownAddress(result.Metadata.VerifierSourceAddress) + sourceChainBlockTimestamp = time.UnixMilli(result.Metadata.SourceChainBlockTimestamp) } - + a.lggr.Debugw("Processed message", "messageID", messageID, "sequence", sequence, "timestamp", timestamp, "sourceChainBlockTimestamp", sourceChainBlockTimestamp) results = append(results, protocol.QueryResponse{ Timestamp: nil, Data: protocol.VerifierResult{ - MessageID: messageID, - Message: msg, - MessageCCVAddresses: messageCCVAddresses, - MessageExecutorAddress: protocol.UnknownAddress(result.MessageExecutorAddress), - CCVData: protocol.ByteSlice(result.CcvData), - Timestamp: timestamp, - VerifierDestAddress: verifierDestAddress, - VerifierSourceAddress: verifierSourceAddress, + MessageID: messageID, + Message: msg, + MessageCCVAddresses: messageCCVAddresses, + MessageExecutorAddress: protocol.UnknownAddress(result.MessageExecutorAddress), + CCVData: protocol.ByteSlice(result.CcvData), + Timestamp: timestamp, + 
SourceChainBlockTimestamp: sourceChainBlockTimestamp, + VerifierDestAddress: verifierDestAddress, + VerifierSourceAddress: verifierSourceAddress, }, }) } diff --git a/protocol/common_types.go b/protocol/common_types.go index fddf0472d..5bcfeb5df 100644 --- a/protocol/common_types.go +++ b/protocol/common_types.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "time" ) // ChainSelector represents chainlink-specific chain id. @@ -276,5 +277,6 @@ type MessageSentEvent struct { Message Message // The decoded CCIP message Receipts []ReceiptWithBlob // Verifier receipts + executor receipt BlockNumber uint64 // Block number where event occurred + BlockTimestamp time.Time // Timestamp when the block was mined TxHash ByteSlice // Transaction hash of the event } diff --git a/protocol/message_types.go b/protocol/message_types.go index 9bd7c8ba4..80b0aa8db 100644 --- a/protocol/message_types.go +++ b/protocol/message_types.go @@ -501,24 +501,26 @@ type CCV struct { // VerifierResult represents Cross-Chain Verification data (corresponds to VerifierResult proto). type VerifierResult struct { - MessageID Bytes32 `json:"message_id"` - Message Message `json:"message"` - MessageCCVAddresses []UnknownAddress `json:"message_ccv_addresses"` - MessageExecutorAddress UnknownAddress `json:"message_executor_address"` - CCVData ByteSlice `json:"ccv_data"` - Timestamp time.Time `json:"timestamp"` - VerifierSourceAddress UnknownAddress `json:"verifier_source_address"` - VerifierDestAddress UnknownAddress `json:"verifier_dest_address"` + MessageID Bytes32 `json:"message_id"` + Message Message `json:"message"` + MessageCCVAddresses []UnknownAddress `json:"message_ccv_addresses"` + MessageExecutorAddress UnknownAddress `json:"message_executor_address"` + CCVData ByteSlice `json:"ccv_data"` + Timestamp time.Time `json:"timestamp"` + VerifierSourceAddress UnknownAddress `json:"verifier_source_address"` + VerifierDestAddress UnknownAddress `json:"verifier_dest_address"` + SourceChainBlockTimestamp time.Time `json:"source_chain_block_timestamp"` } // VerifierNodeResult represents node-level verification data (corresponds to CommitteeVerifierNodeResult proto). 
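// In both VerifierResult and VerifierNodeResult the new SourceChainBlockTimestamp field is
// a time.Time in Go but is carried as Unix milliseconds on the wire; the round-trip used
// elsewhere in this change looks like (sketch):
//
//	pb.SourceChainBlockTimestamp = result.SourceChainBlockTimestamp.UnixMilli()
//	result.SourceChainBlockTimestamp = time.UnixMilli(pb.SourceChainBlockTimestamp)
//
// Millisecond precision is therefore the effective end-to-end resolution of the timestamp.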
type VerifierNodeResult struct { - MessageID Bytes32 `json:"message_id"` - Message Message `json:"message"` - CCVVersion ByteSlice `json:"ccv_version"` - CCVAddresses []UnknownAddress `json:"ccv_addresses"` - ExecutorAddress UnknownAddress `json:"executor_address"` - Signature ByteSlice `json:"signature"` + MessageID Bytes32 `json:"message_id"` + Message Message `json:"message"` + CCVVersion ByteSlice `json:"ccv_version"` + CCVAddresses []UnknownAddress `json:"ccv_addresses"` + ExecutorAddress UnknownAddress `json:"executor_address"` + Signature ByteSlice `json:"signature"` + SourceChainBlockTimestamp time.Time `json:"source_chain_block_timestamp"` } func (vr *VerifierResult) ValidateFieldsConsistent() error { diff --git a/verifier/commit/utils.go b/verifier/commit/utils.go index 0fc8a6485..7eaa7e1ee 100644 --- a/verifier/commit/utils.go +++ b/verifier/commit/utils.go @@ -43,11 +43,12 @@ func CreateVerifierNodeResult(verificationTask *verifier.VerificationTask, signa } return &protocol.VerifierNodeResult{ - MessageID: messageID, - Message: message, - CCVVersion: verifierBlob, - CCVAddresses: receiptStructure.CCVAddresses, - ExecutorAddress: receiptStructure.ExecutorAddress, - Signature: signature, + MessageID: messageID, + Message: message, + CCVVersion: verifierBlob, + CCVAddresses: receiptStructure.CCVAddresses, + ExecutorAddress: receiptStructure.ExecutorAddress, + Signature: signature, + SourceChainBlockTimestamp: verificationTask.BlockTimestamp, }, nil } diff --git a/verifier/source_reader_service.go b/verifier/source_reader_service.go index 7f3c65a11..ca6f1ffcb 100644 --- a/verifier/source_reader_service.go +++ b/verifier/source_reader_service.go @@ -270,12 +270,13 @@ func (r *SourceReaderService) processEventCycle(ctx context.Context) { continue } task := VerificationTask{ - Message: event.Message, - ReceiptBlobs: event.Receipts, - BlockNumber: event.BlockNumber, - MessageID: onchainMessageID, - TxHash: event.TxHash, - FirstSeenAt: now, + Message: event.Message, + ReceiptBlobs: event.Receipts, + BlockNumber: event.BlockNumber, + MessageID: onchainMessageID, + TxHash: event.TxHash, + FirstSeenAt: now, + BlockTimestamp: event.BlockTimestamp, } tasks = append(tasks, task) } diff --git a/verifier/types.go b/verifier/types.go index 3f08cf83d..450a7f1e5 100644 --- a/verifier/types.go +++ b/verifier/types.go @@ -9,13 +9,14 @@ import ( // VerificationTask represents the complete CCIPMessageSent event data from the onRamp/proxy. type VerificationTask struct { // TODO: Rename ReceiptBlobs to VerifierBlobs to match with onchain code. 
-	ReceiptBlobs []protocol.ReceiptWithBlob `json:"receipt_blobs"`
-	MessageID    string                     `json:"message_id"`
-	Message      protocol.Message           `json:"message"`
-	TxHash       protocol.ByteSlice         `json:"tx_hash"`
-	BlockNumber  uint64                     `json:"block_number"`  // Block number when the message was included
-	FirstSeenAt  time.Time                  `json:"first_seen_at"` // When message first entered the system (for E2E latency)
-	QueuedAt     time.Time                  `json:"queued_at"`     // When added to finality queue (for finality wait duration)
+	ReceiptBlobs   []protocol.ReceiptWithBlob `json:"receipt_blobs"`
+	MessageID      string                     `json:"message_id"`
+	Message        protocol.Message           `json:"message"`
+	TxHash         protocol.ByteSlice         `json:"tx_hash"`
+	BlockNumber    uint64                     `json:"block_number"`    // Block number when the message was included
+	FirstSeenAt    time.Time                  `json:"first_seen_at"`   // When message first entered the system (for E2E latency)
+	QueuedAt       time.Time                  `json:"queued_at"`       // When added to finality queue (for finality wait duration)
+	BlockTimestamp time.Time                  `json:"block_timestamp"` // Block timestamp when the message was included on source chain
 }
 
 // SourceConfig contains configuration for a single source chain.
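Taken together, the change threads a source-chain block timestamp from the EVM source reader (block header time) through VerificationTask and VerifierNodeResult, across the aggregator and indexer as Unix milliseconds, and into the executor's new executor_message_e2e_duration_seconds histogram. A minimal, self-contained sketch of the latency computation the executor ends up performing (field and metric names follow the diff; everything else is illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	// The source reader captures the timestamp of the block that included the message.
	sourceChainBlockTimestamp := time.Now().Add(-42 * time.Second)

	// On the wire the value travels as Unix milliseconds and is decoded back to time.Time.
	wire := sourceChainBlockTimestamp.UnixMilli()
	decoded := time.UnixMilli(wire)

	// At execution time the executor observes end-to-end latency against that timestamp.
	e2eLatencySeconds := time.Since(decoded).Seconds()
	fmt.Printf("executor_message_e2e_duration_seconds observation: %.3f\n", e2eLatencySeconds)
}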