diff --git a/config/types.go b/config/types.go index b092351..534bd5b 100644 --- a/config/types.go +++ b/config/types.go @@ -68,7 +68,7 @@ type RateLimitConfig struct { } func DefaultRateLimitConfig() RateLimitConfig { - return RateLimitConfig{PerTokenMs: 300, Burst: 20} + return RateLimitConfig{PerTokenMs: 400, Burst: 10} } type StrayManagerConfig struct { diff --git a/proofs/proofs.go b/proofs/proofs.go index b423f75..cc954f2 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -311,6 +311,12 @@ func (p *Prover) Start() { continue } + if p.q.Count() > p.lastCount { // don't run if the queue has more than the amount of files on disk + log.Warn(). + Msg("Queue is full, skipping proof cycle") + continue + } + log.Debug().Msg("Starting proof cycle...") abciInfo, err := p.wallet.Client.RPCClient.ABCIInfo(context.Background()) @@ -319,7 +325,7 @@ func (p *Prover) Start() { continue } height := abciInfo.Response.LastBlockHeight - + var count int // reset last count here t := time.Now() err = p.io.ProcessFiles(func(merkle []byte, owner string, start int64) { @@ -331,12 +337,15 @@ func (p *Prover) Start() { log.Debug().Msg(fmt.Sprintf("proving: %x", merkle)) filesProving.Inc() p.Inc() + count++ go p.wrapPostProof(merkle, owner, start, height, t) }) if err != nil { log.Error().Err(err) } + p.lastCount = count + p.processed = time.Now() } log.Info().Msg("Prover module stopped") diff --git a/proofs/types.go b/proofs/types.go index 072e8b3..4099ef3 100644 --- a/proofs/types.go +++ b/proofs/types.go @@ -19,6 +19,7 @@ type Prover struct { threads int16 currentThreads int16 chunkSize int + lastCount int } type FileSystem interface { diff --git a/queue/queue.go b/queue/queue.go index 2ba0958..12eff9d 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -175,9 +175,13 @@ func (q *Queue) BroadcastPending() (int, error) { cutoff = i + 1 // cutoff is now the count of messages that fit } - // If nothing fits, process at least the first message + // If nothing fits, process at least the first 45 messages or the total number of messages if less than 45 if cutoff == 0 { - cutoff = 1 + cutoff = 45 + } + + if cutoff > total { + cutoff = total } log.Info().Msg(fmt.Sprintf("Queue: Posting %d messages to chain...", cutoff)) @@ -200,7 +204,7 @@ func (q *Queue) BroadcastPending() (int, error) { var i int for !complete && i < 10 { i++ - res, err = q.wallet.BroadcastTxAsync(data) + res, err = q.wallet.BroadcastTxSync(data) if err != nil { if strings.Contains(err.Error(), "tx already exists in cache") { if data.Sequence != nil { @@ -246,3 +250,7 @@ func (q *Queue) BroadcastPending() (int, error) { return cutoff, err } + +func (q *Queue) Count() int { + return len(q.messages) +}