diff --git a/config/types.go b/config/types.go index 534bd5b..562f804 100644 --- a/config/types.go +++ b/config/types.go @@ -68,7 +68,7 @@ type RateLimitConfig struct { } func DefaultRateLimitConfig() RateLimitConfig { - return RateLimitConfig{PerTokenMs: 400, Burst: 10} + return RateLimitConfig{PerTokenMs: 100, Burst: 30} } type StrayManagerConfig struct { diff --git a/queue/queue.go b/queue/queue.go index 1b09433..e51973c 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -5,6 +5,8 @@ import ( "context" "errors" "fmt" + "regexp" + "strconv" "strings" "sync" "time" @@ -21,6 +23,20 @@ import ( // Rate limiter defaults are provided by config.DefaultRateLimitPerTokenMs and config.DefaultRateLimitBurst +// extractExpectedSequence extracts the expected sequence number from an account sequence mismatch error message. +// It looks for the pattern "expected " in the error message, allowing for optional whitespace. +// Returns the expected sequence number and true if found, or 0 and false if not found. +func extractExpectedSequence(errorMsg string) (uint64, bool) { + re := regexp.MustCompile(`expected\s+(\d+)`) + matches := re.FindStringSubmatch(errorMsg) + if len(matches) > 1 { + if expectedSeq, parseErr := strconv.ParseUint(matches[1], 10, 64); parseErr == nil { + return expectedSeq, true + } + } + return 0, false +} + func calculateTransactionSize(messages []types.Msg) (int64, error) { if len(messages) == 0 { return 0, nil @@ -127,8 +143,8 @@ func (q *Queue) Listen() { log.Info().Msg("Queue module started") for q.running { - time.Sleep(time.Millisecond * 100) // pauses for one third of a second - if !q.processed.Add(time.Second * time.Duration(q.interval)).Before(time.Now()) { // minimum wait for 2 seconds + time.Sleep(time.Millisecond * 100) + if !q.processed.Add(time.Second * time.Duration(q.interval)).Before(time.Now()) { continue } @@ -145,13 +161,6 @@ func (q *Queue) Listen() { continue } - // bunch into 25 message chunks if possible - if total < 25 { // if total is less than 25 messages, and it's been less than 10 minutes passed, skip - if q.processed.Add(time.Minute * 10).After(time.Now()) { - continue - } - } - _, _ = q.BroadcastPending() q.processed = time.Now() } @@ -222,7 +231,7 @@ func (q *Queue) BroadcastPending() (int, error) { var i int for !complete && i < 10 { i++ - res, err = q.wallet.BroadcastTxCommit(data) + res, err = q.wallet.BroadcastTxSync(data) if err != nil { if strings.Contains(err.Error(), "tx already exists in cache") { log.Info().Msg("TX already exists in mempool, we're going to skip it.") @@ -234,6 +243,17 @@ func (q *Queue) BroadcastPending() (int, error) { q.messages = make([]*Message, 0) return 0, nil } + if strings.Contains(err.Error(), "account sequence mismatch") { + if expectedSeq, found := extractExpectedSequence(err.Error()); found { + data = data.WithSequence(expectedSeq) + continue + } + // Fallback to incrementing if extraction fails + if data.Sequence != nil { + data = data.WithSequence(*data.Sequence + 1) + continue + } + } log.Warn().Err(err).Msg("tx broadcast failed from queue") continue } @@ -241,6 +261,11 @@ func (q *Queue) BroadcastPending() (int, error) { if res != nil { if res.Code != 0 { if strings.Contains(res.RawLog, "account sequence mismatch") { + if expectedSeq, found := extractExpectedSequence(res.RawLog); found { + data = data.WithSequence(expectedSeq) + continue + } + // Fallback to incrementing if extraction fails if data.Sequence != nil { data = data.WithSequence(*data.Sequence + 1) continue diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 0000000..0d35d37 --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,131 @@ +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractExpectedSequence(t *testing.T) { + tests := []struct { + name string + errorMsg string + expectedSeq uint64 + expectedFound bool + }{ + { + name: "valid error message with expected sequence", + errorMsg: "error while simulating tx: rpc error: code = Unknown desc = account sequence mismatch, expected 1471614, got 1471613: incorrect account sequence", + expectedSeq: 1471614, + expectedFound: true, + }, + { + name: "error message from log example", + errorMsg: "account sequence mismatch, expected 1471614, got 1471613: incorrect account sequence [cosmos/cosmos-sdk@v0.45.17/x/auth/ante/sigverify.go:264] With gas wanted: '0' and gas used: '5043313'", + expectedSeq: 1471614, + expectedFound: true, + }, + { + name: "error message with different sequence numbers", + errorMsg: "account sequence mismatch, expected 999999, got 999998", + expectedSeq: 999999, + expectedFound: true, + }, + { + name: "error message with zero sequence", + errorMsg: "account sequence mismatch, expected 0, got 1", + expectedSeq: 0, + expectedFound: true, + }, + { + name: "error message with large sequence number", + errorMsg: "account sequence mismatch, expected 18446744073709551615, got 18446744073709551614", + expectedSeq: 18446744073709551615, + expectedFound: true, + }, + { + name: "error message without expected keyword", + errorMsg: "account sequence mismatch, got 1471613: incorrect account sequence", + expectedSeq: 0, + expectedFound: false, + }, + { + name: "error message with account sequence mismatch but no numbers", + errorMsg: "account sequence mismatch: incorrect account sequence", + expectedSeq: 0, + expectedFound: false, + }, + { + name: "empty error message", + errorMsg: "", + expectedSeq: 0, + expectedFound: false, + }, + { + name: "error message with unrelated expected keyword", + errorMsg: "something went wrong, expected something else", + expectedSeq: 0, + expectedFound: false, + }, + { + name: "error message with expected but no number", + errorMsg: "account sequence mismatch, expected abc, got 1471613", + expectedSeq: 0, + expectedFound: false, + }, + { + name: "error message with multiple expected keywords", + errorMsg: "account sequence mismatch, expected 1471614, got 1471613, but expected something else", + expectedSeq: 1471614, + expectedFound: true, + }, + { + name: "error message with sequence in different format (commas)", + errorMsg: "account sequence mismatch, expected 1,471,614, got 1,471,613", + expectedSeq: 1, // Regex will match the first digit sequence "1" + expectedFound: true, + }, + { + name: "error message with negative number (should not match)", + errorMsg: "account sequence mismatch, expected -1471614, got 1471613", + expectedSeq: 0, + expectedFound: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seq, found := extractExpectedSequence(tt.errorMsg) + require.Equal(t, tt.expectedFound, found, "found flag should match") + if tt.expectedFound { + assert.Equal(t, tt.expectedSeq, seq, "sequence number should match") + } else { + assert.Equal(t, uint64(0), seq, "sequence should be 0 when not found") + } + }) + } +} + +func TestExtractExpectedSequence_EdgeCases(t *testing.T) { + t.Run("very long error message", func(t *testing.T) { + longMsg := "error while simulating tx: rpc error: code = Unknown desc = account sequence mismatch, expected 1234567890, got 1234567889: incorrect account sequence [cosmos/cosmos-sdk@v0.45.17/x/auth/ante/sigverify.go:264] With gas wanted: '0' and gas used: '5043313' and some other very long text that goes on and on" + seq, found := extractExpectedSequence(longMsg) + require.True(t, found) + assert.Equal(t, uint64(1234567890), seq) + }) + + t.Run("error message with whitespace", func(t *testing.T) { + msg := "account sequence mismatch, expected 1471614 , got 1471613" + seq, found := extractExpectedSequence(msg) + require.True(t, found) + assert.Equal(t, uint64(1471614), seq) + }) + + t.Run("error message with newlines", func(t *testing.T) { + msg := "account sequence mismatch,\nexpected 1471614,\ngot 1471613" + seq, found := extractExpectedSequence(msg) + require.True(t, found) + assert.Equal(t, uint64(1471614), seq) + }) +} diff --git a/strays/manager.go b/strays/manager.go index c896834..f42d986 100644 --- a/strays/manager.go +++ b/strays/manager.go @@ -20,7 +20,10 @@ import ( // refreshIntervalBufferSeconds adds a small buffer (in seconds) to the configured // refresh interval to account for scheduling jitter, network latency, and block // timing variance so we don't hammer the endpoint exactly on the boundary. -const refreshIntervalBufferSeconds int64 = 15 +const ( + refreshIntervalBufferSeconds int64 = 15 + maxReturn uint64 = 500 +) // NewStrayManager creates and initializes a new StrayManager with the specified number of hands, authorizing each hand to transact on behalf of the provided wallet if not already authorized. func NewStrayManager(w *wallet.Wallet, q *queue.Queue, interval int64, refreshInterval int64, handCount int, authList []string) *StrayManager { @@ -157,18 +160,16 @@ func (s *StrayManager) Stop() { func (s *StrayManager) RefreshList() error { log.Debug().Msg("Refreshing stray list...") - s.strays = make([]*types.UnifiedFile, 0) - var val uint64 reverse := false - if s.lastSize > 300 { - val = uint64(s.rand.Int63n(s.lastSize)) + if s.lastSize > maxReturn { + val = uint64(s.rand.Int63n(int64(s.lastSize))) reverse = s.rand.Intn(2) == 0 } page := &query.PageRequest{ // more randomly pick from the stray pile Offset: val, - Limit: 300, + Limit: maxReturn, Reverse: reverse, CountTotal: true, } @@ -186,14 +187,15 @@ func (s *StrayManager) RefreshList() error { } strayCount := len(res.Files) - s.lastSize = int64(res.Pagination.Total) + s.lastSize = res.Pagination.Total if strayCount > 0 { log.Info().Msgf("Got updated list of strays of size %d", strayCount) - - for _, stray := range res.Files { - newStray := stray - s.strays = append(s.strays, &newStray) + newStrays := make([]*types.UnifiedFile, strayCount) + for i := 0; i < strayCount; i++ { + stray := res.Files[i] + newStrays[i] = &stray } + s.strays = newStrays } return nil diff --git a/strays/types.go b/strays/types.go index 8993efd..4f88a38 100644 --- a/strays/types.go +++ b/strays/types.go @@ -18,7 +18,7 @@ type Hand struct { type StrayManager struct { strays []*types.UnifiedFile wallet *wallet.Wallet - lastSize int64 + lastSize uint64 rand *rand.Rand interval time.Duration refreshInterval time.Duration