Skip to content

Commit 28c5a17

Browse files
authored
replica redirect read&write to primary in standalone mode (#325)
To implement #319 1. replica is able to redirect read and write commands to it's primary in standalone mode * reply with "-REDIRECT primary-ip:port" 2. add a subcommand `CLIENT CAPA redirect`, a client can announce the capability to handle redirection * if a client can handle redirection, the data access commands (read and write) will be redirected 3. allow `readonly` and `readwrite` command in standalone mode, may be a breaking change * a client with redirect capability cannot process read commands on a replica by default * use READONLY command can allow read commands on a replica --------- Signed-off-by: zhaozhao.zz <[email protected]>
1 parent ab38730 commit 28c5a17

File tree

7 files changed

+106
-8
lines changed

7 files changed

+106
-8
lines changed

src/cluster.c

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1449,20 +1449,12 @@ void askingCommand(client *c) {
14491449
* In this mode replica will not redirect clients as long as clients access
14501450
* with read-only commands to keys that are served by the replica's primary. */
14511451
void readonlyCommand(client *c) {
1452-
if (server.cluster_enabled == 0) {
1453-
addReplyError(c, "This instance has cluster support disabled");
1454-
return;
1455-
}
14561452
c->flags |= CLIENT_READONLY;
14571453
addReply(c, shared.ok);
14581454
}
14591455

14601456
/* The READWRITE command just clears the READONLY command state. */
14611457
void readwriteCommand(client *c) {
1462-
if (server.cluster_enabled == 0) {
1463-
addReplyError(c, "This instance has cluster support disabled");
1464-
return;
1465-
}
14661458
c->flags &= ~CLIENT_READONLY;
14671459
addReply(c, shared.ok);
14681460
}

src/commands.def

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,28 @@ struct COMMAND_ARG CLIENT_CACHING_Args[] = {
10891089
{MAKE_ARG("mode",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_CACHING_mode_Subargs},
10901090
};
10911091

1092+
/********** CLIENT CAPA ********************/
1093+
1094+
#ifndef SKIP_CMD_HISTORY_TABLE
1095+
/* CLIENT CAPA history */
1096+
#define CLIENT_CAPA_History NULL
1097+
#endif
1098+
1099+
#ifndef SKIP_CMD_TIPS_TABLE
1100+
/* CLIENT CAPA tips */
1101+
#define CLIENT_CAPA_Tips NULL
1102+
#endif
1103+
1104+
#ifndef SKIP_CMD_KEY_SPECS_TABLE
1105+
/* CLIENT CAPA key specs */
1106+
#define CLIENT_CAPA_Keyspecs NULL
1107+
#endif
1108+
1109+
/* CLIENT CAPA argument table */
1110+
struct COMMAND_ARG CLIENT_CAPA_Args[] = {
1111+
{MAKE_ARG("capability",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,0,NULL)},
1112+
};
1113+
10921114
/********** CLIENT GETNAME ********************/
10931115

10941116
#ifndef SKIP_CMD_HISTORY_TABLE
@@ -1552,6 +1574,7 @@ struct COMMAND_ARG CLIENT_UNBLOCK_Args[] = {
15521574
/* CLIENT command table */
15531575
struct COMMAND_STRUCT CLIENT_Subcommands[] = {
15541576
{MAKE_CMD("caching","Instructs the server whether to track the keys in the next request.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CACHING_History,0,CLIENT_CACHING_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_CACHING_Keyspecs,0,NULL,1),.args=CLIENT_CACHING_Args},
1577+
{MAKE_CMD("capa","A client claims its capability.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CAPA_History,0,CLIENT_CAPA_Tips,0,clientCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_CAPA_Keyspecs,0,NULL,1),.args=CLIENT_CAPA_Args},
15551578
{MAKE_CMD("getname","Returns the name of the connection.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETNAME_History,0,CLIENT_GETNAME_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETNAME_Keyspecs,0,NULL,0)},
15561579
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
15571580
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},

src/commands/client-capa.json

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"CAPA": {
3+
"summary": "A client claims its capability.",
4+
"complexity": "O(1)",
5+
"group": "connection",
6+
"since": "8.0.0",
7+
"arity": -3,
8+
"container": "CLIENT",
9+
"function": "clientCommand",
10+
"command_flags": [
11+
"NOSCRIPT",
12+
"LOADING",
13+
"STALE"
14+
],
15+
"acl_categories": [
16+
"CONNECTION"
17+
],
18+
"reply_schema": {
19+
"const": "OK"
20+
},
21+
"arguments": [
22+
{
23+
"multiple": "true",
24+
"name": "capability",
25+
"type": "string"
26+
}
27+
]
28+
}
29+
}

src/networking.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ client *createClient(connection *conn) {
168168
c->bulklen = -1;
169169
c->sentlen = 0;
170170
c->flags = 0;
171+
c->capa = 0;
171172
c->slot = -1;
172173
c->ctime = c->last_interaction = server.unixtime;
173174
c->duration = 0;
@@ -3589,6 +3590,13 @@ NULL
35893590
} else {
35903591
addReplyErrorObject(c, shared.syntaxerr);
35913592
}
3593+
} else if (!strcasecmp(c->argv[1]->ptr, "capa") && c->argc >= 3) {
3594+
for (int i = 2; i < c->argc; i++) {
3595+
if (!strcasecmp(c->argv[i]->ptr, "redirect")) {
3596+
c->capa |= CLIENT_CAPA_REDIRECT;
3597+
}
3598+
}
3599+
addReply(c, shared.ok);
35923600
} else {
35933601
addReplySubcommandSyntaxError(c);
35943602
}

src/server.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3867,6 +3867,12 @@ int processCommand(client *c) {
38673867
}
38683868
}
38693869

3870+
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
3871+
(is_write_command || (is_read_command && !(c->flags & CLIENT_READONLY)))) {
3872+
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
3873+
return C_OK;
3874+
}
3875+
38703876
/* Disconnect some clients if total clients memory is too high. We do this
38713877
* before key eviction, after the last command was executed and consumed
38723878
* some client output buffer memory. */

src/server.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
429429
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */
430430
#define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */
431431

432+
/* Client capabilities */
433+
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */
434+
432435
/* Client block type (btype field in client structure)
433436
* if CLIENT_BLOCKED flag is set. */
434437
typedef enum blocking_type {
@@ -1205,6 +1208,7 @@ typedef struct client {
12051208
uint64_t flags; /* Client flags: CLIENT_* macros. */
12061209
connection *conn;
12071210
int resp; /* RESP protocol version. Can be 2 or 3. */
1211+
uint32_t capa; /* Client capabilities: CLIENT_CAPA* macros. */
12081212
serverDb *db; /* Pointer to currently SELECTed DB. */
12091213
robj *name; /* As set by CLIENT SETNAME. */
12101214
robj *lib_name; /* The client library name as set by CLIENT SETINFO. */
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
start_server {tags {needs:repl external:skip}} {
2+
start_server {} {
3+
set primary_host [srv -1 host]
4+
set primary_port [srv -1 port]
5+
6+
r replicaof $primary_host $primary_port
7+
wait_for_condition 50 100 {
8+
[s 0 master_link_status] eq {up}
9+
} else {
10+
fail "Replicas not replicating from primary"
11+
}
12+
13+
test {replica allow read command by default} {
14+
r get foo
15+
} {}
16+
17+
test {replica reply READONLY error for write command by default} {
18+
assert_error {READONLY*} {r set foo bar}
19+
}
20+
21+
test {replica redirect read and write command after CLIENT CAPA REDIRECT} {
22+
r client capa redirect
23+
assert_error "REDIRECT $primary_host:$primary_port" {r set foo bar}
24+
assert_error "REDIRECT $primary_host:$primary_port" {r get foo}
25+
}
26+
27+
test {non-data access commands are not redirected} {
28+
r ping
29+
} {PONG}
30+
31+
test {replica allow read command in READONLY mode} {
32+
r readonly
33+
r get foo
34+
} {}
35+
}
36+
}

0 commit comments

Comments
 (0)