Skip to content
This repository was archived by the owner on Jun 23, 2025. It is now read-only.

Commit fee772b

Browse files
authored
migrate zadd (#1613)
1 parent f2201d4 commit fee772b

File tree

9 files changed

+315
-41
lines changed

9 files changed

+315
-41
lines changed

internal/cmd/cmd_del.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/dicedb/dice/internal/errors"
88
"github.com/dicedb/dice/internal/shardmanager"
99
dstore "github.com/dicedb/dice/internal/store"
10-
"github.com/dicedb/dicedb-go/wire"
1110
)
1211

1312
var cDEL = &CommandMeta{
@@ -54,9 +53,7 @@ func evalDEL(c *Cmd, s *dstore.Store) (*CmdRes, error) {
5453
}
5554
}
5655

57-
return &CmdRes{R: &wire.Response{
58-
Value: &wire.Response_VInt{VInt: int64(count)},
59-
}}, nil
56+
return cmdResInt(int64(count)), nil
6057
}
6158

6259
func executeDEL(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
@@ -73,7 +70,5 @@ func executeDEL(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
7370
}
7471
count += r.R.GetVInt()
7572
}
76-
return &CmdRes{R: &wire.Response{
77-
Value: &wire.Response_VInt{VInt: count},
78-
}}, nil
73+
return cmdResInt(count), nil
7974
}

internal/cmd/cmd_exists.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/dicedb/dice/internal/shard"
99
"github.com/dicedb/dice/internal/shardmanager"
1010
dstore "github.com/dicedb/dice/internal/store"
11-
"github.com/dicedb/dicedb-go/wire"
1211
)
1312

1413
var cEXISTS = &CommandMeta{
@@ -50,9 +49,7 @@ func evalEXISTS(c *Cmd, s *dstore.Store) (*CmdRes, error) {
5049
}
5150

5251
// Return the count as a response
53-
return &CmdRes{R: &wire.Response{
54-
Value: &wire.Response_VInt{VInt: count},
55-
}}, nil
52+
return cmdResInt(count), nil
5653
}
5754

5855
func executeEXISTS(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
@@ -74,7 +71,5 @@ func executeEXISTS(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
7471
}
7572
count += r.R.GetVInt()
7673
}
77-
return &CmdRes{R: &wire.Response{
78-
Value: &wire.Response_VInt{VInt: count},
79-
}}, nil
74+
return cmdResInt(count), nil
8075
}

internal/cmd/cmd_expiretime.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/dicedb/dice/internal/errors"
88
"github.com/dicedb/dice/internal/shardmanager"
99
dstore "github.com/dicedb/dice/internal/store"
10-
"github.com/dicedb/dicedb-go/wire"
1110
)
1211

1312
var cExpireTime *CommandMeta = &CommandMeta{
@@ -57,11 +56,7 @@ func evalEXPIRETIME(c *Cmd, dst *dstore.Store) (*CmdRes, error) {
5756
}
5857

5958
// returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key will expire
60-
return &CmdRes{R: &wire.Response{
61-
Value: &wire.Response_VInt{
62-
VInt: int64(expiry / 1000),
63-
},
64-
}}, nil
59+
return cmdResInt(int64(expiry / 1000)), nil
6560
}
6661

6762
func executeEXPIRETIME(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {

internal/cmd/cmd_hset.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/dicedb/dice/internal/object"
99
"github.com/dicedb/dice/internal/shardmanager"
1010
dstore "github.com/dicedb/dice/internal/store"
11-
"github.com/dicedb/dicedb-go/wire"
1211
)
1312

1413
type SSMap map[string]string
@@ -101,9 +100,7 @@ func evalHSET(c *Cmd, s *dstore.Store) (*CmdRes, error) {
101100
obj = s.NewObj(m, -1, object.ObjTypeSSMap)
102101
s.Put(key, obj)
103102

104-
return &CmdRes{R: &wire.Response{
105-
Value: &wire.Response_VInt{VInt: newFields},
106-
}}, nil
103+
return cmdResInt(newFields), nil
107104
}
108105

109106
func executeHSET(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {

internal/cmd/cmd_incrby.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/dicedb/dice/internal/object"
1111
"github.com/dicedb/dice/internal/shardmanager"
1212
dstore "github.com/dicedb/dice/internal/store"
13-
"github.com/dicedb/dicedb-go/wire"
1413
)
1514

1615
var cINCRBY = &CommandMeta{
@@ -56,9 +55,7 @@ func doIncr(c *Cmd, s *dstore.Store, delta int64) (*CmdRes, error) {
5655
if obj == nil {
5756
obj = s.NewObj(delta, -1, object.ObjTypeInt)
5857
s.Put(key, obj)
59-
return &CmdRes{R: &wire.Response{
60-
Value: &wire.Response_VInt{VInt: delta},
61-
}}, nil
58+
return cmdResInt(delta), nil
6259
}
6360

6461
switch obj.Type {
@@ -73,9 +70,7 @@ func doIncr(c *Cmd, s *dstore.Store, delta int64) (*CmdRes, error) {
7370
value += delta
7471
obj.Value = value
7572

76-
return &CmdRes{R: &wire.Response{
77-
Value: &wire.Response_VInt{VInt: value},
78-
}}, nil
73+
return cmdResInt(value), nil
7974
}
8075

8176
func executeINCRBY(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {

internal/cmd/cmd_ttl.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/dicedb/dice/internal/server/utils"
99
"github.com/dicedb/dice/internal/shardmanager"
1010
dstore "github.com/dicedb/dice/internal/store"
11-
"github.com/dicedb/dicedb-go/wire"
1211
)
1312

1413
var cTTL = &CommandMeta{
@@ -50,24 +49,18 @@ func evalTTL(c *Cmd, s *dstore.Store) (*CmdRes, error) {
5049

5150
obj := s.Get(key)
5251
if obj == nil {
53-
return &CmdRes{R: &wire.Response{
54-
Value: &wire.Response_VInt{VInt: -2},
55-
}}, nil
52+
return cmdResInt(-2), nil
5653
}
5754

5855
exp, isExpirySet := dstore.GetExpiry(obj, s)
5956

6057
if !isExpirySet {
61-
return &CmdRes{R: &wire.Response{
62-
Value: &wire.Response_VInt{VInt: -1},
63-
}}, nil
58+
return cmdResInt(-1), nil
6459
}
6560

6661
durationMs := exp - uint64(utils.GetCurrentTime().UnixMilli())
6762

68-
return &CmdRes{R: &wire.Response{
69-
Value: &wire.Response_VInt{VInt: int64(durationMs / 1000)},
70-
}}, nil
63+
return cmdResInt(int64(durationMs / 1000)), nil
7164
}
7265

7366
func executeTTL(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {

internal/cmd/cmd_zadd.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package cmd
2+
3+
import (
4+
"math"
5+
"strconv"
6+
"strings"
7+
8+
"github.com/dicedb/dice/internal/errors"
9+
"github.com/dicedb/dice/internal/eval/sortedset"
10+
"github.com/dicedb/dice/internal/object"
11+
"github.com/dicedb/dice/internal/shardmanager"
12+
dsstore "github.com/dicedb/dice/internal/store"
13+
)
14+
15+
var cZADD = &CommandMeta{
16+
Name: "ZADD",
17+
Syntax: "ZADD key [NX | XX] [GT | LT] [CH] [INCR] score member [score member...]",
18+
HelpShort: "Adds all the specified members with the specified scores to the sorted set stored at key",
19+
HelpLong: `
20+
Adds all the specified members with the specified scores to the sorted set stored at key
21+
Options: NX, XX, CH, INCR, GT, LT, CH
22+
- NX: Only add new elements and do not update existing elements
23+
- XX: Only update existing elements and do not add new elements
24+
- CH: Modify the return value from the number of new elements added, to the total number of elements changed
25+
- INCR: When this option is specified, the elements are treated as increments to the score of the existing elements
26+
- GT: Only add new elements if the score is greater than the existing score
27+
- LT: Only add new elements if the score is less than the existing score
28+
Returns the number of elements added to the sorted set, not including elements already existing for which the score was updated.
29+
`,
30+
Examples: `
31+
localhost:7379> ZADD mySortedSet 1 foo 2 bar
32+
OK 2
33+
`,
34+
Eval: evalZADD,
35+
Execute: executeZADD,
36+
}
37+
38+
func init() {
39+
CommandRegistry.AddCommand(cZADD)
40+
}
41+
42+
func evalZADD(c *Cmd, s *dsstore.Store) (*CmdRes, error) {
43+
if len(c.C.Args) < 3 {
44+
return cmdResNil, errors.ErrWrongArgumentCount("ZADD")
45+
}
46+
47+
key := c.C.Args[0]
48+
params := map[float64]string{}
49+
flags := map[string]bool{}
50+
51+
for i := 1; i < len(c.C.Args); i++ {
52+
arg := strings.ToUpper(c.C.Args[i])
53+
switch arg {
54+
// Only valid options are allowed
55+
case "XX", "NX", "LT", "GT", "CH", "INCR":
56+
flags[arg] = true
57+
default:
58+
// This should be a float repesneting the score. If its not, its a problem.
59+
score, err := strconv.ParseFloat(arg, 64)
60+
if err != nil || math.IsNaN(score) {
61+
return cmdResNil, errors.ErrInvalidNumberFormat
62+
}
63+
if i+1 >= len(c.C.Args) {
64+
return cmdResNil, errors.ErrWrongArgumentCount("ZADD")
65+
}
66+
// Its a score-member pair
67+
params[score] = c.C.Args[i+1]
68+
i++
69+
}
70+
}
71+
72+
if flags["NX"] && flags["XX"] {
73+
return cmdResNil, errors.ErrGeneral("XX and NX options at the same time are not compatible")
74+
}
75+
if (flags["GT"] && flags["NX"]) || (flags["LT"] && flags["NX"]) || (flags["GT"] && flags["LT"]) {
76+
return cmdResNil, errors.ErrGeneral("GT, LT, and/or NX options at the same time are not compatible")
77+
}
78+
if flags["INCR"] && len(params) > 1 {
79+
return cmdResNil, errors.ErrGeneral("INCR option supports a single increment-element pair")
80+
}
81+
sortedSet, err := getOrCreateSortedSet(s, key)
82+
if err != nil {
83+
return cmdResNil, err
84+
}
85+
// all processing takes place here
86+
return processMembersWithFlags(params, sortedSet, s, key, flags)
87+
}
88+
89+
func executeZADD(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
90+
if len(c.C.Args) < 3 {
91+
return cmdResNil, errors.ErrWrongArgumentCount("ZADD")
92+
}
93+
94+
shard := sm.GetShardForKey(c.C.Args[0])
95+
return evalZADD(c, shard.Thread.Store())
96+
}
97+
98+
func getOrCreateSortedSet(store *dsstore.Store, key string) (*sortedset.Set, error) {
99+
obj := store.Get(key)
100+
if obj != nil {
101+
sortedSet, err := sortedset.FromObject(obj)
102+
if err != nil {
103+
return nil, errors.ErrWrongTypeOperation
104+
}
105+
return sortedSet, nil
106+
}
107+
return sortedset.New(), nil
108+
}
109+
110+
// processMembersWithFlags processes the members and scores while handling flags.
111+
func processMembersWithFlags(params map[float64]string, sortedSet *sortedset.Set, store *dsstore.Store, key string, flags map[string]bool) (*CmdRes, error) {
112+
added, updated := 0, 0
113+
114+
for score, member := range params {
115+
currentScore, exists := sortedSet.Get(member)
116+
117+
// If INCR is used, increment the score first
118+
if flags["INCR"] {
119+
if exists {
120+
score += currentScore
121+
} else {
122+
score = 0.0 + score
123+
}
124+
125+
// Now check GT and LT conditions based on the incremented score and return accordingly
126+
if (flags["GT"] && exists && score <= currentScore) ||
127+
(flags["LT"] && exists && score >= currentScore) {
128+
return nil, nil
129+
}
130+
}
131+
132+
// Check if the member should be skipped based on NX or XX flags
133+
if shouldSkipMember(score, currentScore, exists, flags) {
134+
continue
135+
}
136+
137+
// Insert or update the member in the sorted set
138+
wasInserted := sortedSet.Upsert(score, member)
139+
140+
if wasInserted && !exists {
141+
added++
142+
} else if exists && score != currentScore {
143+
updated++
144+
}
145+
146+
// If INCR is used, exit after processing one score-member pair
147+
if flags["INCR"] {
148+
return cmdResFloat(score), nil
149+
}
150+
}
151+
152+
// Store the updated sorted set in the store
153+
storeUpdatedSet(store, key, sortedSet)
154+
155+
if flags["CH"] {
156+
return cmdResInt(int64(added + updated)), nil
157+
}
158+
159+
// Return only the count of added members
160+
return cmdResInt(int64(added)), nil
161+
}
162+
163+
// shouldSkipMember determines if a member should be skipped based on flags.
164+
func shouldSkipMember(score, currentScore float64, exists bool, flags map[string]bool) bool {
165+
if flags["NX"] && exists || flags["XX"] && !exists {
166+
return true
167+
}
168+
if exists {
169+
if flags["LT"] && score >= currentScore {
170+
return true
171+
}
172+
if flags["GT"] && score <= currentScore {
173+
return true
174+
}
175+
}
176+
return false
177+
}
178+
179+
// storeUpdatedSet stores the updated sorted set in the store.
180+
func storeUpdatedSet(store *dsstore.Store, key string, sortedSet *sortedset.Set) {
181+
store.Put(key, store.NewObj(sortedSet, -1, object.ObjTypeSortedSet), dsstore.WithPutCmd(dsstore.ZAdd))
182+
}

internal/cmd/cmds.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,3 +181,16 @@ var cmdResIntNegOne = &CmdRes{R: &wire.Response{
181181
var cmdResIntNegTwo = &CmdRes{R: &wire.Response{
182182
Value: &wire.Response_VInt{VInt: -2},
183183
}}
184+
185+
// Utility functions to create int CmdRes object. This function will get inlined so should cause no overhead.
186+
func cmdResInt(i int64) *CmdRes {
187+
return &CmdRes{R: &wire.Response{
188+
Value: &wire.Response_VInt{VInt: i},
189+
}}
190+
}
191+
192+
func cmdResFloat(f float64) *CmdRes {
193+
return &CmdRes{R: &wire.Response{
194+
Value: &wire.Response_VFloat{VFloat: f},
195+
}}
196+
}

0 commit comments

Comments
 (0)