Skip to content

Commit 19e2d4b

Browse files
committed
Add support for XREADGROUP CLAIM arg
1 parent 5dafb95 commit 19e2d4b

File tree

6 files changed

+537
-3
lines changed

6 files changed

+537
-3
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.lettuce.core;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
6+
/**
7+
* Stream message returned by XREADGROUP when entries were claimed from the PEL using CLAIM min-idle-time. Contains additional
8+
* metadata: milliseconds since last delivery and redelivery count.
9+
*/
10+
public class ClaimedStreamMessage<K, V> extends StreamMessage<K, V> {
11+
12+
private final long msSinceLastDelivery;
13+
14+
private final long redeliveryCount;
15+
16+
public ClaimedStreamMessage(K stream, String id, Map<K, V> body, long msSinceLastDelivery, long redeliveryCount) {
17+
super(stream, id, body);
18+
this.msSinceLastDelivery = msSinceLastDelivery;
19+
this.redeliveryCount = redeliveryCount;
20+
}
21+
22+
public long getMsSinceLastDelivery() {
23+
return msSinceLastDelivery;
24+
}
25+
26+
public Duration getSinceLastDelivery() {
27+
return Duration.ofMillis(msSinceLastDelivery);
28+
}
29+
30+
public long getRedeliveryCount() {
31+
return redeliveryCount;
32+
}
33+
34+
@Override
35+
public boolean isClaimed() {
36+
// "Really claimed" implies it was previously delivered at least once.
37+
return redeliveryCount >= 1;
38+
}
39+
40+
}

src/main/java/io/lettuce/core/StreamMessage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ public Map<K, V> getBody() {
4646
return body;
4747
}
4848

49+
/**
50+
* Whether this message was reclaimed from the pending entries list (PEL) using XREADGROUP … CLAIM. Default: false.
51+
*
52+
* Note: When CLAIM is used, servers may attach delivery metadata to all entries in the reply (including fresh ones). Use
53+
* this indicator to distinguish actually reclaimed entries (true) from normal entries (false).
54+
*/
55+
public boolean isClaimed() {
56+
return false;
57+
}
58+
4959
@Override
5060
public boolean equals(Object o) {
5161
if (this == o)

src/main/java/io/lettuce/core/XReadArgs.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public class XReadArgs implements CompositeArgument {
2323

2424
private boolean noack;
2525

26+
private Long claimMinIdleTime;
27+
2628
/**
2729
* Builder entry points for {@link XReadArgs}.
2830
*/
@@ -90,6 +92,21 @@ public static XReadArgs noack(boolean noack) {
9092
return new XReadArgs().noack(noack);
9193
}
9294

95+
/**
96+
* Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP.
97+
*/
98+
public static XReadArgs claim(long milliseconds) {
99+
return new XReadArgs().claim(milliseconds);
100+
}
101+
102+
/**
103+
* Create a new {@link XReadArgs} and set CLAIM min-idle-time. Only valid for XREADGROUP.
104+
*/
105+
public static XReadArgs claim(Duration timeout) {
106+
LettuceAssert.notNull(timeout, "Claim timeout must not be null");
107+
return claim(timeout.toMillis());
108+
}
109+
93110
}
94111

95112
/**
@@ -141,6 +158,29 @@ public XReadArgs noack(boolean noack) {
141158
return this;
142159
}
143160

161+
/**
162+
* Claim idle pending messages first with a minimum idle time (milliseconds). Only valid for XREADGROUP.
163+
*
164+
* @since 7.0
165+
*/
166+
public XReadArgs claim(long milliseconds) {
167+
168+
this.claimMinIdleTime = milliseconds;
169+
return this;
170+
}
171+
172+
/**
173+
* Claim idle pending messages first with a minimum idle time. Only valid for XREADGROUP.
174+
*
175+
* @since 7.0
176+
*/
177+
public XReadArgs claim(Duration timeout) {
178+
179+
LettuceAssert.notNull(timeout, "Claim timeout must not be null");
180+
181+
return claim(timeout.toMillis());
182+
}
183+
144184
public <K, V> void build(CommandArgs<K, V> args) {
145185

146186
if (block != null) {
@@ -154,6 +194,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
154194
if (noack) {
155195
args.add(CommandKeyword.NOACK);
156196
}
197+
198+
if (claimMinIdleTime != null) {
199+
args.add("CLAIM").add(claimMinIdleTime);
200+
}
157201
}
158202

159203
/**

src/main/java/io/lettuce/core/output/StreamReadOutput.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import io.lettuce.core.StreamMessage;
1010
import io.lettuce.core.codec.RedisCodec;
11+
import io.lettuce.core.ClaimedStreamMessage;
12+
1113
import io.lettuce.core.internal.LettuceAssert;
1214

1315
/**
@@ -31,6 +33,10 @@ public class StreamReadOutput<K, V> extends CommandOutput<K, V, List<StreamMessa
3133

3234
private Map<K, V> body;
3335

36+
private Long msSinceLastDelivery;
37+
38+
private Long redeliveryCount;
39+
3440
private boolean bodyReceived = false;
3541

3642
public StreamReadOutput(RedisCodec<K, V> codec) {
@@ -51,6 +57,20 @@ public void set(ByteBuffer bytes) {
5157
return;
5258
}
5359

60+
// Handle extra metadata for claimed entries that may arrive as bulk strings (RESP2/RESP3)
61+
if (id != null && bodyReceived && key == null && bytes != null) {
62+
// Use a duplicate so decoding doesn't advance the original buffer position.
63+
String s = decodeString(bytes.duplicate());
64+
if (msSinceLastDelivery == null && isDigits(s)) {
65+
msSinceLastDelivery = Long.parseLong(s);
66+
return;
67+
}
68+
if (redeliveryCount == null && isDigits(s)) {
69+
redeliveryCount = Long.parseLong(s);
70+
return;
71+
}
72+
}
73+
5474
if (id == null) {
5575
id = decodeString(bytes);
5676
return;
@@ -75,6 +95,23 @@ public void set(ByteBuffer bytes) {
7595
key = null;
7696
}
7797

98+
@Override
99+
public void set(long integer) {
100+
101+
// Extra integers appear only for claimed entries (XREADGROUP with CLAIM)
102+
if (id != null && bodyReceived) {
103+
if (msSinceLastDelivery == null) {
104+
msSinceLastDelivery = integer;
105+
return;
106+
}
107+
if (redeliveryCount == null) {
108+
redeliveryCount = integer;
109+
return;
110+
}
111+
}
112+
super.set(integer);
113+
}
114+
78115
@Override
79116
public void multi(int count) {
80117

@@ -91,15 +128,25 @@ public void multi(int count) {
91128
@Override
92129
public void complete(int depth) {
93130

94-
if (depth == 3 && bodyReceived) {
95-
subscriber.onNext(output, new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body));
131+
// Emit the message when the entry array (id/body[/extras]) completes.
132+
if (depth == 2 && bodyReceived) {
133+
Map<K, V> map = body == null ? Collections.emptyMap() : body;
134+
if (msSinceLastDelivery != null || redeliveryCount != null) {
135+
subscriber.onNext(output,
136+
new ClaimedStreamMessage<>(stream, id, map, msSinceLastDelivery == null ? 0L : msSinceLastDelivery,
137+
redeliveryCount == null ? 0L : redeliveryCount));
138+
} else {
139+
subscriber.onNext(output, new StreamMessage<>(stream, id, map));
140+
}
96141
bodyReceived = false;
97142
key = null;
98143
body = null;
99144
id = null;
145+
msSinceLastDelivery = null;
146+
redeliveryCount = null;
100147
}
101148

102-
// RESP2/RESP3 compat
149+
// RESP2/RESP3 compat for stream key reset upon finishing the outer array element
103150
if (depth == 2 && skipStreamKeyReset) {
104151
skipStreamKeyReset = false;
105152
}
@@ -113,6 +160,17 @@ public void complete(int depth) {
113160
}
114161
}
115162

163+
private static boolean isDigits(String s) {
164+
if (s == null || s.isEmpty())
165+
return false;
166+
for (int i = 0; i < s.length(); i++) {
167+
char c = s.charAt(i);
168+
if (c < '0' || c > '9')
169+
return false;
170+
}
171+
return true;
172+
}
173+
116174
@Override
117175
public void setSubscriber(Subscriber<StreamMessage<K, V>> subscriber) {
118176
LettuceAssert.notNull(subscriber, "Subscriber must not be null");

0 commit comments

Comments
 (0)