Skip to content

Commit ebafc5d

Browse files
hpatroPatrickJS
authored andcommitted
Sharded pubsub command execution within multi/exec (valkey-io#13)
Allow SPUBLISH command within multi/exec on replica.
1 parent f48c695 commit ebafc5d

File tree

2 files changed

+64
-7
lines changed

2 files changed

+64
-7
lines changed

src/cluster.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,9 +1030,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
10301030
mc.cmd = cmd;
10311031
}
10321032

1033-
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
1034-
cmd->proc == sunsubscribeCommand ||
1035-
cmd->proc == spublishCommand;
1033+
uint64_t cmd_flags = getCommandFlags(c);
1034+
1035+
/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
1036+
int pubsubshard_included = (cmd_flags & CMD_PUBSUB) ||
1037+
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB));
10361038

10371039
/* Check that all the keys are in the same hash slot, and obtain this
10381040
* slot and the node associated. */
@@ -1109,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
11091111
* node until the migration completes with CLUSTER SETSLOT <slot>
11101112
* NODE <node-id>. */
11111113
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
1112-
if ((migrating_slot || importing_slot) && !is_pubsubshard)
1114+
if ((migrating_slot || importing_slot) && !pubsubshard_included)
11131115
{
11141116
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
11151117
else existing_keys++;
@@ -1122,11 +1124,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
11221124
* without redirections or errors in all the cases. */
11231125
if (n == NULL) return myself;
11241126

1125-
uint64_t cmd_flags = getCommandFlags(c);
11261127
/* Cluster is globally down but we got keys? We only serve the request
11271128
* if it is a read command and when allow_reads_when_down is enabled. */
11281129
if (!isClusterHealthy()) {
1129-
if (is_pubsubshard) {
1130+
if (pubsubshard_included) {
11301131
if (!server.cluster_allow_pubsubshard_when_down) {
11311132
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
11321133
return NULL;
@@ -1189,7 +1190,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
11891190
* is serving, we can reply without redirection. */
11901191
int is_write_command = (cmd_flags & CMD_WRITE) ||
11911192
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
1192-
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
1193+
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
11931194
!is_write_command &&
11941195
clusterNodeIsSlave(myself) &&
11951196
clusterNodeGetSlaveof(myself) == n)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
start_cluster 1 1 {tags {external:skip cluster}} {
2+
set primary_id 0
3+
set replica1_id 1
4+
5+
set primary [Rn $primary_id]
6+
set replica [Rn $replica1_id]
7+
8+
test "Sharded pubsub publish behavior within multi/exec" {
9+
foreach {node} {primary replica} {
10+
set node [set $node]
11+
$node MULTI
12+
$node SPUBLISH ch1 "hello"
13+
$node EXEC
14+
}
15+
}
16+
17+
test "Sharded pubsub within multi/exec with cross slot operation" {
18+
$primary MULTI
19+
$primary SPUBLISH ch1 "hello"
20+
$primary GET foo
21+
catch {[$primary EXEC]} err
22+
assert_match {CROSSSLOT*} $err
23+
}
24+
25+
test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
26+
$primary MULTI
27+
$primary SPUBLISH foo "hello"
28+
$primary GET foo
29+
$primary EXEC
30+
} {0 {}}
31+
32+
test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
33+
$replica MULTI
34+
$replica SPUBLISH foo "hello"
35+
catch {[$replica GET foo]} err
36+
assert_match {MOVED*} $err
37+
catch {[$replica EXEC]} err
38+
assert_match {EXECABORT*} $err
39+
}
40+
41+
test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
42+
$primary MULTI
43+
$primary SPUBLISH foo "hello"
44+
$primary SET foo bar
45+
$primary EXEC
46+
} {0 OK}
47+
48+
test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
49+
$replica MULTI
50+
$replica SPUBLISH foo "hello"
51+
catch {[$replica SET foo bar]} err
52+
assert_match {MOVED*} $err
53+
catch {[$replica EXEC]} err
54+
assert_match {EXECABORT*} $err
55+
}
56+
}

0 commit comments

Comments
 (0)