-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathsql.go
More file actions
242 lines (215 loc) · 6.59 KB
/
sql.go
File metadata and controls
242 lines (215 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package sql
import (
"context"
"encoding/hex"
"fmt"
"github.com/jackc/pgx/v5"
ptypes "github.com/pokt-network/pocket/persistence/types"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
)
// actorTypeToSchemaName is the lookup table from a protocol ActorType to the
// PostgreSQL actor schema used to build queries for that type. Actor types
// that are absent from the table have no schema.
var actorTypeToSchemaName = map[coreTypes.ActorType]ptypes.ProtocolActorSchema{
	coreTypes.ActorType_ACTOR_TYPE_APP:      ptypes.ApplicationActor,
	coreTypes.ActorType_ACTOR_TYPE_VAL:      ptypes.ValidatorActor,
	coreTypes.ActorType_ACTOR_TYPE_FISH:     ptypes.FishermanActor,
	coreTypes.ActorType_ACTOR_TYPE_SERVICER: ptypes.ServicerActor,
}
// GetActors fetches the actors of the given type that have been updated at
// the given height.
//
// It works in two phases: the schema's updated-at-height query yields one
// hex-encoded address per row, and each decoded address is then resolved to
// a full actor via getActor.
//
// An error is returned when actorType has no entry in actorTypeToSchemaName,
// when a query, row scan or row iteration fails, or when an address is not
// valid hex. On success the returned slice is non-nil and holds one actor
// per address, in the order the rows were returned.
func GetActors(
	pgtx pgx.Tx,
	actorType coreTypes.ActorType,
	height uint64,
) ([]*coreTypes.Actor, error) {
	actorSchema, ok := actorTypeToSchemaName[actorType]
	if !ok {
		return nil, fmt.Errorf("no schema found for actor type: %s", actorType)
	}

	// TECHDEBT(#813): Avoid this cast to int64
	query := actorSchema.GetUpdatedAtHeightQuery(int64(height))
	rows, err := pgtx.Query(context.TODO(), query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Collect every address before issuing the per-actor queries below.
	addrs := make([][]byte, 0)
	for rows.Next() {
		var addr string
		if err := rows.Scan(&addr); err != nil {
			return nil, err
		}
		addrBz, err := hex.DecodeString(addr)
		if err != nil {
			return nil, err
		}
		addrs = append(addrs, addrBz)
	}
	// Next also returns false when iteration fails; without this check a
	// failed read would be indistinguishable from a short result set.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	actors := make([]*coreTypes.Actor, len(addrs))
	for i, addr := range addrs {
		// TECHDEBT(#813): Avoid this cast to int64
		actor, err := getActor(pgtx, actorSchema, addr, int64(height))
		if err != nil {
			return nil, err
		}
		actors[i] = actor
	}
	return actors, nil
}
// GetAccountsUpdated returns the accounts of the given account schema that
// have been updated at height.
//
// Each row of the schema's accounts-updated-at-height query is scanned into
// the Address and Amount fields of a new Account. The returned slice is
// non-nil (possibly empty) on success. An error is returned when the query,
// a row scan, or the row iteration fails.
func GetAccountsUpdated(
	pgtx pgx.Tx,
	acctType ptypes.ProtocolAccountSchema,
	height uint64,
) ([]*coreTypes.Account, error) {
	accounts := []*coreTypes.Account{}

	// TECHDEBT(#813): Avoid this cast to int64
	query := acctType.GetAccountsUpdatedAtHeightQuery(int64(height))
	rows, err := pgtx.Query(context.TODO(), query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		acc := new(coreTypes.Account)
		if err := rows.Scan(&acc.Address, &acc.Amount); err != nil {
			return nil, err
		}
		accounts = append(accounts, acc)
	}
	// Next also returns false when iteration fails; surface that instead of
	// silently returning a partial list.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return accounts, nil
}
// GetPools returns the pools updated at the given height by delegating to
// GetAccountsUpdated with the Pool account schema. Any failure is wrapped
// with a "failed to get pools" prefix.
func GetPools(pgtx pgx.Tx, height uint64) ([]*coreTypes.Account, error) {
	updated, lookupErr := GetAccountsUpdated(pgtx, ptypes.Pool, height)
	if lookupErr == nil {
		return updated, nil
	}
	return nil, fmt.Errorf("failed to get pools: %w", lookupErr)
}
// GetAccounts returns the accounts updated at the provided height by
// delegating to GetAccountsUpdated with the Account schema. Any failure is
// wrapped with a "failed to get accounts" prefix.
func GetAccounts(pgtx pgx.Tx, height uint64) ([]*coreTypes.Account, error) {
	updated, lookupErr := GetAccountsUpdated(pgtx, ptypes.Account, height)
	if lookupErr == nil {
		return updated, nil
	}
	return nil, fmt.Errorf("failed to get accounts: %w", lookupErr)
}
// GetFlags returns the flags stored for the given height, ordered by name.
//
// The name, value and enabled columns are read from the flags table, and
// each returned Flag has its Height set to the requested height. The
// returned slice is non-nil (possibly empty) on success. An error is
// returned when the query, a row scan, or the row iteration fails.
func GetFlags(pgtx pgx.Tx, height uint64) ([]*coreTypes.Flag, error) {
	fields := "name,value,enabled"
	query := fmt.Sprintf("SELECT %s FROM %s WHERE height=%d ORDER BY name ASC", fields, ptypes.FlagsTableName, height)
	rows, err := pgtx.Query(context.TODO(), query)
	if err != nil {
		return nil, fmt.Errorf("failed to get flags: %w", err)
	}
	defer rows.Close()

	flagSlice := []*coreTypes.Flag{}
	for rows.Next() {
		flag := new(coreTypes.Flag)
		if err := rows.Scan(&flag.Name, &flag.Value, &flag.Enabled); err != nil {
			return nil, err
		}
		// The height is not selected; it is the value that was queried for.
		flag.Height = int64(height)
		flagSlice = append(flagSlice, flag)
	}
	// Next also returns false when iteration fails; surface that instead of
	// silently returning a partial list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to get flags: %w", err)
	}
	return flagSlice, nil
}
// GetParams returns the set of params updated at the currented height
func GetParams(pgtx pgx.Tx, height uint64) ([]*coreTypes.Param, error) {
fields := "name,value"
query := fmt.Sprintf("SELECT %s FROM %s WHERE height=%d ORDER BY name ASC", fields, ptypes.ParamsTableName, height)
rows, err := pgtx.Query(context.TODO(), query)
if err != nil {
return nil, err
}
defer rows.Close()
var paramSlice []*coreTypes.Param
for rows.Next() {
param := new(coreTypes.Param)
if err := rows.Scan(¶m.Name, ¶m.Value); err != nil {
return nil, err
}
param.Height = int64(height)
paramSlice = append(paramSlice, param)
}
return paramSlice, nil
}
// GetIBCStoreUpdates returns the key-value pairs stored in the IBC store
// table for the given height, ordered by key.
//
// Both columns are read as hex strings and decoded to raw bytes. keys[i] and
// values[i] belong to the same row. When no rows match, both slices are nil.
// An error is returned when the query, a row scan, the row iteration, or a
// hex decode fails.
func GetIBCStoreUpdates(pgtx pgx.Tx, height uint64) (keys, values [][]byte, err error) {
	fields := "key,value"
	query := fmt.Sprintf("SELECT %s FROM %s WHERE height=%d ORDER BY key ASC", fields, ptypes.IBCStoreTableName, height)
	rows, err := pgtx.Query(context.TODO(), query)
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close()

	var hexKey, hexValue string
	for rows.Next() {
		if err := rows.Scan(&hexKey, &hexValue); err != nil {
			return nil, nil, err
		}
		key, err := hex.DecodeString(hexKey)
		if err != nil {
			return nil, nil, err
		}
		value, err := hex.DecodeString(hexValue)
		if err != nil {
			return nil, nil, err
		}
		keys = append(keys, key)
		values = append(values, value)
	}
	// Next also returns false when iteration fails; surface that instead of
	// silently returning a partial set of updates.
	if err := rows.Err(); err != nil {
		return nil, nil, err
	}
	return keys, values, nil
}
// getActor loads a single actor: it queries the actor row for the hex-encoded
// address at height, then attaches the actor's chains.
//
// The height passed on to getChainsForActor is the one scanned from the actor
// row, not the height argument. If reading the row fails, the error is
// returned together with whatever actor value getActorFromRow produced.
func getActor(tx pgx.Tx, actorSchema ptypes.ProtocolActorSchema, address []byte, height int64) (actor *coreTypes.Actor, err error) {
	ctx := context.TODO()

	actorType := actorSchema.GetActorType()
	actorQuery := actorSchema.GetQuery(hex.EncodeToString(address), height)
	row := tx.QueryRow(ctx, actorQuery)

	actor, rowHeight, err := getActorFromRow(actorType, row)
	if err != nil {
		return actor, err
	}
	return getChainsForActor(ctx, tx, actorSchema, actor, rowHeight)
}
// getActorFromRow builds an Actor of the given type from a single result row.
//
// The row's columns are scanned, in order, into Address, PublicKey,
// StakedAmount, ServiceUrl, Output, PausedHeight and UnstakingHeight, with
// the final column returned as height. The actor is always non-nil, even
// when the scan fails; err carries the scan result.
func getActorFromRow(actorType coreTypes.ActorType, row pgx.Row) (actor *coreTypes.Actor, height int64, err error) {
	actor = new(coreTypes.Actor)
	actor.ActorType = actorType

	// Destination order must match the column order of the actor query.
	dest := []interface{}{
		&actor.Address,
		&actor.PublicKey,
		&actor.StakedAmount,
		&actor.ServiceUrl,
		&actor.Output,
		&actor.PausedHeight,
		&actor.UnstakingHeight,
		&height,
	}
	err = row.Scan(dest...)
	return actor, height, err
}
// getChainsForActor appends to actor.Chains the chain IDs returned by the
// schema's chains query for actor.Address at height, and returns that actor.
//
// When the schema reports an empty chains table name there is nothing to
// load and the actor is returned unchanged. Every row must carry the actor's
// own address; a mismatch is reported as an error. An error is also returned
// when the query, a row scan, or the row iteration fails.
func getChainsForActor(
	ctx context.Context,
	tx pgx.Tx,
	actorSchema ptypes.ProtocolActorSchema,
	actor *coreTypes.Actor,
	height int64,
) (a *coreTypes.Actor, err error) {
	if actorSchema.GetChainsTableName() == "" {
		return actor, nil
	}
	rows, err := tx.Query(ctx, actorSchema.GetChainsQuery(actor.Address, height))
	if err != nil {
		return actor, err
	}
	defer rows.Close()

	var (
		chainAddr      string
		chainID        string
		chainEndHeight int64 // scanned to match the row's columns; not used further
	)
	for rows.Next() {
		if err = rows.Scan(&chainAddr, &chainID, &chainEndHeight); err != nil {
			return nil, err
		}
		if chainAddr != actor.Address {
			return actor, fmt.Errorf("unexpected address %s, expected %s when reading chains", chainAddr, actor.Address)
		}
		actor.Chains = append(actor.Chains, chainID)
	}
	// Next also returns false when iteration fails; surface that instead of
	// returning an actor with a silently truncated chain list.
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return actor, nil
}