-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add support for XREADGROUP CLAIM arg #3486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
19e2d4b to
afc4947
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request adds support for Redis XREADGROUP CLAIM functionality, allowing the claiming of idle pending messages from a stream's Pending Entries List (PEL) with a configurable minimum idle time. The implementation introduces delivery metadata tracking for claimed entries.
- Adds
XReadArgs.claim()methods to specify minimum idle time for claiming entries - Introduces
ClaimedStreamMessageclass to carry metadata (milliseconds since last delivery and redelivery count) - Updates
StreamReadOutputto parse optional metadata fields from Redis responses - Adds comprehensive unit and integration tests
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 16 comments.
Show a summary per file
| File | Description |
|---|---|
| src/main/java/io/lettuce/core/XReadArgs.java | Adds claim() builder and instance methods to configure CLAIM min-idle-time parameter |
| src/main/java/io/lettuce/core/StreamMessage.java | Adds isClaimed() method to distinguish claimed entries from fresh ones |
| src/main/java/io/lettuce/core/ClaimedStreamMessage.java | New class extending StreamMessage to hold claim metadata (redelivery count and time since last delivery) |
| src/main/java/io/lettuce/core/output/StreamReadOutput.java | Updates output parser to handle optional metadata fields for claimed entries in both integer and bulk string formats |
| src/test/java/io/lettuce/core/output/StreamReadOutputUnitTests.java | Adds unit tests for parsing claimed entries with metadata in various formats |
| src/test/java/io/lettuce/core/XReadGroupClaimIntegrationTests.java | New integration test suite validating CLAIM behavior with real Redis server |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (redeliveryCount == null && isDigits(s)) { | ||
| redeliveryCount = Long.parseLong(s); | ||
| return; | ||
| } |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential logic issue: If a non-numeric bulk string is received when extras are expected (id != null && bodyReceived && key == null), the code will skip the extras handling (lines 64-71) because isDigits(s) returns false, but then fall through to line 79 where it would incorrectly interpret this data as a body key. This could cause incorrect parsing.
Consider restructuring the logic to handle unexpected data more explicitly, or add an else clause to log/handle non-numeric extras:
if (id != null && bodyReceived && key == null && bytes != null) {
String s = decodeString(bytes.duplicate());
if (msSinceLastDelivery == null && isDigits(s)) {
msSinceLastDelivery = Long.parseLong(s);
return;
}
if (redeliveryCount == null && isDigits(s)) {
redeliveryCount = Long.parseLong(s);
return;
}
// If we reach here with non-numeric data in the extras position,
// it should be handled or logged, not allowed to fall through
}| } | |
| } | |
| // If we reach here, non-numeric data was received in the extras position. | |
| // Log or handle the unexpected data. For now, we ignore and do not fall through. | |
| // System.err.println("Unexpected non-numeric extras data: " + s); | |
| return; |
| sync.xadd(key, body); | ||
| sync.xadd(key, body); | ||
|
|
||
| // First read with CLAIM(minIdle=100ms); entries should be fresh (not claimed) |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is inconsistent with the code. The comment states "minIdle=100ms" but the code uses Duration.ofMillis(50). Please update the comment to match the actual value:
// First read with CLAIM(minIdle=50ms); entries should be fresh (not claimed)| // First read with CLAIM(minIdle=100ms); entries should be fresh (not claimed) | |
| // First read with CLAIM(minIdle=50ms); entries should be fresh (not claimed) |
|
|
||
| /** | ||
| * Claim idle pending messages first with a minimum idle time (milliseconds). Only valid for XREADGROUP. | ||
| * |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing JavaDoc parameters and return value documentation. The method should document:
@param millisecondsthe minimum idle time in milliseconds@return{@code this} XReadArgs instance for method chaining
| * | |
| * | |
| * @param milliseconds the minimum idle time in milliseconds | |
| * @return {@code this} XReadArgs instance for method chaining |
|
|
||
| /** | ||
| * Claim idle pending messages first with a minimum idle time. Only valid for XREADGROUP. | ||
| * |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing JavaDoc parameters and return value documentation. The method should document:
@param timeoutthe minimum idle time as a Duration@return{@code this} XReadArgs instance for method chaining
| * | |
| * | |
| * @param timeout the minimum idle time as a Duration | |
| * @return {@code this} XReadArgs instance for method chaining |
| } | ||
|
|
||
| /** | ||
| * Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP. |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing JavaDoc parameters and return value documentation. The method should document:
@param millisecondsthe minimum idle time in milliseconds@returnnew {@link XReadArgs} with CLAIM set- Consider adding
@see XReadArgs#claim(long)for consistency with other Builder methods
| * Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP. | |
| * Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP. | |
| * | |
| * @param milliseconds the minimum idle time in milliseconds | |
| * @return new {@link XReadArgs} with CLAIM set | |
| * @see XReadArgs#claim(long) |
| public long getMsSinceLastDelivery() { | ||
| return msSinceLastDelivery; | ||
| } | ||
|
|
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing JavaDoc for the public method. Should include:
- A description of what the method returns
@returnDuration since the last delivery of this message
| /** | |
| * Returns the duration since the last delivery of this message. | |
| * | |
| * @return Duration since the last delivery of this message. | |
| */ |
| msSinceLastDelivery = Long.parseLong(s); | ||
| return; | ||
| } | ||
| if (redeliveryCount == null && isDigits(s)) { | ||
| redeliveryCount = Long.parseLong(s); |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential uncaught 'java.lang.NumberFormatException'.
| msSinceLastDelivery = Long.parseLong(s); | |
| return; | |
| } | |
| if (redeliveryCount == null && isDigits(s)) { | |
| redeliveryCount = Long.parseLong(s); | |
| try { | |
| msSinceLastDelivery = Long.parseLong(s); | |
| } catch (NumberFormatException e) { | |
| msSinceLastDelivery = null; // or handle as appropriate | |
| } | |
| return; | |
| } | |
| if (redeliveryCount == null && isDigits(s)) { | |
| try { | |
| redeliveryCount = Long.parseLong(s); | |
| } catch (NumberFormatException e) { | |
| redeliveryCount = null; // or handle as appropriate | |
| } |
| msSinceLastDelivery = Long.parseLong(s); | ||
| return; | ||
| } | ||
| if (redeliveryCount == null && isDigits(s)) { | ||
| redeliveryCount = Long.parseLong(s); |
Copilot
AI
Nov 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential uncaught 'java.lang.NumberFormatException'.
| msSinceLastDelivery = Long.parseLong(s); | |
| return; | |
| } | |
| if (redeliveryCount == null && isDigits(s)) { | |
| redeliveryCount = Long.parseLong(s); | |
| try { | |
| msSinceLastDelivery = Long.parseLong(s); | |
| } catch (NumberFormatException e) { | |
| // Optionally log or handle the error here | |
| } | |
| return; | |
| } | |
| if (redeliveryCount == null && isDigits(s)) { | |
| try { | |
| redeliveryCount = Long.parseLong(s); | |
| } catch (NumberFormatException e) { | |
| // Optionally log or handle the error here | |
| } |
| } | ||
| } | ||
| super.set(integer); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly unused code. Seems server sends string instead the specified integer.
see #3482