Skip to content
This repository was archived by the owner on Jun 23, 2025. It is now read-only.

Commit cf52728

Browse files
authored
Fix concurrent client connections watch mode (#1651)
* Fix concurrent client connection issue watch mode * Fix concurrent client connection issue watch mode for shared cmdResNil obj
1 parent f3b7278 commit cf52728

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

internal/cmd/cmd_get.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func executeGET(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
5858

5959
func cmdResFromObject(obj *object.Obj) (*CmdRes, error) {
6060
if obj == nil {
61-
return cmdResNil, nil
61+
return GetNilRes(), nil
6262
}
6363

6464
switch obj.Type {

internal/cmd/cmds.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ func (c *Cmd) Key() string {
4444
}
4545

4646
func (c *Cmd) Execute(sm *shardmanager.ShardManager) (*CmdRes, error) {
47-
res := cmdResNil
47+
res := GetNilRes()
48+
4849
err := errors.ErrUnknownCmd(c.C.Cmd)
4950
start := time.Now()
5051
if c.Meta == nil {
@@ -151,6 +152,12 @@ func (cmd *DiceDBCmd) Key() string {
151152
return c
152153
}
153154

155+
func GetNilRes() *CmdRes {
156+
return &CmdRes{R: &wire.Response{
157+
Value: &wire.Response_VNil{VNil: true},
158+
}}
159+
}
160+
154161
var cmdResNil = &CmdRes{R: &wire.Response{
155162
Value: &wire.Response_VNil{VNil: true},
156163
}}

internal/server/ironhawk/watch_manager.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"log/slog"
99
"strconv"
1010
"strings"
11+
"sync"
1112

1213
"github.com/dicedb/dice/internal/cmd"
1314
"github.com/dicedb/dice/internal/shardmanager"
1415
)
1516

1617
type WatchManager struct {
18+
mu sync.RWMutex
1719
clientWatchThreadMap map[string]*IOThread
1820

1921
keyFPMap map[string]map[uint32]bool
@@ -32,12 +34,17 @@ func NewWatchManager() *WatchManager {
3234
}
3335

3436
func (w *WatchManager) RegisterThread(t *IOThread) {
37+
w.mu.Lock()
38+
defer w.mu.Unlock()
3539
if t.Mode == "watch" {
3640
w.clientWatchThreadMap[t.ClientID] = t
3741
}
3842
}
3943

4044
func (w *WatchManager) HandleWatch(c *cmd.Cmd, t *IOThread) {
45+
w.mu.Lock()
46+
defer w.mu.Unlock()
47+
4148
fp, key := c.Fingerprint(), c.Key()
4249
slog.Debug("creating a new subscription",
4350
slog.String("key", key),
@@ -68,6 +75,9 @@ func (w *WatchManager) HandleWatch(c *cmd.Cmd, t *IOThread) {
6875
}
6976

7077
func (w *WatchManager) HandleUnwatch(c *cmd.Cmd, t *IOThread) {
78+
w.mu.Lock()
79+
defer w.mu.Unlock()
80+
7181
if len(c.C.Args) != 1 {
7282
return
7383
}
@@ -102,6 +112,9 @@ func (w *WatchManager) HandleUnwatch(c *cmd.Cmd, t *IOThread) {
102112
}
103113

104114
func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) {
115+
w.mu.Lock()
116+
defer w.mu.Unlock()
117+
105118
// Delete the mapping of Watch thread to client id
106119
delete(w.clientWatchThreadMap, t.ClientID)
107120

@@ -117,6 +130,9 @@ func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) {
117130
}
118131

119132
func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *shardmanager.ShardManager, t *IOThread) {
133+
w.mu.Lock()
134+
defer w.mu.Unlock()
135+
120136
key := c.Key()
121137
for fp := range w.keyFPMap[key] {
122138
_c := w.fpCmdMap[fp]

0 commit comments

Comments
 (0)