Walkthrough

Updated default rate-limit values; increased the minimum broadcast batch cutoff and clamped it to the total message count; switched broadcast from asynchronous to synchronous in the retry loop; added Queue.Count(); added Prover.lastCount and a queue-fullness guard to skip cycles when the queue grows.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
actor Prover
participant Queue
participant Broadcaster
Note over Prover,Queue: Prover Start cycle
Prover->>Queue: Count()
alt queue grew (Count > lastCount)
Prover-->>Prover: log warning and skip cycle
else queue not grown
Prover->>Queue: pop/process up to cutoff (min 45, clamped to total)
loop per-batch retry
Queue->>Broadcaster: BroadcastTxSync(batch) %% synchronous call
Broadcaster-->>Queue: response / error
alt error & retry
Queue-->>Queue: retry loop
end
end
Prover-->>Prover: update lastCount
end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
- config/types.go (1 hunk)
- queue/queue.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Build
- GitHub Check: lint
🔇 Additional comments (1)
queue/queue.go (1)
207-207: Good change to synchronous broadcast, but verify performance impact.

Switching from `BroadcastTxAsync` to `BroadcastTxSync` improves reliability by confirming the transaction reaches the mempool before proceeding. This is especially valuable within the retry loop, as it provides immediate feedback on mempool acceptance. However, this change makes each retry iteration blocking, which could slow down queue processing during high-load periods or under network latency. Combined with the more restrictive rate limiting (400ms per token, burst of 10) from `config/types.go`, ensure this doesn't create excessive backlog.

Consider monitoring queue metrics after deployment to verify the synchronous broadcast doesn't cause unacceptable delays.
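For illustration, a retry loop around a synchronous broadcast might look like the sketch below. It assumes a CometBFT RPC client; the function name, retry count, and backoff are illustrative assumptions, not the repository's actual code.

```go
package broadcast

import (
	"context"
	"fmt"
	"time"

	rpchttp "github.com/cometbft/cometbft/rpc/client/http"
	"github.com/cometbft/cometbft/types"
)

// broadcastWithRetry retries a synchronous broadcast until CheckTx accepts
// the transaction or the attempts are exhausted (hypothetical helper).
func broadcastWithRetry(ctx context.Context, c *rpchttp.HTTP, tx types.Tx, attempts int) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		// BroadcastTxSync blocks until CheckTx runs, so a mempool rejection
		// (res.Code != 0) surfaces immediately, unlike BroadcastTxAsync.
		res, err := c.BroadcastTxSync(ctx, tx)
		switch {
		case err != nil:
			lastErr = err
		case res.Code != 0:
			lastErr = fmt.Errorf("checktx rejected: code=%d log=%q", res.Code, res.Log)
		default:
			return nil // accepted into the mempool
		}
		time.Sleep(time.Second) // fixed backoff, purely for the sketch
	}
	return lastErr
}
```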
```diff
 func DefaultRateLimitConfig() RateLimitConfig {
-	return RateLimitConfig{PerTokenMs: 300, Burst: 20}
+	return RateLimitConfig{PerTokenMs: 400, Burst: 10}
 }
```
🧩 Analysis chain
Verify rate limit changes align with batch size increases.
The rate limit has become more restrictive (PerTokenMs increased from 300 to 400, Burst decreased from 20 to 10), which reduces broadcast frequency. This aligns with the larger minimum batch size (45 messages) introduced in queue/queue.go. However, ensure this combination doesn't cause excessive queue buildup during high-traffic periods, as fewer broadcasts with larger batches could lead to delays.
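For context, the new defaults map onto a token bucket like the one below. This sketch uses golang.org/x/time/rate as a stand-in; the repository's own limiter implementation may differ.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// PerTokenMs: 400 -> one token refilled every 400ms (~2.5 tokens/sec).
	// Burst: 10 -> at most 10 tokens available at once.
	limiter := rate.NewLimiter(rate.Every(400*time.Millisecond), 10)

	// A cold limiter allows exactly the burst before refills matter.
	allowed := 0
	for i := 0; i < 20; i++ {
		if limiter.Allow() {
			allowed++
		}
	}
	fmt.Println("allowed immediately:", allowed) // prints 10 (the burst)
}
```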
Run the following script to check how the rate limiter is used and verify no other components depend on the previous rate limit values:
🏁 Script executed:

```shell
#!/bin/bash
# Description: Find all references to rate limit configuration and verify usage patterns

# Search for RateLimitConfig usage
echo "=== RateLimitConfig usage ==="
rg -n "RateLimitConfig" --type=go -C 3

# Search for rate limiter Allow() calls to understand frequency patterns
echo -e "\n=== Rate limiter Allow() calls ==="
rg -n "limiter\.Allow\(\)" --type=go -C 3

# Check for any hardcoded rate limit values that might conflict
echo -e "\n=== Hardcoded rate limit values ==="
rg -n "PerTokenMs|Burst" --type=go -C 2
```

Length of output: 4549
---
🏁 Script executed:
```shell
#!/bin/bash
# Verify batch size changes and understand rate limiter interaction
# Search for batch size configuration
echo "=== Batch size configuration ==="
rg -n "batch|Batch" --type=go queue/queue.go -C 3
# Check BroadcastPending implementation to understand frequency
echo -e "\n=== BroadcastPending usage ==="
rg -n "BroadcastPending" --type=go -C 2
# Search for message count thresholds in queue.go
echo -e "\n=== Message count/thresholds ==="
rg -n "[0-9]{2,}" --type=go queue/queue.go | head -30
Length of output: 2332
🏁 Script executed:

```shell
#!/bin/bash
# Verify batch size changes and understand rate limiter interaction

# Search for batch size configuration
echo "=== Batch size configuration ==="
rg -n "batch|Batch" --type=go queue/queue.go -C 3

# Check BroadcastPending implementation to understand frequency
echo -e "\n=== BroadcastPending method ==="
rg -n "func.*BroadcastPending" --type=go -A 20

# Search for message accumulation/minimum size logic
echo -e "\n=== Message size/count checks ==="
rg -n "messages.*len|len.*messages|MinBatch" --type=go queue/queue.go -C 2
```

Length of output: 1861
Update outdated rate limit comment and verify configuration aligns with batch processing logic.
The rate limit change is verified: `PerTokenMs` increased from 300ms to 400ms (reducing frequency from 3.33 to 2.5 tokens/sec), and `Burst` decreased from 20 to 10. This aligns with the minimum batch size of 45 messages enforced in `BroadcastPending()` (queue/queue.go:180), supporting fewer but larger broadcasts.

However, the comment at queue/queue.go:142 is now outdated. It states "Token-bucket rate limit: allow calling BroadcastPending at most 20 times per 6 seconds," but with the new configuration, the rate allows approximately 15 refilled tokens over 6 seconds (2.5 tokens/sec × 6), not 20. Update this comment to reflect the new rate limit.
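One possible corrected wording for that comment, shown here as a sketch since the surrounding limiter code isn't visible in this review:

```go
// Token-bucket rate limit: one token every 400ms (~2.5 tokens/sec) with a
// burst of 10, i.e. roughly 15 refilled tokens per 6-second window once the
// burst is spent. Tuned for fewer, larger broadcasts (minimum batch size 45).
```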
🤖 Prompt for AI Agents
In queue/queue.go around line 142 (and note config/types.go lines 70-72 define
the current DefaultRateLimitConfig), update the outdated token-bucket comment to
reflect the new rate: PerTokenMs is 400ms (≈2.5 tokens/sec) and Burst 10, so
over 6 seconds you get about 15 tokens (not 20); change the comment text to
state the correct refill math and ensure it mentions that the config supports
fewer, larger broadcasts (minimum batch size 45) so the comment and config are
consistent.
```diff
+	// If nothing fits, process at least the first 45 messages or the total number of messages if less than 45
 	if cutoff == 0 {
-		cutoff = 1
+		cutoff = 45
 	}

+	if cutoff > total {
+		cutoff = total
+	}
```
Critical logic flaw: Forcing 45 messages when size limit is exceeded.

When `cutoff == 0` after the size-checking loop (lines 162-176), it indicates that even the first message exceeds `maxSizeBytes`. Setting `cutoff = 45` in this scenario forces the inclusion of messages that are known to violate the size constraint, virtually guaranteeing the broadcast will fail.

The previous logic of setting `cutoff = 1` was correct: it attempted a best-effort broadcast of the first message. If the intent is to process multiple messages when the queue is large but none fit, the logic should distinguish between:

- Size exceeded: keep `cutoff = 1` (best effort)
- Empty queue or other conditions: use a larger minimum

Apply this diff to restore safe behavior:
```diff
-	// If nothing fits, process at least the first 45 messages or the total number of messages if less than 45
+	// If nothing fits, process at least the first message
 	if cutoff == 0 {
-		cutoff = 45
+		cutoff = 1
 	}
 	if cutoff > total {
 		cutoff = total
 	}
```

Alternatively, if larger batches are required for performance, consider removing or increasing `maxSizeBytes` rather than bypassing the size check.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// If nothing fits, process at least the first message
if cutoff == 0 {
	cutoff = 1
}
if cutoff > total {
	cutoff = total
}
```
🤖 Prompt for AI Agents
In queue/queue.go around lines 178 to 185, the code currently forces cutoff = 45
when cutoff == 0 which allows messages that exceed maxSizeBytes to be included;
change this to set cutoff = 1 so we attempt a best-effort broadcast of the first
(oversized) message instead of batching many violating messages. Preserve the
subsequent guard that limits cutoff to total; if you need larger batches for
performance, adjust maxSizeBytes elsewhere rather than bypassing the size check.
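If multiple fallback behaviors are genuinely needed, the distinction described above could be made explicit. The sketch below is hypothetical: `msgSize`, `msgs`, and `minBatch` stand in for whatever the actual loop uses.

```go
// Hypothetical fallback: only apply the larger minimum when the first queued
// message would actually fit; an oversized head stays a best-effort single.
if cutoff == 0 {
	if total > 0 && msgSize(msgs[0]) > maxSizeBytes {
		cutoff = 1 // head alone exceeds the size limit: best effort
	} else {
		cutoff = minBatch // e.g. 45
	}
}
if cutoff > total {
	cutoff = total
}
```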
Actionable comments posted: 3
♻️ Duplicate comments (1)
queue/queue.go (1)
178-185: Critical: Forcing 45 messages when size limit exceeded.

When `cutoff == 0` after the size-checking loop (lines 162-176), it means even the first message exceeds `maxSizeBytes`. Setting `cutoff = 45` forces the broadcast of up to 45 messages that are known to violate the size constraint, virtually guaranteeing broadcast failure.

The previous logic of `cutoff = 1` was correct: it performed a best-effort broadcast of the first oversized message. The current implementation bypasses the size check entirely when batching.

Apply this diff to restore safe behavior:

```diff
-	// If nothing fits, process at least the first 45 messages or the total number of messages if less than 45
+	// If nothing fits, process at least the first message
 	if cutoff == 0 {
-		cutoff = 45
+		cutoff = 1
 	}
 	if cutoff > total {
 		cutoff = total
 	}
```

If larger batches are needed for throughput, consider increasing `maxSizeBytes` rather than bypassing the size validation.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
- proofs/proofs.go (3 hunks)
- proofs/types.go (1 hunk)
- queue/queue.go (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
queue/queue.go (1)
queue/types.go (1)
Queue (12-22)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: lint
🔇 Additional comments (2)
queue/queue.go (2)
207-207: Verify throughput impact of synchronous broadcast.

Switching from `BroadcastTxAsync` to `BroadcastTxSync` makes the retry loop wait for mempool admission before continuing. While this enables better error detection and retry logic, it may reduce throughput compared to fire-and-forget async broadcasts.

Ensure this change aligns with your performance requirements and that the added latency is acceptable.
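One lightweight way to watch for that latency in production is to time each synchronous call. This fragment is a sketch: the `client` handle, `txBytes`, and the 2-second threshold are illustrative assumptions, and the zerolog-style logging mirrors the guard shown later in this review.

```go
// Time each synchronous broadcast so slow mempool admission shows up in logs.
start := time.Now()
res, err := client.BroadcastTxSync(ctx, txBytes) // hypothetical client handle
elapsed := time.Since(start)
if elapsed > 2*time.Second { // illustrative threshold
	log.Warn().Dur("elapsed", elapsed).Msg("slow synchronous broadcast")
}
if err != nil {
	return err // defer to the existing retry handling
}
if res.Code != 0 {
	return fmt.Errorf("checktx rejected: code=%d log=%q", res.Code, res.Log)
}
```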
254-256: LGTM!

The `Count()` method provides clean read-only access to the queue size, enabling the queue-fullness guard implemented in `proofs/proofs.go`.
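For reference, such an accessor typically takes the queue's lock around a length read. The field names below are assumptions, since queue/types.go isn't shown in full here.

```go
// Count returns the number of queued messages under the queue's mutex.
func (q *Queue) Count() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.messages)
}
```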
```go
if p.q.Count() > p.lastCount { // don't run if the queue has more than the amount of files on disk
	log.Warn().
		Msg("Queue is full, skipping proof cycle")
	continue
}
```
Logic bug: Guard triggers on first cycle with any queued messages.

Since `p.lastCount` defaults to 0 (not initialized in `NewProver`), the guard `if p.q.Count() > p.lastCount` will trigger on the first cycle if there are any messages in the queue, preventing any work from being done initially.

Additionally, the guard compares the queue size against the number of files processed last cycle, not the previous queue size. This doesn't directly measure queue growth: if you processed 10 files last cycle and the queue now holds 15 messages, the queue could have gone from 20 down to 15 (actually shrinking) and the guard would still fire.
Consider one of these approaches:
Option 1: Initialize lastCount to prevent first-cycle skip
In NewProver (around line 415), initialize lastCount:
```diff
 func NewProver(wallet *wallet.Wallet, q *queue.Queue, io FileSystem, interval uint64, threads int16, chunkSize int) *Prover {
 	p := Prover{
 		running:   false,
 		wallet:    wallet,
 		q:         q,
 		processed: time.Time{},
 		interval:  interval,
 		io:        io,
 		threads:   threads,
 		chunkSize: chunkSize,
+		lastCount: 1000, // or another reasonable threshold
 	}
 	return &p
 }
```

Option 2: Track actual queue size to measure growth
Replace lastCount with lastQueueSize and track queue size at the start of each cycle:
```diff
-		if p.q.Count() > p.lastCount { // don't run if the queue has more than the amount of files on disk
+		currentQueueSize := p.q.Count()
+		if p.lastQueueSize > 0 && currentQueueSize > p.lastQueueSize { // skip if queue is growing
 			log.Warn().
 				Msg("Queue is full, skipping proof cycle")
+			p.lastQueueSize = currentQueueSize
 			continue
 		}
+		p.lastQueueSize = currentQueueSize
```

Then update at the end (replace line 347):
```diff
-	p.lastCount = count
+	p.lastQueueSize = p.q.Count()
```

Committable suggestion skipped: line range outside the PR's diff.
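For completeness, Option 2 also needs the replacement field on the struct. A sketch, with the field name mirroring the diffs above:

```go
type Prover struct {
	// ... existing fields ...

	// lastQueueSize is the queue length observed at the end of the previous
	// cycle; its zero value doubles as a first-run marker, so the growth
	// guard stays disabled until a baseline has been recorded.
	lastQueueSize int
}
```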
```go
}
height := abciInfo.Response.LastBlockHeight

var count int // reset last count here
```
🧹 Nitpick | 🔵 Trivial
Minor: Clarify comment.
The comment "reset last count here" is slightly misleading—this line declares a new local variable count and initializes it to 0, rather than resetting p.lastCount. The actual update to p.lastCount happens at line 347.
Consider clarifying:

```diff
-	var count int // reset last count here
+	var count int // track files processed this cycle
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
var count int // track files processed this cycle
```
🤖 Prompt for AI Agents
In proofs/proofs.go around line 328, the comment "reset last count here" is
misleading because the line declares a new local variable `count` (initialized
to 0) rather than resetting `p.lastCount`; the actual update to `p.lastCount`
occurs later (around line 347). Change the comment to clearly state that this is
a local declaration/initialization of `count` (e.g., "declare/initialize local
count to 0") so readers won't assume `p.lastCount` is being modified here.
```go
threads        int16
currentThreads int16
chunkSize      int
lastCount      int
```
🧩 Analysis chain
Verify initialization and first-cycle behavior.
The `lastCount` field defaults to 0 and is not explicitly initialized in `NewProver`. This means that on the first processing cycle in `proofs/proofs.go`, if there are any queued messages, the guard `if p.q.Count() > p.lastCount` will trigger and skip the cycle, potentially preventing any work from being done initially.

Run the following script to confirm `lastCount` is not initialized:
🏁 Script executed:

```shell
#!/bin/bash
# Check if lastCount is initialized in NewProver
ast-grep --pattern 'func NewProver($$$) *Prover {
  $$$
}'
```

Length of output: 619
I can confirm `lastCount` is not initialized. Now I need to verify the actual impact on first-cycle behavior by examining where `lastCount` is used.
🏁 Script executed:

```shell
#!/bin/bash
# Find all usages of lastCount to understand guard behavior and impact
rg -nP -C5 'lastCount' proofs/
```

Length of output: 1507
🏁 Script executed:

```shell
#!/bin/bash
# Check git history and surrounding comments for context on first-cycle behavior intention
git log -p --follow -S 'lastCount' -- proofs/ | head -100
```

Length of output: 59
🏁 Script executed:

```shell
#!/bin/bash
# Get more context around the guard logic with comments
sed -n '300,360p' proofs/proofs.go
```

Length of output: 1567
Address first-cycle processing skip caused by uninitialized `lastCount`.

The `lastCount` field defaults to 0 and is not initialized in `NewProver`. On the first processing cycle, the guard at line 314, `if p.q.Count() > p.lastCount`, will trigger if any messages are queued, logging "Queue is full, skipping proof cycle" and skipping the entire cycle. This prevents proof processing from starting until a second cycle occurs.

The intended guard logic (the line 314 comment reads "don't run if the queue has more than the amount of files on disk") is to detect backlogging, not to skip the first cycle. Initialize `lastCount` appropriately in `NewProver`, or adjust the guard to account for the first cycle.
🤖 Prompt for AI Agents
In proofs/types.go around line 22 (field declaration) and the processing guard
at ~line 314, lastCount is left at the zero value so the first cycle can be
incorrectly skipped; initialize lastCount in NewProver to the current queue
count (e.g., p.lastCount = p.q.Count()) so the first cycle uses the real
baseline, or alternatively change the guard to ignore comparison when lastCount
is uninitialized (e.g., treat negative/unset as first-run). Ensure the fix is
applied in NewProver so lastCount reflects the actual queue length before the
first processing cycle.
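A minimal sketch of that baseline initialization, reusing the constructor shown earlier; the `q.Count()` call is the suggestion from the prompt above, not code that exists in the PR:

```go
func NewProver(wallet *wallet.Wallet, q *queue.Queue, io FileSystem, interval uint64, threads int16, chunkSize int) *Prover {
	p := Prover{
		running:   false,
		wallet:    wallet,
		q:         q,
		processed: time.Time{},
		interval:  interval,
		io:        io,
		threads:   threads,
		chunkSize: chunkSize,
		lastCount: q.Count(), // real baseline instead of the zero value
	}
	return &p
}
```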
Summary by CodeRabbit

Bug Fixes

- Broadcasts in the retry loop are now synchronous, confirming mempool acceptance before proceeding.
- The broadcast batch cutoff is clamped to the total number of queued messages.
- Updated default rate-limit values (400ms per token, burst of 10).

New Features

- Queue.Count() exposes the current queue size.
- A queue-fullness guard skips proof cycles when the queue grows.