-
Notifications
You must be signed in to change notification settings - Fork 4.1k
feat(indexer/postgres)!: add basic support for header, txs and events #22695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
af5d2fd
6bf2f3e
4ac5dfe
8dec19f
f7985a4
e29f0ff
7d0333e
a0469e8
bccad4f
689c530
9a575f0
9ee52c6
85b6ebe
a7dabcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package postgres | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
|
|
||
| "cosmossdk.io/schema/appdata" | ||
|
|
@@ -81,5 +82,72 @@ func (i *indexerImpl) listener() appdata.Listener { | |
| i.tx, err = i.db.BeginTx(i.ctx, nil) | ||
| return nil, err | ||
| }, | ||
| OnTx: txListener(i), | ||
| OnEvent: eventListener(i), | ||
| } | ||
| } | ||
|
|
||
| func txListener(i *indexerImpl) func(data appdata.TxData) error { | ||
| return func(td appdata.TxData) error { | ||
| var bz []byte | ||
| if td.Bytes != nil { | ||
| var err error | ||
| bz, err = td.Bytes() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| var jsonData json.RawMessage | ||
| if td.JSON != nil { | ||
| var err error | ||
| jsonData, err = td.JSON() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| _, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)", | ||
| td.BlockNumber, td.TxIndex, jsonData, bz) | ||
|
|
||
| return err | ||
| } | ||
| } | ||
|
|
||
| func eventListener(i *indexerImpl) func(data appdata.EventData) error { | ||
| return func(data appdata.EventData) error { | ||
| for _, e := range data.Events { | ||
| var jsonData json.RawMessage | ||
|
|
||
| if e.Data != nil { | ||
| var err error | ||
| jsonData, err = e.Data() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get event data: %w", err) | ||
| } | ||
| } else if e.Attributes != nil { | ||
| attrs, err := e.Attributes() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get event attributes: %w", err) | ||
| } | ||
|
|
||
| attrsMap := map[string]interface{}{} | ||
| for _, attr := range attrs { | ||
| attrsMap[attr.Key] = attr.Value | ||
| } | ||
|
Comment on lines
+134
to
+137
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preserve all event attributes by handling duplicate keys appropriately Currently, attributes with duplicate keys may overwrite previous entries due to the use of a map. This can lead to loss of data for events that have multiple attributes with the same key. To ensure all attributes are preserved, store them as a slice of key-value pairs instead. Apply this diff to modify the handling of attributes: -func (e *eventData) Attributes() ([]EventAttribute, error) {
- attrsMap := map[string]interface{}{}
- for _, attr := range attrs {
- attrsMap[attr.Key] = attr.Value
- }
- jsonAttributes, err = json.Marshal(attrsMap)
+func (e *eventData) Attributes() ([]EventAttribute, error) {
+ attrsSlice := []map[string]interface{}{}
+ for _, attr := range attrs {
+ attrsSlice = append(attrsSlice, map[string]interface{}{
+ "key": attr.Key,
+ "value": attr.Value,
+ })
+ }
+ jsonAttributes, err = json.Marshal(attrsSlice)
if err != nil {
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
}
|
||
|
|
||
| jsonData, err = json.Marshal(attrsMap) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal event attributes: %w", err) | ||
| } | ||
| } | ||
|
|
||
| _, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data) VALUES ($1, $2, $3, $4, $5, $6, $7)", | ||
| e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to index event: %w", err) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -94,6 +94,11 @@ func (a *App[T]) SchemaDecoderResolver() decoding.DecoderResolver { | |||||||||||||||||||||||||
| for moduleName, module := range a.moduleManager.Modules() { | ||||||||||||||||||||||||||
| moduleSet[moduleName] = module | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for _, overrideKey := range a.config.OverrideStoreKeys { | ||||||||||||||||||||||||||
| moduleSet[overrideKey.KvStoreKey] = moduleSet[overrideKey.ModuleName] | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+97
to
+100
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Validate module existence before applying overrides The current implementation assumes that the module specified in for _, overrideKey := range a.config.OverrideStoreKeys {
+ module, exists := moduleSet[overrideKey.ModuleName]
+ if !exists {
+ return nil, fmt.Errorf("module %s not found for store key override %s",
+ overrideKey.ModuleName, overrideKey.KvStoreKey)
+ }
- moduleSet[overrideKey.KvStoreKey] = moduleSet[overrideKey.ModuleName]
+ moduleSet[overrideKey.KvStoreKey] = module
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return decoding.ModuleSetDecoderResolver(moduleSet) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -538,7 +538,7 @@ func (c *consensus[T]) FinalizeBlock( | |
| events = append(events, resp.EndBlockEvents...) | ||
|
|
||
| // listen to state streaming changes in accordance with the block | ||
| err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, resp.TxResults, events, stateChanges) | ||
| err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, resp.TxResults, events, stateChanges) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -575,7 +575,7 @@ func (c *consensus[T]) internalFinalizeBlock( | |
| // TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case, | ||
| // considering that prepare and process always decode txs, assuming they're the ones providing txs we should never | ||
| // have a tx that fails decoding. | ||
| decodedTxs, err := decodeTxs(req.Txs, c.txCodec) | ||
| decodedTxs, err := decodeTxs(c.logger, req.Txs, c.txCodec) | ||
| if err != nil { | ||
| return nil, nil, nil, err | ||
| } | ||
|
Comment on lines
+593
to
596
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ineffective error handling in The Apply this diff to modify the error handling: func decodeTxs[T transaction.Tx](logger log.Logger, rawTxs [][]byte, codec transaction.Codec[T]) ([]T, error) {
txs := make([]T, 0, len(rawTxs))
for _, rawTx := range rawTxs {
tx, err := codec.Decode(rawTx)
if err != nil {
- // do not return an error here, as we want to deliver the block even if some txs are invalid
- logger.Debug("failed to decode tx", "err", err)
+ // skip invalid transactions
+ logger.Error("failed to decode tx", "err", err)
+ continue
}
- txs[i] = tx
+ txs = append(txs, tx)
}
return txs, nil
} |
||
|
|
@@ -708,12 +708,13 @@ func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVote | |
| return resp, err | ||
| } | ||
|
|
||
| func decodeTxs[T transaction.Tx](rawTxs [][]byte, codec transaction.Codec[T]) ([]T, error) { | ||
| func decodeTxs[T transaction.Tx](logger log.Logger, rawTxs [][]byte, codec transaction.Codec[T]) ([]T, error) { | ||
| txs := make([]T, len(rawTxs)) | ||
| for i, rawTx := range rawTxs { | ||
| tx, err := codec.Decode(rawTx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("unable to decode tx: %d: %w", i, err) | ||
| // do not return an error here, as we want to deliver the block even if some txs are invalid | ||
| logger.Debug("failed to decode tx", "err", err) | ||
| } | ||
| txs[i] = tx | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.