Skip to content

Commit bd76b47

Browse files
feat(indexer): add to modules and implement proto fields (#22544)
1 parent 18dcdb8 commit bd76b47

File tree

74 files changed

+622
-131
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+622
-131
lines changed

baseapp/abci.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -903,8 +903,12 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
903903
defer func() {
904904
// call the streaming service hooks with the FinalizeBlock messages
905905
for _, streamingListener := range app.streamingManager.ABCIListeners {
906-
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil {
906+
if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil {
907907
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
908+
if app.streamingManager.StopNodeOnErr {
909+
// if StopNodeOnErr is set, we should return the streamErr in order to stop the node
910+
err = streamErr
911+
}
908912
}
909913
}
910914
}()
@@ -976,12 +980,12 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
976980
rms.SetCommitHeader(header)
977981
}
978982

979-
app.cms.Commit()
980-
981983
resp := &abci.CommitResponse{
982984
RetainHeight: retainHeight,
983985
}
984986

987+
app.cms.Commit()
988+
985989
abciListeners := app.streamingManager.ABCIListeners
986990
if len(abciListeners) > 0 {
987991
ctx := app.finalizeBlockState.Context()
@@ -991,6 +995,14 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
991995
for _, abciListener := range abciListeners {
992996
if err := abciListener.ListenCommit(ctx, *resp, changeSet); err != nil {
993997
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
998+
if app.streamingManager.StopNodeOnErr {
999+
err = fmt.Errorf("Commit listening hook failed: %w", err)
1000+
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
1001+
if rollbackErr != nil {
1002+
return nil, errors.Join(err, rollbackErr)
1003+
}
1004+
return nil, err
1005+
}
9941006
}
9951007
}
9961008
}

client/v2/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module cosmossdk.io/client/v2
22

3-
go 1.23.1
3+
go 1.23.3
44

55
require (
66
cosmossdk.io/api v0.7.6

codec/collections.go

Lines changed: 244 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package codec
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"reflect"
7+
"strings"
68

79
"github.com/cosmos/gogoproto/proto"
810
gogotypes "github.com/cosmos/gogoproto/types"
911
"google.golang.org/protobuf/encoding/protojson"
1012
protov2 "google.golang.org/protobuf/proto"
13+
"google.golang.org/protobuf/reflect/protoreflect"
14+
"google.golang.org/protobuf/types/dynamicpb"
15+
"google.golang.org/protobuf/types/known/durationpb"
16+
"google.golang.org/protobuf/types/known/timestamppb"
1117

1218
"cosmossdk.io/collections"
1319
collcodec "cosmossdk.io/collections/codec"
20+
"cosmossdk.io/schema"
1421
)
1522

1623
// BoolValue implements a ValueCodec that saves the bool value
@@ -51,12 +58,17 @@ type protoMessage[T any] interface {
5158
proto.Message
5259
}
5360

61+
type protoCollValueCodec[T any] interface {
62+
collcodec.HasSchemaCodec[T]
63+
collcodec.ValueCodec[T]
64+
}
65+
5466
// CollValue inits a collections.ValueCodec for a generic gogo protobuf message.
5567
func CollValue[T any, PT protoMessage[T]](cdc interface {
5668
Marshal(proto.Message) ([]byte, error)
5769
Unmarshal([]byte, proto.Message) error
5870
},
59-
) collcodec.ValueCodec[T] {
71+
) protoCollValueCodec[T] {
6072
return &collValue[T, PT]{cdc.(Codec), proto.MessageName(PT(new(T)))}
6173
}
6274

@@ -91,6 +103,139 @@ func (c collValue[T, PT]) ValueType() string {
91103
return "github.com/cosmos/gogoproto/" + c.messageName
92104
}
93105

106+
func (c collValue[T, PT]) SchemaCodec() (collcodec.SchemaCodec[T], error) {
107+
var (
108+
t T
109+
pt PT
110+
)
111+
msgName := proto.MessageName(pt)
112+
desc, err := proto.HybridResolver.FindDescriptorByName(protoreflect.FullName(msgName))
113+
if err != nil {
114+
return collcodec.SchemaCodec[T]{}, fmt.Errorf("could not find descriptor for %s: %w", msgName, err)
115+
}
116+
schemaFields := protoCols(desc.(protoreflect.MessageDescriptor))
117+
118+
kind := schema.KindForGoValue(t)
119+
if err := kind.Validate(); err == nil {
120+
return collcodec.SchemaCodec[T]{
121+
Fields: []schema.Field{{
122+
// we don't set any name so that this can be set to a good default by the caller
123+
Name: "",
124+
Kind: kind,
125+
}},
126+
// these can be nil because T maps directly to a schema value for this kind
127+
ToSchemaType: nil,
128+
FromSchemaType: nil,
129+
}, nil
130+
} else {
131+
return collcodec.SchemaCodec[T]{
132+
Fields: schemaFields,
133+
ToSchemaType: func(t T) (any, error) {
134+
values := []interface{}{}
135+
msgDesc, ok := desc.(protoreflect.MessageDescriptor)
136+
if !ok {
137+
return nil, fmt.Errorf("expected message descriptor, got %T", desc)
138+
}
139+
140+
nm := dynamicpb.NewMessage(msgDesc)
141+
bz, err := c.cdc.Marshal(any(&t).(PT))
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
err = c.cdc.Unmarshal(bz, nm)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
for _, field := range schemaFields {
152+
// Find the field descriptor by the Protobuf field name
153+
fieldDesc := msgDesc.Fields().ByName(protoreflect.Name(field.Name))
154+
if fieldDesc == nil {
155+
return nil, fmt.Errorf("field %q not found in message %s", field.Name, desc.FullName())
156+
}
157+
158+
val := nm.ProtoReflect().Get(fieldDesc)
159+
160+
// if the field is a map or list, we need to convert it to a slice of values
161+
if fieldDesc.IsList() {
162+
repeatedVals := []interface{}{}
163+
list := val.List()
164+
for i := 0; i < list.Len(); i++ {
165+
repeatedVals = append(repeatedVals, list.Get(i).Interface())
166+
}
167+
values = append(values, repeatedVals)
168+
continue
169+
}
170+
171+
switch fieldDesc.Kind() {
172+
case protoreflect.BoolKind:
173+
values = append(values, val.Bool())
174+
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind,
175+
protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
176+
values = append(values, val.Int())
177+
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind, protoreflect.Uint64Kind,
178+
protoreflect.Fixed64Kind:
179+
values = append(values, val.Uint())
180+
case protoreflect.FloatKind, protoreflect.DoubleKind:
181+
values = append(values, val.Float())
182+
case protoreflect.StringKind:
183+
values = append(values, val.String())
184+
case protoreflect.BytesKind:
185+
values = append(values, val.Bytes())
186+
case protoreflect.EnumKind:
187+
// TODO: postgres uses the enum name, not the number
188+
values = append(values, string(fieldDesc.Enum().Values().ByNumber(val.Enum()).Name()))
189+
case protoreflect.MessageKind:
190+
msg := val.Interface().(*dynamicpb.Message)
191+
msgbz, err := c.cdc.Marshal(msg)
192+
if err != nil {
193+
return nil, err
194+
}
195+
196+
if field.Kind == schema.TimeKind {
197+
// make it a time.Time
198+
ts := &timestamppb.Timestamp{}
199+
err = c.cdc.Unmarshal(msgbz, ts)
200+
if err != nil {
201+
return nil, fmt.Errorf("error unmarshalling timestamp: %w %x %s", err, msgbz, fieldDesc.FullName())
202+
}
203+
values = append(values, ts.AsTime())
204+
} else if field.Kind == schema.DurationKind {
205+
// make it a time.Duration
206+
dur := &durationpb.Duration{}
207+
err = c.cdc.Unmarshal(msgbz, dur)
208+
if err != nil {
209+
return nil, fmt.Errorf("error unmarshalling duration: %w", err)
210+
}
211+
values = append(values, dur.AsDuration())
212+
} else {
213+
// if not a time or duration, just keep it as a JSON object
214+
// we might want to change this to include the entire object as separate fields
215+
bz, err := c.cdc.MarshalJSON(msg)
216+
if err != nil {
217+
return nil, fmt.Errorf("error marshaling message: %w", err)
218+
}
219+
220+
values = append(values, json.RawMessage(bz))
221+
}
222+
}
223+
224+
}
225+
226+
// if there's only one value, return it directly
227+
if len(values) == 1 {
228+
return values[0], nil
229+
}
230+
return values, nil
231+
},
232+
FromSchemaType: func(a any) (T, error) {
233+
panic("not implemented")
234+
},
235+
}, nil
236+
}
237+
}
238+
94239
type protoMessageV2[T any] interface {
95240
*T
96241
protov2.Message
@@ -179,3 +324,101 @@ func (c collInterfaceValue[T]) ValueType() string {
179324
var t T
180325
return fmt.Sprintf("%T", t)
181326
}
327+
328+
// SchemaCodec returns a schema codec, which will always have a single JSON field
329+
// as there is no way to know in advance the necessary fields for an interface.
330+
func (c collInterfaceValue[T]) SchemaCodec() (collcodec.SchemaCodec[T], error) {
331+
var pt T
332+
333+
kind := schema.KindForGoValue(pt)
334+
if err := kind.Validate(); err == nil {
335+
return collcodec.SchemaCodec[T]{
336+
Fields: []schema.Field{{
337+
// we don't set any name so that this can be set to a good default by the caller
338+
Name: "",
339+
Kind: kind,
340+
}},
341+
// these can be nil because T maps directly to a schema value for this kind
342+
ToSchemaType: nil,
343+
FromSchemaType: nil,
344+
}, nil
345+
} else {
346+
return collcodec.SchemaCodec[T]{
347+
Fields: []schema.Field{{
348+
Name: "value",
349+
Kind: schema.JSONKind,
350+
}},
351+
ToSchemaType: func(t T) (any, error) {
352+
bz, err := c.codec.MarshalInterfaceJSON(t)
353+
if err != nil {
354+
return nil, err
355+
}
356+
357+
return json.RawMessage(bz), nil
358+
},
359+
FromSchemaType: func(a any) (T, error) {
360+
panic("not implemented")
361+
},
362+
}, nil
363+
}
364+
}
365+
366+
func protoCols(desc protoreflect.MessageDescriptor) []schema.Field {
367+
nFields := desc.Fields()
368+
cols := make([]schema.Field, 0, nFields.Len())
369+
for i := 0; i < nFields.Len(); i++ {
370+
f := nFields.Get(i)
371+
cols = append(cols, protoCol(f))
372+
}
373+
return cols
374+
}
375+
376+
func protoCol(f protoreflect.FieldDescriptor) schema.Field {
377+
col := schema.Field{Name: string(f.Name())}
378+
if f.IsMap() || f.IsList() {
379+
col.Kind = schema.JSONKind
380+
col.Nullable = true
381+
} else {
382+
switch f.Kind() {
383+
case protoreflect.BoolKind:
384+
col.Kind = schema.BoolKind
385+
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
386+
col.Kind = schema.Int32Kind
387+
case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
388+
col.Kind = schema.Int64Kind
389+
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
390+
col.Kind = schema.Int64Kind
391+
case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
392+
col.Kind = schema.Uint64Kind
393+
case protoreflect.FloatKind:
394+
col.Kind = schema.Float32Kind
395+
case protoreflect.DoubleKind:
396+
col.Kind = schema.Float64Kind
397+
case protoreflect.StringKind:
398+
col.Kind = schema.StringKind
399+
case protoreflect.BytesKind:
400+
col.Kind = schema.BytesKind
401+
case protoreflect.EnumKind:
402+
// TODO: support enums
403+
col.Kind = schema.EnumKind
404+
// use the full name to avoid collissions
405+
col.ReferencedType = string(f.Enum().FullName())
406+
col.ReferencedType = strings.ReplaceAll(col.ReferencedType, ".", "_")
407+
case protoreflect.MessageKind:
408+
col.Nullable = true
409+
fullName := f.Message().FullName()
410+
if fullName == "google.protobuf.Timestamp" {
411+
col.Kind = schema.TimeKind
412+
} else if fullName == "google.protobuf.Duration" {
413+
col.Kind = schema.DurationKind
414+
} else {
415+
col.Kind = schema.JSONKind
416+
}
417+
}
418+
if f.HasPresence() {
419+
col.Nullable = true
420+
}
421+
}
422+
423+
return col
424+
}

collections/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
4141
* [#21090](https://github.com/cosmos/cosmos-sdk/pull/21090) Introduces `Quad`, a composite key with four keys.
4242
* [#20704](https://github.com/cosmos/cosmos-sdk/pull/20704) Add `ModuleCodec` method to `Schema` and `HasSchemaCodec` interface in order to support `cosmossdk.io/schema` compatible indexing.
4343
* [#20538](https://github.com/cosmos/cosmos-sdk/pull/20538) Add `Nameable` variations to `KeyCodec` and `ValueCodec` to allow for better indexing of `collections` types.
44+
* [#22544](https://github.com/cosmos/cosmos-sdk/pull/22544) Schema's `ModuleCodec` will now also return Enum descriptors to be registered with the indexer.
4445

4546
## [v0.4.0](https://github.com/cosmos/cosmos-sdk/releases/tag/collections%2Fv0.4.0)
4647

collections/go.mod

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
module cosmossdk.io/collections
22

3-
go 1.23
3+
go 1.23.2
44

55
require (
66
cosmossdk.io/core v1.0.0-alpha.6
77
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29
88
cosmossdk.io/schema v0.3.0
9+
github.com/cosmos/gogoproto v1.7.0
910
github.com/stretchr/testify v1.10.0
1011
github.com/tidwall/btree v1.7.0
12+
google.golang.org/protobuf v1.35.2
1113
pgregory.net/rapid v1.1.0
1214
)
1315

1416
require (
1517
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
16-
github.com/pmezard/go-difflib v1.0.0 // indirect
18+
github.com/golang/protobuf v1.5.4 // indirect
19+
github.com/google/go-cmp v0.6.0 // indirect
20+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
1721
gopkg.in/yaml.v3 v3.0.1 // indirect
1822
)

collections/go.sum

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,22 @@ cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 h1:NxxUo0GMJUbIuVg0
44
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29/go.mod h1:8s2tPeJtSiQuoyPmr2Ag7meikonISO4Fv4MoO8+ORrs=
55
cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
66
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
7+
github.com/cosmos/gogoproto v1.7.0 h1:79USr0oyXAbxg3rspGh/m4SWNyoz/GLaAh0QlCe2fro=
8+
github.com/cosmos/gogoproto v1.7.0/go.mod h1:yWChEv5IUEYURQasfyBW5ffkMHR/90hiHgbNgrtp4j0=
79
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
810
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9-
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
10-
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
11+
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
12+
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
13+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
14+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
15+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
16+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1117
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
1218
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
1319
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
1420
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
21+
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
22+
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
1523
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1624
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1725
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

0 commit comments

Comments
 (0)