Merged
2 changes: 1 addition & 1 deletion config/types.go
@@ -68,7 +68,7 @@ type RateLimitConfig struct {
```diff
 }
 
 func DefaultRateLimitConfig() RateLimitConfig {
-	return RateLimitConfig{PerTokenMs: 300, Burst: 20}
+	return RateLimitConfig{PerTokenMs: 400, Burst: 10}
 }
```
Comment on lines 70 to 72
Contributor
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify rate limit changes align with batch size increases.

The rate limit has become more restrictive (PerTokenMs increased from 300 to 400, Burst decreased from 20 to 10), which reduces broadcast frequency. This aligns with the larger minimum batch size (45 messages) introduced in queue/queue.go. However, ensure this combination doesn't cause excessive queue buildup during high-traffic periods, as fewer broadcasts with larger batches could lead to delays.

Run the following script to check how the rate limiter is used and verify no other components depend on the previous rate limit values:


🏁 Script executed:

```shell
#!/bin/bash
# Description: Find all references to rate limit configuration and verify usage patterns

# Search for RateLimitConfig usage
echo "=== RateLimitConfig usage ==="
rg -n "RateLimitConfig" --type=go -C 3

# Search for rate limiter Allow() calls to understand frequency patterns
echo -e "\n=== Rate limiter Allow() calls ==="
rg -n "limiter\.Allow\(\)" --type=go -C 3

# Check for any hardcoded rate limit values that might conflict
echo -e "\n=== Hardcoded rate limit values ==="
rg -n "PerTokenMs|Burst" --type=go -C 2
```

Length of output: 4549




---

🏁 Script executed:

```shell
#!/bin/bash
# Verify batch size changes and understand rate limiter interaction

# Search for batch size configuration
echo "=== Batch size configuration ==="
rg -n "batch|Batch" --type=go queue/queue.go -C 3

# Check BroadcastPending implementation to understand frequency
echo -e "\n=== BroadcastPending usage ==="
rg -n "BroadcastPending" --type=go -C 2

# Search for message count thresholds in queue.go
echo -e "\n=== Message count/thresholds ==="
rg -n "[0-9]{2,}" --type=go queue/queue.go | head -30
```

Length of output: 2332


🏁 Script executed:

```shell
#!/bin/bash
# Verify batch size changes and understand rate limiter interaction

# Search for batch size configuration
echo "=== Batch size configuration ==="
rg -n "batch|Batch" --type=go queue/queue.go -C 3

# Check BroadcastPending implementation to understand frequency
echo -e "\n=== BroadcastPending method ==="
rg -n "func.*BroadcastPending" --type=go -A 20

# Search for message accumulation/minimum size logic
echo -e "\n=== Message size/count checks ==="
rg -n "messages.*len|len.*messages|MinBatch" --type=go queue/queue.go -C 2
```

Length of output: 1861


Update outdated rate limit comment and verify configuration aligns with batch processing logic.

The rate limit change is verified: PerTokenMs increased from 300 to 400ms (reducing frequency from 3.33 to 2.5 tokens/sec), and Burst decreased from 20 to 10. This aligns with the minimum batch size of 45 messages enforced in BroadcastPending() (queue/queue.go:180), supporting fewer but larger broadcasts.

However, the comment at queue/queue.go:142 is now outdated. It states "Token-bucket rate limit: allow calling BroadcastPending at most 20 times per 6 seconds," but with the new configuration, the rate allows approximately 15 refilled tokens over 6 seconds (2.5 tokens/sec × 6), not 20. Update this comment to reflect the new rate limit.
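The refill math can be double-checked with a short Go sketch (the constants mirror the old and new `DefaultRateLimitConfig` values; `refilledTokens` is a hypothetical helper, not part of the codebase):

```go
package main

import "fmt"

// refilledTokens reports how many tokens a token bucket refills over a
// window when one token is added every perTokenMs milliseconds.
func refilledTokens(perTokenMs, windowMs int) int {
	return windowMs / perTokenMs
}

func main() {
	fmt.Println(refilledTokens(300, 6000)) // old config: 20 tokens per 6s
	fmt.Println(refilledTokens(400, 6000)) // new config: 15 tokens per 6s
}
```

This confirms the old comment's "20 times per 6 seconds" no longer holds under the new 400ms interval.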

🤖 Prompt for AI Agents
In queue/queue.go around line 142 (and note config/types.go lines 70-72 define
the current DefaultRateLimitConfig), update the outdated token-bucket comment to
reflect the new rate: PerTokenMs is 400ms (≈2.5 tokens/sec) and Burst 10, so
over 6 seconds you get about 15 tokens (not 20); change the comment text to
state the correct refill math and ensure it mentions that the config supports
fewer, larger broadcasts (minimum batch size 45) so the comment and config are
consistent.


type StrayManagerConfig struct {
Expand Down
11 changes: 10 additions & 1 deletion proofs/proofs.go
@@ -311,6 +311,12 @@ func (p *Prover) Start() {
```diff
 			continue
 		}
 
+		if p.q.Count() > p.lastCount { // don't run if the queue has more than the amount of files on disk
+			log.Warn().
+				Msg("Queue is full, skipping proof cycle")
+			continue
+		}
```
Comment on lines +314 to +318
Contributor
⚠️ Potential issue | 🟠 Major

Logic bug: Guard triggers on first cycle with any queued messages.

Since p.lastCount defaults to 0 (not initialized in NewProver), the guard if p.q.Count() > p.lastCount will trigger on the first cycle if there are any messages in the queue, preventing any work from being done initially.

Additionally, the guard compares the queue size against the number of files processed last cycle, not the previous queue size, so it doesn't directly measure queue growth—if you processed 10 files last cycle and the queue now holds 15 messages, the guard trips even though the queue may have shrunk (say, from 20 to 15).

Consider one of these approaches:

Option 1: Initialize lastCount to prevent first-cycle skip

In NewProver (around line 415), initialize lastCount:

```diff
 func NewProver(wallet *wallet.Wallet, q *queue.Queue, io FileSystem, interval uint64, threads int16, chunkSize int) *Prover {
 	p := Prover{
 		running:   false,
 		wallet:    wallet,
 		q:         q,
 		processed: time.Time{},
 		interval:  interval,
 		io:        io,
 		threads:   threads,
 		chunkSize: chunkSize,
+		lastCount: 1000, // or another reasonable threshold
 	}
 
 	return &p
 }
```

Option 2: Track actual queue size to measure growth

Replace lastCount with lastQueueSize and track queue size at the start of each cycle:

```diff
-	if p.q.Count() > p.lastCount { // don't run if the queue has more than the amount of files on disk
+	currentQueueSize := p.q.Count()
+	if p.lastQueueSize > 0 && currentQueueSize > p.lastQueueSize { // skip if queue is growing
 		log.Warn().
 			Msg("Queue is full, skipping proof cycle")
+		p.lastQueueSize = currentQueueSize
 		continue
 	}
+	p.lastQueueSize = currentQueueSize
```

Then update at the end (replace line 347):

```diff
-	p.lastCount = count
+	p.lastQueueSize = p.q.Count()
```

Committable suggestion skipped: line range outside the PR's diff.
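The false positive described above can be reproduced with a minimal sketch (`guardSkips` condenses the guard's condition; the scenario numbers are illustrative):

```go
package main

import "fmt"

// guardSkips mirrors the guard in Start(): the cycle is skipped when the
// current queue count exceeds the number of files processed last cycle.
func guardSkips(queueCount, lastCount int) bool {
	return queueCount > lastCount
}

func main() {
	// First cycle: lastCount is the zero value, so any queued message
	// causes a skip.
	fmt.Println(guardSkips(1, 0)) // true

	// Last cycle processed 10 files while the queue shrank from 20 to 15
	// messages; the guard still skips even though the backlog is draining.
	fmt.Println(guardSkips(15, 10)) // true
}
```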


log.Debug().Msg("Starting proof cycle...")

abciInfo, err := p.wallet.Client.RPCClient.ABCIInfo(context.Background())
@@ -319,7 +325,7 @@ func (p *Prover) Start() {
continue
}
height := abciInfo.Response.LastBlockHeight

var count int // reset last count here
Contributor
🧹 Nitpick | 🔵 Trivial

Minor: Clarify comment.

The comment "reset last count here" is slightly misleading—this line declares a new local variable count and initializes it to 0, rather than resetting p.lastCount. The actual update to p.lastCount happens at line 347.

Consider clarifying:

```diff
-	var count int // reset last count here
+	var count int // track files processed this cycle
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
-	var count int // reset last count here
+	var count int // track files processed this cycle
```
🤖 Prompt for AI Agents
In proofs/proofs.go around line 328, the comment "reset last count here" is
misleading because the line declares a new local variable `count` (initialized
to 0) rather than resetting `p.lastCount`; the actual update to `p.lastCount`
occurs later (around line 347). Change the comment to clearly state that this is
a local declaration/initialization of `count` (e.g., "declare/initialize local
count to 0") so readers won't assume `p.lastCount` is being modified here.

t := time.Now()

err = p.io.ProcessFiles(func(merkle []byte, owner string, start int64) {
@@ -331,12 +337,15 @@ func (p *Prover) Start() {
```diff
 			log.Debug().Msg(fmt.Sprintf("proving: %x", merkle))
 			filesProving.Inc()
 			p.Inc()
+			count++
 			go p.wrapPostProof(merkle, owner, start, height, t)
 		})
 		if err != nil {
 			log.Error().Err(err)
 		}
 
+		p.lastCount = count
+
 		p.processed = time.Now()
 	}
 	log.Info().Msg("Prover module stopped")
```
1 change: 1 addition & 0 deletions proofs/types.go
@@ -19,6 +19,7 @@ type Prover struct {
```diff
 	threads        int16
 	currentThreads int16
 	chunkSize      int
+	lastCount      int
```
Contributor
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify initialization and first-cycle behavior.

The lastCount field defaults to 0 and is not explicitly initialized in NewProver. This means on the first processing cycle in proofs/proofs.go, if there are any queued messages, the guard if p.q.Count() > p.lastCount will trigger and skip the cycle, potentially preventing any work from being done initially.

Run the following script to confirm lastCount is not initialized:


🏁 Script executed:

```shell
#!/bin/bash
# Check if lastCount is initialized in NewProver
ast-grep --pattern 'func NewProver($$$) *Prover {
  $$$
}'
```

Length of output: 619


I can confirm lastCount is not initialized. Now I need to verify the actual impact on first-cycle behavior by examining where lastCount is used.


🏁 Script executed:

```shell
#!/bin/bash
# Find all usages of lastCount to understand guard behavior and impact
rg -nP -C5 'lastCount' proofs/
```

Length of output: 1507


🏁 Script executed:

```shell
#!/bin/bash
# Check git history and surrounding comments for context on first-cycle behavior intention
git log -p --follow -S 'lastCount' -- proofs/ | head -100
```

Length of output: 59


🏁 Script executed:

```shell
#!/bin/bash
# Get more context around the guard logic with comments
sed -n '300,360p' proofs/proofs.go
```

Length of output: 1567


Address first-cycle processing skip caused by uninitialized lastCount.

The lastCount field defaults to 0 and is not initialized in NewProver. On the first processing cycle at line 314, the guard if p.q.Count() > p.lastCount will trigger if any messages are queued, logging "Queue is full, skipping proof cycle" and skipping the entire cycle. This prevents proof processing from starting until a second cycle occurs.

The intended guard logic (line 314 comment: "don't run if the queue has more than the amount of files on disk") is to detect backlogging, not to skip the first cycle. Initialize lastCount appropriately in NewProver or adjust the guard to account for the first cycle.

🤖 Prompt for AI Agents
In proofs/types.go around line 22 (field declaration) and the processing guard
at ~line 314, lastCount is left at the zero value so the first cycle can be
incorrectly skipped; initialize lastCount in NewProver to the current queue
count (e.g., p.lastCount = p.q.Count()) so the first cycle uses the real
baseline, or alternatively change the guard to ignore comparison when lastCount
is uninitialized (e.g., treat negative/unset as first-run). Ensure the fix is
applied in NewProver so lastCount reflects the actual queue length before the
first processing cycle.
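A self-contained sketch of the suggested fix, seeding the baseline in the constructor (the Queue and Prover types below are simplified stand-ins, not the real ones):

```go
package main

import "fmt"

// Simplified stand-ins for the real queue.Queue and Prover types.
type Queue struct{ messages []string }

func (q *Queue) Count() int { return len(q.messages) }

type Prover struct {
	q         *Queue
	lastCount int
}

// NewProver seeds lastCount with the current queue length so the
// first-cycle guard compares against a real baseline instead of zero.
func NewProver(q *Queue) *Prover {
	return &Prover{q: q, lastCount: q.Count()}
}

func main() {
	q := &Queue{messages: []string{"a", "b", "c"}}
	p := NewProver(q)
	// With the seeded baseline, the guard no longer trips on cycle one.
	fmt.Println(p.q.Count() > p.lastCount) // false
}
```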

}

type FileSystem interface {
14 changes: 11 additions & 3 deletions queue/queue.go
@@ -175,9 +175,13 @@ func (q *Queue) BroadcastPending() (int, error) {
```diff
 		cutoff = i + 1 // cutoff is now the count of messages that fit
 	}
 
-	// If nothing fits, process at least the first message
+	// If nothing fits, process at least the first 45 messages or the total number of messages if less than 45
 	if cutoff == 0 {
-		cutoff = 1
+		cutoff = 45
 	}
 
+	if cutoff > total {
+		cutoff = total
+	}
```
Comment on lines +178 to 185
Contributor
⚠️ Potential issue | 🔴 Critical

Critical logic flaw: Forcing 45 messages when size limit is exceeded.

When cutoff == 0 after the size-checking loop (lines 162-176), it indicates that even the first message exceeds maxSizeBytes. Setting cutoff = 45 in this scenario forces the inclusion of messages that are known to violate the size constraint, virtually guaranteeing the broadcast will fail.

The previous logic of setting cutoff = 1 was correct—it attempted a best-effort broadcast of the first message. If the intent is to process multiple messages when the queue is large but none fit, the logic should distinguish between:

  1. Size exceeded: Keep cutoff = 1 (best effort)
  2. Empty queue or other conditions: Use a larger minimum

Apply this diff to restore safe behavior:

```diff
-	// If nothing fits, process at least the first 45 messages or the total number of messages if less than 45
+	// If nothing fits, process at least the first message
 	if cutoff == 0 {
-		cutoff = 45
+		cutoff = 1
 	}
 
 	if cutoff > total {
 		cutoff = total
 	}
```

Alternatively, if larger batches are required for performance, consider removing or increasing maxSizeBytes rather than bypassing the size check.
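The two fallbacks can be compared with a small pure-function sketch (`batchCutoff` condenses the fallback-and-clamp logic from BroadcastPending; the name and scenarios are illustrative):

```go
package main

import "fmt"

// batchCutoff applies the fallback when no message fits under the size
// limit (cutoff == 0), then clamps the result to the queue length.
func batchCutoff(cutoff, total, fallback int) int {
	if cutoff == 0 {
		cutoff = fallback
	}
	if cutoff > total {
		cutoff = total
	}
	return cutoff
}

func main() {
	// 100 queued messages, and the first one already exceeds maxSizeBytes.
	fmt.Println(batchCutoff(0, 100, 45)) // 45: forces 45 oversized messages
	fmt.Println(batchCutoff(0, 100, 1))  // 1: best-effort single message
	// Small queue: the clamp keeps cutoff within the queue length.
	fmt.Println(batchCutoff(0, 3, 45)) // 3
}
```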

📝 Committable suggestion

Suggested change

```diff
-	// If nothing fits, process at least the first 45 messages or the total number of messages if less than 45
+	// If nothing fits, process at least the first message
 	if cutoff == 0 {
-		cutoff = 45
+		cutoff = 1
 	}
 
 	if cutoff > total {
 		cutoff = total
 	}
```
🤖 Prompt for AI Agents
In queue/queue.go around lines 178 to 185, the code currently forces cutoff = 45
when cutoff == 0 which allows messages that exceed maxSizeBytes to be included;
change this to set cutoff = 1 so we attempt a best-effort broadcast of the first
(oversized) message instead of batching many violating messages. Preserve the
subsequent guard that limits cutoff to total; if you need larger batches for
performance, adjust maxSizeBytes elsewhere rather than bypassing the size check.


log.Info().Msg(fmt.Sprintf("Queue: Posting %d messages to chain...", cutoff))
@@ -200,7 +204,7 @@ func (q *Queue) BroadcastPending() (int, error) {
```diff
 	var i int
 	for !complete && i < 10 {
 		i++
-		res, err = q.wallet.BroadcastTxAsync(data)
+		res, err = q.wallet.BroadcastTxSync(data)
 		if err != nil {
 			if strings.Contains(err.Error(), "tx already exists in cache") {
 				if data.Sequence != nil {
```
@@ -246,3 +250,7 @@ func (q *Queue) BroadcastPending() (int, error) {

```diff
 	return cutoff, err
 }
+
+func (q *Queue) Count() int {
+	return len(q.messages)
+}
```