-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathqueue.go
More file actions
252 lines (212 loc) · 6.2 KB
/
queue.go
File metadata and controls
252 lines (212 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package queue
import (
"bytes"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/JackalLabs/sequoia/config"
storageTypes "github.com/jackalLabs/canine-chain/v5/x/storage/types"
"github.com/cosmos/cosmos-sdk/types"
walletTypes "github.com/desmos-labs/cosmos-go-wallet/types"
"github.com/desmos-labs/cosmos-go-wallet/wallet"
"github.com/rs/zerolog/log"
"golang.org/x/time/rate"
)
// Rate limiter defaults (PerTokenMs and Burst) are taken from config.DefaultRateLimitConfig() whenever the caller leaves them at zero; see NewQueue.
// calculateTransactionSize returns a rough byte-size estimate for a
// transaction carrying the given messages. It is an approximation, not the
// encoded size: MsgPostProof messages are measured field by field, every other
// message type is charged a flat amount, and a fixed overhead is added once
// for the transaction envelope (signature, fee, gas, etc.).
//
// An empty message list yields 0. The returned error is always nil.
func calculateTransactionSize(messages []types.Msg) (int64, error) {
	const (
		envelopeOverhead int64 = 500 // base transaction overhead
		unknownMsgSize   int64 = 500 // conservative estimate for unrecognised message types
		startFieldSize   int64 = 16  // 16 bytes for Start field
	)

	if len(messages) == 0 {
		return 0, nil
	}

	estimate := envelopeOverhead
	for _, msg := range messages {
		proof, isProof := msg.(*storageTypes.MsgPostProof)
		if !isProof {
			estimate += unknownMsgSize
			continue
		}

		// Sum the lengths of the variable-size fields of the proof.
		estimate += int64(len(proof.Creator))
		estimate += int64(len(proof.Item))
		estimate += int64(len(proof.HashList))
		estimate += int64(len(proof.Merkle))
		estimate += int64(len(proof.Owner))
		estimate += startFieldSize
	}

	return estimate, nil
}
// Done marks the message as finished by calling Done on its WaitGroup,
// releasing any goroutine waiting on it.
func (m *Message) Done() {
m.wg.Done()
}
// NewQueue builds a Queue that broadcasts through wallet w.
//
// interval is the number of seconds between broadcast cycles (Listen adds a
// further 2 seconds), maxSizeBytes caps the estimated size of one batch, and
// domain is embedded in the transaction memo. Zero values for maxSizeBytes,
// rlCfg.PerTokenMs, or rlCfg.Burst are replaced by the defaults from the
// config package. The returned queue is not running until Listen is called.
func NewQueue(w *wallet.Wallet, interval uint64, maxSizeBytes int64, domain string, rlCfg config.RateLimitConfig) *Queue {
	// Fill in defaults for anything the caller left unset.
	if maxSizeBytes == 0 {
		maxSizeBytes = config.DefaultMaxSizeBytes()
	}
	if rlCfg.PerTokenMs == 0 {
		rlCfg.PerTokenMs = config.DefaultRateLimitConfig().PerTokenMs
	}
	if rlCfg.Burst == 0 {
		rlCfg.Burst = config.DefaultRateLimitConfig().Burst
	}

	// One limiter token is refilled every PerTokenMs milliseconds.
	refillEvery := time.Duration(rlCfg.PerTokenMs) * time.Millisecond

	return &Queue{
		wallet:       w,
		messages:     []*Message{},
		processed:    time.Now(),
		interval:     interval,
		maxSizeBytes: maxSizeBytes,
		domain:       domain,
		limiter:      rate.NewLimiter(rate.Every(refillEvery), rlCfg.Burst),
	}
}
// Add wraps msg in a Message and appends it to the queue, returning the
// Message together with the WaitGroup that is released once it is processed.
//
// A MsgPostProof whose Merkle, Start, and Owner all match a proof already in
// the queue is treated as a duplicate: it is NOT queued, its msgIndex is set
// to -1, and the returned WaitGroup has a zero counter, so waiting on it
// returns immediately.
func (q *Queue) Add(msg types.Msg) (*Message, *sync.WaitGroup) {
	wg := new(sync.WaitGroup)
	entry := &Message{
		msg: msg,
		wg:  wg,
	}

	if proof, isProof := msg.(*storageTypes.MsgPostProof); isProof {
		for _, queued := range q.messages {
			if queued == nil || queued.msg == nil {
				continue
			}

			other, otherIsProof := queued.msg.(*storageTypes.MsgPostProof)
			if !otherIsProof {
				continue
			}

			// Any differing key field means this is a distinct proof.
			if !bytes.Equal(other.Merkle, proof.Merkle) ||
				other.Start != proof.Start ||
				other.Owner != proof.Owner {
				continue
			}

			entry.msgIndex = -1
			return entry, wg
		}
	}

	wg.Add(1)
	q.messages = append(q.messages, entry) // new messages go to the back of the queue
	return entry, wg
}
// Stop clears the running flag. The Listen loop checks this flag at the top of
// each iteration, so it exits once its current iteration has finished.
func (q *Queue) Stop() {
q.running = false
}
// Listen runs the queue loop until the running flag is cleared (see Stop).
//
// The loop wakes every 100 milliseconds. A broadcast cycle is attempted only
// when more than interval+2 seconds have passed since the previous cycle
// started, the queue is non-empty, and the rate limiter grants a token.
// The result of BroadcastPending is ignored here; it records the outcome on
// each processed message itself.
func (q *Queue) Listen() {
	q.running = true
	defer log.Info().Msg("Queue module stopped")
	log.Info().Msg("Queue module started")

	const pollDelay = 100 * time.Millisecond

	for q.running {
		time.Sleep(pollDelay)

		// Earliest moment the next cycle may run: interval seconds plus a
		// fixed 2-second minimum after the previous cycle began.
		nextCycle := q.processed.Add(time.Second * time.Duration(q.interval+2))
		if !time.Now().After(nextCycle) {
			continue
		}

		// Publish the current backlog to the gauge before deciding to skip.
		pending := len(q.messages)
		queueSize.Set(float64(pending))

		// Nothing to send, or the token bucket is empty: try again next tick.
		// The limiter is only consulted when there is something to send.
		if pending == 0 || !q.limiter.Allow() {
			continue
		}

		// The timestamp is taken before broadcasting, so the wait is measured
		// from the start of the cycle rather than from its end.
		cycleStart := time.Now()
		_, _ = q.BroadcastPending()
		q.processed = cycleStart
	}
}
// BroadcastPending selects a batch that fits within max size, broadcasts it,
// updates per-message results, and returns the number of messages processed
// along with a terminal error if the broadcast attempts all failed.
//
// An empty queue is a no-op: it returns (0, nil) without building or
// submitting a transaction.
//
// The selected messages are removed from the queue before broadcasting. The
// broadcast is attempted up to 10 times; if none completes, the returned error
// (also stored on every message in the batch) wraps the last broadcast error
// when there was one. Every message in the batch gets its err, res, and
// msgIndex set and is marked Done, whether or not the broadcast succeeded.
func (q *Queue) BroadcastPending() (int, error) {
	const (
		// maxAttempts bounds how often one batch is submitted.
		maxAttempts = 10
		// fallbackBatch is the batch size used when the size estimate says
		// that not even the first message fits.
		fallbackBatch = 45
	)

	total := len(q.messages)
	if total == 0 {
		// Nothing to send; avoid submitting an empty transaction.
		return 0, nil
	}
	log.Info().Msgf("Queue: %d messages waiting to be put on-chain...", total)

	// Grow the candidate batch one message at a time until the estimated
	// transaction size exceeds the limit.
	candidate := make([]types.Msg, 0)
	cutoff := 0
	for i := 0; i < total; i++ {
		candidate = append(candidate, q.messages[i].msg)
		size, err := calculateTransactionSize(candidate)
		if err != nil {
			log.Warn().Err(err).Msg("Failed to calculate transaction size")
			break
		}
		if size > q.maxSizeBytes {
			break
		}
		cutoff = i + 1 // cutoff is now the count of messages that fit
	}

	// If nothing fits, process at least the first 45 messages or the total
	// number of messages if less than 45.
	if cutoff == 0 {
		cutoff = fallbackBatch
	}
	if cutoff > total {
		cutoff = total
	}

	log.Info().Msgf("Queue: Posting %d messages to chain...", cutoff)

	// Detach the batch from the queue before broadcasting.
	toProcess := q.messages[:cutoff]
	q.messages = q.messages[cutoff:]

	allMsgs := make([]types.Msg, len(toProcess))
	for i, process := range toProcess {
		allMsgs[i] = process.msg
	}

	data := walletTypes.NewTransactionData(
		allMsgs...,
	).WithGasAuto().WithFeeAuto().WithMemo(fmt.Sprintf("Proven by %s", q.domain))

	var (
		res      *types.TxResponse
		err      error
		complete bool
	)
	for attempt := 0; !complete && attempt < maxAttempts; attempt++ {
		res, err = q.wallet.BroadcastTxSync(data)
		if err != nil {
			// A cached duplicate means the sequence was already used; bump
			// it when an explicit sequence is set and retry right away.
			if strings.Contains(err.Error(), "tx already exists in cache") {
				if data.Sequence != nil {
					data = data.WithSequence(*data.Sequence + 1)
					continue
				}
			}
			if strings.Contains(err.Error(), "mempool is full") {
				log.Info().Msg("Mempool is full, waiting for 5 minutes before trying again")
				time.Sleep(time.Minute * 5)
				continue
			}
			log.Warn().Err(err).Msg("tx broadcast failed from queue")
			continue
		}

		if res == nil {
			log.Warn().Msg("response is nil")
			continue
		}

		// A rejected tx caused by a stale sequence is retried with the next
		// sequence number, but only when an explicit sequence is set. Any
		// other response (including other non-zero codes) ends the loop and
		// is handed to the waiting callers as-is.
		if res.Code != 0 &&
			strings.Contains(res.RawLog, "account sequence mismatch") &&
			data.Sequence != nil {
			data = data.WithSequence(*data.Sequence + 1)
			continue
		}

		complete = true
	}

	if !complete {
		if err != nil {
			// Keep the underlying cause so callers can see why it failed.
			err = fmt.Errorf("could not complete broadcast in 10 loops: %w", err)
		} else {
			err = errors.New("could not complete broadcast in 10 loops")
		}
	}

	// Hand the shared outcome to every message in the batch and release
	// whoever is waiting on it.
	for i, process := range toProcess {
		process.err = err
		process.res = res
		process.msgIndex = i
		process.Done()
	}

	return cutoff, err
}