Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

### Added

- Add `pb/firehose/options.proto` defining `(firehose.transactions)` and `(firehose.nondeterministic)` custom `FieldOptions` extensions for annotating Block proto fields with semantic meaning (transaction list and non-deterministic values).
- Add `fireproto` package providing `WalkNonDeterministicFields`, `ClearNonDeterministicFields`, and `FindTransactionsField` utilities for walking and manipulating proto fields annotated with the `(firehose.nondeterministic)` and `(firehose.transactions)` field extensions.

### Changed

- Bumped `dstore`: S3 store now suppresses the SDK's checksum validation warnings (sets `DisableLogOutputChecksumValidationSkipped` to `true`) and updates the AWS S3 SDK to a newer version.
Expand Down
37 changes: 37 additions & 0 deletions fireproto/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Package fireproto provides utilities for working with Firehose-annotated
// protobuf messages using custom field extensions defined in the firehose proto
// package (pb/firehose/options.proto).
//
// # Field Annotations
//
// The firehose proto package defines two FieldOptions extensions that can be
// applied to fields in any Block protobuf message:
//
//   - (firehose.transactions) — marks the repeated field that holds the block's
//     transaction list, enabling generic tooling to locate transactions without
//     chain-specific knowledge.
//
//   - (firehose.nondeterministic) — marks fields whose values may differ between
//     nodes (e.g. gas estimates, fee calculations, timing metadata). These fields
//     are candidates for clearing when performing deterministic block comparisons
//     or diffing block content across nodes.
//
// # Usage
//
// Walk all non-deterministic fields in a block:
//
//	fireproto.WalkNonDeterministicFields(block, func(msg protoreflect.Message, field protoreflect.FieldDescriptor) {
//		fmt.Printf("non-deterministic: %s.%s\n", msg.Descriptor().Name(), field.Name())
//	})
//
// Clear all non-deterministic fields before comparison:
//
//	fireproto.ClearNonDeterministicFields(block)
//
// Find the transactions field on a block:
//
//	fd := fireproto.FindTransactionsField(block)
//	if fd != nil {
//		txs := block.ProtoReflect().Get(fd).List()
//		fmt.Printf("block has %d transactions\n", txs.Len())
//	}
package fireproto
9 changes: 9 additions & 0 deletions fireproto/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package fireproto_test

import "github.com/streamingfast/logging"

// zlogTest and tracerTest are the logger and tracer registered for this test
// package under the short name "fireproto_test".
var zlogTest, tracerTest = logging.PackageLogger("fireproto_test", "github.com/streamingfast/firehose-core/fireproto/test")

// init calls logging.InstantiateLoggers when the test package is loaded, before
// any test in the package runs.
func init() {
	logging.InstantiateLoggers()
}
54 changes: 54 additions & 0 deletions fireproto/walker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package fireproto

import (
pbfirehose "github.com/streamingfast/firehose-core/pb/firehose"
"github.com/streamingfast/protox"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

// WalkNonDeterministicFields traverses the message tree rooted at root and
// invokes fn once for every visited field that carries
// (firehose.nondeterministic) = true.
//
// fn is handed the message instance that owns the field together with the
// field's descriptor.
//
// The callback fires for each tagged field of the message type whether or not
// the field is populated on the instance. Clearing an unset field is a no-op,
// which is what makes this walk safe to combine with
// ClearNonDeterministicFields.
//
// Traversal is delegated to the protox walker: well-known Google types
// (google.protobuf.*) are not descended into, and self-referential message
// types are handled by its DFS cycle prevention.
func WalkNonDeterministicFields(root proto.Message, fn func(msg protoreflect.Message, field protoreflect.FieldDescriptor)) {
	visited := protox.WalkMessageInstanceFields(root.ProtoReflect(), nil)
	for owner, fd := range visited {
		if !isNonDeterministic(fd) {
			continue
		}
		fn(owner, fd)
	}
}

// ClearNonDeterministicFields resets every field tagged with
// (firehose.nondeterministic) = true in the message tree rooted at root.
//
// It walks the tree with WalkNonDeterministicFields and calls Clear on the
// owning message for each tagged field, leaving the field at its zero value.
func ClearNonDeterministicFields(root proto.Message) {
	clearField := func(owner protoreflect.Message, fd protoreflect.FieldDescriptor) {
		owner.Clear(fd)
	}
	WalkNonDeterministicFields(root, clearField)
}

// FindTransactionsField looks for the field of root's message type that is
// tagged with (firehose.transactions) = true.
//
// Fields are checked in descriptor order and the first tagged one is returned.
// Only the top-level fields of root's message type are inspected; nested
// messages are never searched. The result is nil when no field is tagged.
func FindTransactionsField(root proto.Message) protoreflect.FieldDescriptor {
	topLevel := root.ProtoReflect().Descriptor().Fields()
	for idx, count := 0, topLevel.Len(); idx < count; idx++ {
		candidate := topLevel.Get(idx)
		tagged, found := protox.GetFieldExtensionValue[bool](candidate, pbfirehose.E_Transactions, false)
		if found && tagged {
			return candidate
		}
	}
	return nil
}

// isNonDeterministic reports whether field has the (firehose.nondeterministic)
// extension present and set to true.
func isNonDeterministic(field protoreflect.FieldDescriptor) bool {
	if tagged, found := protox.GetFieldExtensionValue[bool](field, pbfirehose.E_Nondeterministic, false); found {
		return tagged
	}
	return false
}
173 changes: 173 additions & 0 deletions fireproto/walker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package fireproto_test

import (
"testing"

"github.com/streamingfast/firehose-core/fireproto"
pbfirehose "github.com/streamingfast/firehose-core/pb/firehose"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// buildAnnotatedDescriptors builds dynamic proto descriptors:
//
// Block { repeated Tx transactions = 1 [(firehose.transactions)=true]; bytes gas = 2 [(firehose.nondeterministic)=true]; }
// Tx { string hash = 1; bytes fee = 2 [(firehose.nondeterministic)=true]; }
func buildAnnotatedDescriptors(t *testing.T) (blockType protoreflect.MessageType, txType protoreflect.MessageType) {
t.Helper()

txFDP := &descriptorpb.FileDescriptorProto{
Name: new("fireproto_test_tx.proto"),
Syntax: new("proto3"),
Package: new("fireproto.test"),
Options: &descriptorpb.FileOptions{GoPackage: new("fireproto/test;fireprototest")},
MessageType: []*descriptorpb.DescriptorProto{
{
Name: new("Tx"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: new("hash"),
Number: new(int32(1)),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
JsonName: new("hash"),
},
{
Name: new("fee"),
Number: new(int32(2)),
Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
JsonName: new("fee"),
Comment on lines +25 to +46
Options: func() *descriptorpb.FieldOptions {
opts := &descriptorpb.FieldOptions{}
proto.SetExtension(opts, pbfirehose.E_Nondeterministic, true)
return opts
}(),
},
},
},
},
}

blockFDP := &descriptorpb.FileDescriptorProto{
Name: new("fireproto_test_block.proto"),
Syntax: new("proto3"),
Package: new("fireproto.test"),
Dependency: []string{"fireproto_test_tx.proto"},
Options: &descriptorpb.FileOptions{GoPackage: new("fireproto/test;fireprototest")},
Comment on lines +59 to +63
MessageType: []*descriptorpb.DescriptorProto{
{
Name: new("Block"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: new("transactions"),
Number: new(int32(1)),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
TypeName: new(".fireproto.test.Tx"),
JsonName: new("transactions"),
Comment on lines +65 to +74
Options: func() *descriptorpb.FieldOptions {
opts := &descriptorpb.FieldOptions{}
proto.SetExtension(opts, pbfirehose.E_Transactions, true)
return opts
}(),
},
{
Name: new("gas"),
Number: new(int32(2)),
Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
JsonName: new("gas"),
Comment on lines +81 to +86

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THis is a new Golang feature enabling creating pointer directly to literal types, added in Golang 1.26, aren't we compatible already?

Options: func() *descriptorpb.FieldOptions {
opts := &descriptorpb.FieldOptions{}
proto.SetExtension(opts, pbfirehose.E_Nondeterministic, true)
return opts
}(),
},
},
},
},
}

files, err := protodesc.NewFiles(&descriptorpb.FileDescriptorSet{
File: []*descriptorpb.FileDescriptorProto{txFDP, blockFDP},
})
require.NoError(t, err)

txDesc, err := files.FindDescriptorByName("fireproto.test.Tx")
require.NoError(t, err)
blockDesc, err := files.FindDescriptorByName("fireproto.test.Block")
require.NoError(t, err)

return dynamicpb.NewMessageType(blockDesc.(protoreflect.MessageDescriptor)),
dynamicpb.NewMessageType(txDesc.(protoreflect.MessageDescriptor))
}

// TestFindTransactionsField checks that the field annotated with
// (firehose.transactions) on the dynamic Block type is the one returned.
func TestFindTransactionsField(t *testing.T) {
	blockType, _ := buildAnnotatedDescriptors(t)

	got := fireproto.FindTransactionsField(blockType.New().Interface())

	require.NotNil(t, got)
	assert.Equal(t, protoreflect.Name("transactions"), got.Name())
}

func TestFindTransactionsField_none(t *testing.T) {
ts := &timestamppb.Timestamp{Seconds: 1}
assert.Nil(t, fireproto.FindTransactionsField(ts))
}

// TestWalkNonDeterministicFields builds a Block holding one Tx and checks that
// the walk reports exactly the two annotated fields: Block.gas and Tx.fee.
func TestWalkNonDeterministicFields(t *testing.T) {
	blockType, txType := buildAnnotatedDescriptors(t)

	block := blockType.New()
	blockFields := block.Descriptor().Fields()

	tx := txType.New()
	txFields := tx.Descriptor().Fields()
	tx.Set(txFields.ByName("hash"), protoreflect.ValueOfString("0xabc"))
	tx.Set(txFields.ByName("fee"), protoreflect.ValueOfBytes([]byte{1, 2, 3}))

	block.Mutable(blockFields.ByName("transactions")).List().Append(protoreflect.ValueOfMessage(tx))
	block.Set(blockFields.ByName("gas"), protoreflect.ValueOfBytes([]byte{0xff}))

	type visit struct{ parent, field string }
	var visits []visit
	record := func(msg protoreflect.Message, field protoreflect.FieldDescriptor) {
		visits = append(visits, visit{
			parent: string(msg.Descriptor().Name()),
			field:  string(field.Name()),
		})
	}
	fireproto.WalkNonDeterministicFields(block.Interface(), record)

	// Order of visits is not part of the contract, hence ElementsMatch.
	expected := []visit{
		{parent: "Block", field: "gas"},
		{parent: "Tx", field: "fee"},
	}
	assert.ElementsMatch(t, expected, visits)
}

func TestWalkNonDeterministicFields_noAnnotations(t *testing.T) {
ts := &timestamppb.Timestamp{Seconds: 100}
var visited []string
fireproto.WalkNonDeterministicFields(ts, func(msg protoreflect.Message, field protoreflect.FieldDescriptor) {
visited = append(visited, string(field.Name()))
})
assert.Empty(t, visited)
}

// TestClearNonDeterministicFields populates the annotated fields on a Block and
// its single Tx, clears them, and checks that both end up unset while the
// never-set hash field stays unset.
func TestClearNonDeterministicFields(t *testing.T) {
	blockType, txType := buildAnnotatedDescriptors(t)

	block := blockType.New()
	blockFields := block.Descriptor().Fields()
	gasField := blockFields.ByName("gas")
	txsField := blockFields.ByName("transactions")

	seededTx := txType.New()
	seededTx.Set(seededTx.Descriptor().Fields().ByName("fee"), protoreflect.ValueOfBytes([]byte{1, 2, 3}))
	block.Mutable(txsField).List().Append(protoreflect.ValueOfMessage(seededTx))
	block.Set(gasField, protoreflect.ValueOfBytes([]byte{0xff}))

	fireproto.ClearNonDeterministicFields(block.Interface())

	assert.False(t, block.Has(gasField), "gas should be cleared")

	// Re-read the transaction through the block so the assertion covers the
	// instance actually stored in the list.
	storedTx := block.Get(txsField).List().Get(0).Message()
	storedFields := storedTx.Descriptor().Fields()
	assert.False(t, storedTx.Has(storedFields.ByName("fee")), "fee should be cleared")
	// hash was never set, just verify it's still unset
	assert.False(t, storedTx.Has(storedFields.ByName("hash")))
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ require (
github.com/streamingfast/logging v1.2.2
github.com/streamingfast/payment-gateway v0.0.0-20260527144655-d0576d2a4ee3
github.com/streamingfast/pbgo v0.0.6-0.20260206150405-2b95acf70437
github.com/streamingfast/protox v0.0.0-20260610151242-cd8a8cf98c45
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.18.6-0.20260610195629-c3f50ce6cf6d
github.com/stretchr/testify v1.11.1
github.com/test-go/testify v1.1.4
github.com/testcontainers/testcontainers-go v0.40.0
go.opentelemetry.io/otel v1.43.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.1
go.uber.org/zap v1.28.0
golang.org/x/exp v0.0.0-20250813145105-42675adae3e6
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,8 @@ github.com/streamingfast/pbgo v0.0.6-0.20260206150405-2b95acf70437 h1:BsGEgwjlIJ
github.com/streamingfast/pbgo v0.0.6-0.20260206150405-2b95acf70437/go.mod h1:MHb9nR5zyNyvA1Zi2wud+hWpoILpll7vpLUijvh3YV0=
github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg=
github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE=
github.com/streamingfast/protox v0.0.0-20260610151242-cd8a8cf98c45 h1:hVLYp14KlfW9mnpNQoTXbHExUWHrX0A7Ey6z/veC5IE=
github.com/streamingfast/protox v0.0.0-20260610151242-cd8a8cf98c45/go.mod h1:/9DK62G+3E3hKwSMCs9vc+sNCDJBnhCxbb+KnZcIRDQ=
github.com/streamingfast/sf-tracing v0.0.0-20251218140752-bafd5572499f h1:KXE5MEXwc7l6FwD9dBYH0r+0IojAS9VYZa/vuzCBnK0=
github.com/streamingfast/sf-tracing v0.0.0-20251218140752-bafd5572499f/go.mod h1:H57LMxdkHi0MZ+n7xr7zo9PJv7kZZ38du9w0SizLcFU=
github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs=
Expand Down Expand Up @@ -2521,8 +2523,8 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap v1.28.0 h1:IZzaP1Fv73/T/pBMLk4VutPl36uNC+OSUh3JLG3FIjo=
go.uber.org/zap v1.28.0/go.mod h1:rDLpOi171uODNm/mxFcuYWxDsqWSAVkFdX4XojSKg/Q=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
Expand Down
Loading
Loading