Skip to content

Commit 8b98eab

Browse files
Adds support for the XNACK command introduced in Redis 8.8
1 parent 6cc94eb commit 8b98eab

3 files changed

Lines changed: 292 additions & 16 deletions

File tree

commands_test.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"crypto/sha1"
77
"encoding/json"
88
"fmt"
9+
"math"
910
"reflect"
1011
"strconv"
1112
"time"
@@ -7625,6 +7626,206 @@ var _ = Describe("Commands", func() {
76257626
Expect(n).To(Equal(int64(2)))
76267627
})
76277628

7629+
It("should not XNack with no mode", func() {
7630+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7631+
7632+
// Mode is required by Redis; omitting it should return an error.
7633+
_, err := client.XNack(ctx, &redis.XNackArgs{
7634+
Stream: "stream",
7635+
Group: "group",
7636+
IDs: []string{"1-0", "2-0"},
7637+
}).Result()
7638+
Expect(err).To(HaveOccurred())
7639+
Expect(err.Error()).To(ContainSubstring("mode must be SILENT, FAIL, or FATAL"))
7640+
})
7641+
7642+
It("should XNack with SILENT mode", func() {
7643+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7644+
7645+
// All 3 messages are pending (delivered by BeforeEach), each with delivery_count=1.
7646+
// SILENT: consumer shutting down; delivery counter decremented by 1 (1 → 0).
7647+
n, err := client.XNack(ctx, &redis.XNackArgs{
7648+
Stream: "stream",
7649+
Group: "group",
7650+
Mode: "SILENT",
7651+
IDs: []string{"1-0", "2-0"},
7652+
}).Result()
7653+
Expect(err).NotTo(HaveOccurred())
7654+
Expect(n).To(Equal(int64(2)))
7655+
7656+
// NACKed messages move to unassigned "nacked" state in the PEL.
7657+
// Total PEL count stays 3; only 3-0 remains assigned to consumer.
7658+
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
7659+
Expect(err).NotTo(HaveOccurred())
7660+
Expect(pendingInfo.Count).To(Equal(int64(3)))
7661+
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 1}))
7662+
7663+
// Verify the delivery counter was decremented from 1 to 0 for the NACKed messages.
7664+
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
7665+
Stream: "stream",
7666+
Group: "group",
7667+
Start: "-",
7668+
End: "+",
7669+
Count: 10,
7670+
}).Result()
7671+
Expect(err).NotTo(HaveOccurred())
7672+
for _, e := range infoExt {
7673+
if e.ID == "1-0" || e.ID == "2-0" {
7674+
Expect(e.RetryCount).To(Equal(int64(0)), "SILENT should decrement delivery counter to 0 for %s", e.ID)
7675+
}
7676+
}
7677+
})
7678+
7679+
It("should XNack with FAIL mode", func() {
7680+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7681+
7682+
// FAIL: delivery counter stays the same (1 → 1).
7683+
n, err := client.XNack(ctx, &redis.XNackArgs{
7684+
Stream: "stream",
7685+
Group: "group",
7686+
Mode: "FAIL",
7687+
IDs: []string{"1-0"},
7688+
}).Result()
7689+
Expect(err).NotTo(HaveOccurred())
7690+
Expect(n).To(Equal(int64(1)))
7691+
7692+
// NACKed message moves to unassigned "nacked" state.
7693+
// Only 2 messages (2-0, 3-0) remain assigned to consumer.
7694+
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
7695+
Expect(err).NotTo(HaveOccurred())
7696+
Expect(pendingInfo.Count).To(Equal(int64(3)))
7697+
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2}))
7698+
7699+
// Verify the delivery counter was left unchanged at 1.
7700+
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
7701+
Stream: "stream",
7702+
Group: "group",
7703+
Start: "-",
7704+
End: "+",
7705+
Count: 10,
7706+
}).Result()
7707+
Expect(err).NotTo(HaveOccurred())
7708+
for _, e := range infoExt {
7709+
if e.ID == "1-0" {
7710+
Expect(e.RetryCount).To(Equal(int64(1)), "FAIL should leave delivery counter unchanged")
7711+
}
7712+
}
7713+
})
7714+
7715+
It("should XNack with FATAL mode", func() {
7716+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7717+
7718+
// FATAL: delivery counter set to MAXINT (for invalid/malicious messages).
7719+
n, err := client.XNack(ctx, &redis.XNackArgs{
7720+
Stream: "stream",
7721+
Group: "group",
7722+
Mode: "FATAL",
7723+
IDs: []string{"1-0"},
7724+
}).Result()
7725+
Expect(err).NotTo(HaveOccurred())
7726+
Expect(n).To(Equal(int64(1)))
7727+
7728+
// NACKed message moves to unassigned state; 2-0 and 3-0 remain assigned.
7729+
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
7730+
Expect(err).NotTo(HaveOccurred())
7731+
Expect(pendingInfo.Count).To(Equal(int64(3)))
7732+
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2}))
7733+
7734+
// Verify the delivery counter was set to MAXINT (math.MaxInt64).
7735+
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
7736+
Stream: "stream",
7737+
Group: "group",
7738+
Start: "-",
7739+
End: "+",
7740+
Count: 10,
7741+
}).Result()
7742+
Expect(err).NotTo(HaveOccurred())
7743+
for _, e := range infoExt {
7744+
if e.ID == "1-0" {
7745+
Expect(e.RetryCount).To(Equal(int64(math.MaxInt64)), "FATAL should set delivery counter to MAXINT")
7746+
}
7747+
}
7748+
})
7749+
7750+
It("should XNack nacked-count reflected in XINFO STREAM FULL", func() {
7751+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7752+
7753+
// NACK two messages.
7754+
n, err := client.XNack(ctx, &redis.XNackArgs{
7755+
Stream: "stream",
7756+
Group: "group",
7757+
Mode: "FAIL",
7758+
IDs: []string{"1-0", "2-0"},
7759+
}).Result()
7760+
Expect(err).NotTo(HaveOccurred())
7761+
Expect(n).To(Equal(int64(2)))
7762+
7763+
// Verify nacked-count in XINFO STREAM FULL.
7764+
info, err := client.XInfoStreamFull(ctx, "stream", 10).Result()
7765+
Expect(err).NotTo(HaveOccurred())
7766+
Expect(info.Groups).To(HaveLen(1))
7767+
Expect(info.Groups[0].NackedCount).To(Equal(uint64(2)))
7768+
})
7769+
7770+
It("should XNack with RetryCount", func() {
7771+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7772+
7773+
retryCount := uint64(5)
7774+
n, err := client.XNack(ctx, &redis.XNackArgs{
7775+
Stream: "stream",
7776+
Group: "group",
7777+
Mode: "FAIL",
7778+
IDs: []string{"1-0"},
7779+
RetryCount: &retryCount,
7780+
}).Result()
7781+
Expect(err).NotTo(HaveOccurred())
7782+
Expect(n).To(Equal(int64(1)))
7783+
7784+
// Verify the delivery counter was set to the explicit RetryCount value,
7785+
// overriding the mode's default adjustment.
7786+
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
7787+
Stream: "stream",
7788+
Group: "group",
7789+
Start: "-",
7790+
End: "+",
7791+
Count: 10,
7792+
}).Result()
7793+
Expect(err).NotTo(HaveOccurred())
7794+
// Find 1-0 and verify its delivery counter.
7795+
var entry redis.XPendingExt
7796+
for _, e := range infoExt {
7797+
if e.ID == "1-0" {
7798+
entry = e
7799+
break
7800+
}
7801+
}
7802+
Expect(entry.ID).To(Equal("1-0"))
7803+
Expect(entry.RetryCount).To(Equal(int64(retryCount)))
7804+
})
7805+
7806+
It("should XNack with Force", func() {
7807+
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")
7808+
7809+
// Force creates a new NACKed PEL entry for an ID that was never
7810+
// delivered to a consumer via XREADGROUP. Without Force this would
7811+
// be a no-op (the ID is not in any consumer's PEL).
7812+
n, err := client.XNack(ctx, &redis.XNackArgs{
7813+
Stream: "stream",
7814+
Group: "group",
7815+
Mode: "FAIL",
7816+
IDs: []string{"1-0"},
7817+
Force: true,
7818+
}).Result()
7819+
Expect(err).NotTo(HaveOccurred())
7820+
Expect(n).To(Equal(int64(1)))
7821+
7822+
// The entry is now in the group's PEL as an unassigned NACKed entry.
7823+
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
7824+
Expect(err).NotTo(HaveOccurred())
7825+
Expect(pendingInfo.Count).To(Equal(int64(3)))
7826+
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2}))
7827+
})
7828+
76287829
It("should XReadGroup with CLAIM argument", func() {
76297830
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")
76307831

doctests/stream_tutorial_test.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -889,38 +889,53 @@ func ExampleClient_raceitaly() {
889889
fmt.Println(res31a) // >>> 0-0
890890
// STEP_END
891891

892+
// STEP_START xnack
893+
res32, err := rdb.XNack(ctx, &redis.XNackArgs{
894+
Stream: "race:italy",
895+
Group: "italy_riders",
896+
Mode: "FAIL",
897+
IDs: []string{"1692632662819-0"},
898+
}).Result()
899+
900+
if err != nil {
901+
panic(err)
902+
}
903+
904+
fmt.Println(res32) // >>> 1
905+
// STEP_END
906+
892907
// STEP_START xinfo
893-
res32, err := rdb.XInfoStream(ctx, "race:italy").Result()
908+
res33, err := rdb.XInfoStream(ctx, "race:italy").Result()
894909

895910
if err != nil {
896911
panic(err)
897912
}
898913

899-
fmt.Println(res32.Length)
914+
fmt.Println(res33.Length)
900915
// >>> 5
901-
fmt.Println(res32.FirstEntry)
916+
fmt.Println(res33.FirstEntry)
902917
// >>> {1692632639151-0 map[rider:Castilla] 0 0}
903918
// STEP_END
904919

905920
// STEP_START xinfo_groups
906-
res33, err := rdb.XInfoGroups(ctx, "race:italy").Result()
921+
res34, err := rdb.XInfoGroups(ctx, "race:italy").Result()
907922

908923
if err != nil {
909924
panic(err)
910925
}
911926

912-
fmt.Println(res33)
927+
fmt.Println(res34)
913928
// >>> [{italy_riders 3 2 1692632662819-0 3 2}]
914929
// STEP_END
915930

916931
// STEP_START xinfo_consumers
917-
res34, err := rdb.XInfoConsumers(ctx, "race:italy", "italy_riders").Result()
932+
res35, err := rdb.XInfoConsumers(ctx, "race:italy", "italy_riders").Result()
918933

919934
if err != nil {
920935
panic(err)
921936
}
922937

923-
// fmt.Println(res34)
938+
// fmt.Println(res35)
924939
// >>> [{Alice 1 1ms 1ms} {Bob 1 2ms 2ms} {Lora 0 1ms -1ms}]
925940
// STEP_END
926941

@@ -958,46 +973,46 @@ func ExampleClient_raceitaly() {
958973
panic(err)
959974
}
960975

961-
res35, err := rdb.XLen(ctx, "race:italy").Result()
976+
res36, err := rdb.XLen(ctx, "race:italy").Result()
962977

963978
if err != nil {
964979
panic(err)
965980
}
966981

967-
fmt.Println(res35) // >>> 2
982+
fmt.Println(res36) // >>> 2
968983

969-
res36, err := rdb.XRange(ctx, "race:italy", "-", "+").Result()
984+
res37, err := rdb.XRange(ctx, "race:italy", "-", "+").Result()
970985

971986
if err != nil {
972987
panic(err)
973988
}
974989

975-
// fmt.Println(res36)
990+
// fmt.Println(res37)
976991
// >>> [{1726649529170-1 map[rider:Wood] 0 0} {1726649529171-0 map[rider:Henshaw] 0 0}]
977992
// STEP_END
978993

979994
// STEP_START xtrim
980-
res37, err := rdb.XTrimMaxLen(ctx, "race:italy", 10).Result()
995+
res38, err := rdb.XTrimMaxLen(ctx, "race:italy", 10).Result()
981996

982997
if err != nil {
983998
panic(err)
984999
}
9851000

986-
fmt.Println(res37) // >>> 0
1001+
fmt.Println(res38) // >>> 0
9871002
// STEP_END
9881003

9891004
// STEP_START xtrim2
990-
res38, err := rdb.XTrimMaxLenApprox(ctx, "race:italy", 10, 20).Result()
1005+
res39, err := rdb.XTrimMaxLenApprox(ctx, "race:italy", 10, 20).Result()
9911006

9921007
if err != nil {
9931008
panic(err)
9941009
}
9951010

996-
fmt.Println(res38) // >>> 0
1011+
fmt.Println(res39) // >>> 0
9971012
// STEP_END
9981013

9991014
// REMOVE_START
1000-
UNUSED(res27, res34, res36)
1015+
UNUSED(res27, res35, res37)
10011016
// REMOVE_END
10021017

10031018
// Output:
@@ -1012,6 +1027,7 @@ func ExampleClient_raceitaly() {
10121027
// 1692632662819-0
10131028
// []
10141029
// 0-0
1030+
// 1
10151031
// 5
10161032
// {1692632639151-0 map[rider:Castilla] 0 0}
10171033
// [{italy_riders 3 2 1692632662819-0 3 2}]

0 commit comments

Comments
 (0)