Skip to content

Commit fbe0586

Browse files
uriyagezuiderkwast
authored andcommitted
Fix bad slot used in sharded pubsub unsubscribe (valkey-io#2137)
This commit fixes two issues in `pubsubUnsubscribeChannel` that could lead to memory corruption: 1. When calculating the slot for a channel, we were using getKeySlot() which might use the current_client's slot if available. This is problematic when a client kills another client (e.g., via CLIENT KILL command) as the slot won't match the channel's actual slot. 2. The `found` variable was not initialized to `NULL`, causing the serverAssert to potentially pass incorrectly when the hashtable lookup failed, leading to memory corruption in subsequent operations. The fix: - Calculate the slot directly from the channel name using keyHashSlot() instead of relying on the current client's slot - Initialize 'found' to NULL Added a test case that reproduces the issue by having one client kill another client that is subscribed to a sharded pubsub channel during a transaction. Crash log (After initializing the variable 'found' to null, without initialization, memory corruption could occur): ``` VALKEY BUG REPORT START: Cut & paste starting from here === 59707:M 24 May 2025 23:10:40.429 # === ASSERTION FAILED CLIENT CONTEXT === 59707:M 24 May 2025 23:10:40.429 # client->flags = 108086391057154048 59707:M 24 May 2025 23:10:40.429 # client->conn = fd=11 59707:M 24 May 2025 23:10:40.429 # client->argc = 0 59707:M 24 May 2025 23:10:40.429 # === RECURSIVE ASSERTION FAILED === 59707:M 24 May 2025 23:10:40.429 # ==> pubsub.c:348 'found' is not true ------ STACK TRACE ------ Backtrace: 0 valkey-server 0x0000000104974054 _serverAssertWithInfo + 112 1 valkey-server 0x000000010496c7fc pubsubUnsubscribeChannel + 268 2 valkey-server 0x000000010496cea0 pubsubUnsubscribeAllChannelsInternal + 216 3 valkey-server 0x000000010496c2e0 pubsubUnsubscribeShardAllChannels + 76 4 valkey-server 0x000000010496c1d4 freeClientPubSubData + 60 5 valkey-server 0x00000001048f3cbc freeClient + 792 6 valkey-server 0x0000000104900870 clientKillCommand + 356 7 valkey-server 0x00000001048d1790 call + 428 8 valkey-server 0x000000010496ef4c execCommand + 872 9 valkey-server 0x00000001048d1790 call + 428 10 valkey-server 0x00000001048d3a44 processCommand + 5056 11 valkey-server 0x00000001048fdc20 processCommandAndResetClient + 64 12 valkey-server 0x00000001048fdeac processInputBuffer + 276 13 valkey-server 0x00000001048f2ff0 readQueryFromClient + 148 14 valkey-server 0x0000000104a182e8 callHandler + 60 15 valkey-server 0x0000000104a1731c connSocketEventHandler + 488 16 valkey-server 0x00000001048b5e80 aeProcessEvents + 820 17 valkey-server 0x00000001048b6598 aeMain + 64 18 valkey-server 0x00000001048dcecc main + 4084 19 dyld 0x0000000186b34274 start + 2840 ```` --------- Signed-off-by: Uri Yagelnik <[email protected]> (cherry picked from commit bd5dcb2)
1 parent a50b634 commit fbe0586

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

src/pubsub.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,9 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty
307307
retval = 1;
308308
/* Remove the client from the channel -> clients list hash table */
309309
if (server.cluster_enabled && type.shard) {
310-
slot = getKeySlot(channel->ptr);
310+
/* Using keyHashSlot directly because we can't rely on the current_client's slot via getKeySlot() here,
311+
* as it might differ from the channel's slot. */
312+
slot = keyHashSlot(channel->ptr, (int)sdslen(channel->ptr));
311313
}
312314
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
313315
serverAssertWithInfo(c, NULL, de != NULL);

tests/unit/cluster/sharded-pubsub.tcl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,29 @@ start_cluster 1 1 {tags {external:skip cluster}} {
5353
catch {[$replica EXEC]} err
5454
assert_match {EXECABORT*} $err
5555
}
56+
57+
test "SSUBSCRIBE client killed during transaction" {
58+
# Create two clients
59+
set rd1 [valkey_deferring_client $primary_id]
60+
61+
# Get client 1 ID
62+
$rd1 client id
63+
set rd1_id [$rd1 read]
64+
# Client1 subscribes to a shard channel
65+
$rd1 ssubscribe channel0
66+
67+
# Wait for the subscription to be acknowledged
68+
assert_equal {ssubscribe channel0 1} [$rd1 read]
69+
70+
# Client2 starts a transaction
71+
assert_equal {OK} [$primary multi]
72+
73+
# sets a key so that its slot will be set to the slot of that key.
74+
assert_equal {QUEUED} [$primary set k v]
75+
# Kill client1 inside client2's transaction
76+
assert_equal {QUEUED} [$primary client kill id $rd1_id]
77+
78+
# Execute the transaction
79+
assert_equal {OK 1} [$primary exec] "Transaction execution should return OK and kill count"
80+
}
5681
}

0 commit comments

Comments
 (0)