Skip to content

Commit 9de2f81

Browse files
committed
Add --rfa and --rfro option
- When users enable rfa(read_from_all) option, they can read from replica and primary. - When users enable rfro(read_from_replicas only) option, they can read from replicas only. - If users don't use any option related to replicas, they can read from primaries only.
1 parent d1a13ed commit 9de2f81

File tree

1 file changed

+44
-15
lines changed

1 file changed

+44
-15
lines changed

src/valkey-benchmark.c

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ static struct config {
112112
int num_threads;
113113
struct benchmarkThread **threads;
114114
int cluster_mode;
115-
int read_from_replicas;
115+
int read_from_all;
116+
int read_from_replicas_only;
116117
int cluster_node_count;
117118
struct clusterNode **cluster_nodes;
118119
struct serverConfig *redis_config;
@@ -714,7 +715,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
714715
c->prefix_pending++;
715716
}
716717

717-
if (config.cluster_mode && config.read_from_replicas) {
718+
if (config.cluster_mode && config.read_from_replicas_only) {
718719
char *buf = NULL;
719720
int len;
720721
len = redisFormatCommand(&buf, "READONLY");
@@ -848,7 +849,14 @@ static void showLatencyReport(void) {
848849
printf(" %d bytes payload\n", config.datasize);
849850
printf(" keep alive: %d\n", config.keepalive);
850851
if (config.cluster_mode) {
851-
char *node_prefix = config.read_from_replicas ? "replicas" : "primaries";
852+
const char *node_prefix = NULL;
853+
if (config.read_from_all) {
854+
node_prefix = "all";
855+
} else if (config.read_from_replicas_only) {
856+
node_prefix = "replica";
857+
} else {
858+
node_prefix = "primary";
859+
}
852860
printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix);
853861
int m;
854862
for (m = 0; m < config.cluster_node_count; m++) {
@@ -1067,6 +1075,7 @@ static int fetchClusterConfiguration(void) {
10671075
if (ctx == NULL) {
10681076
exit(1);
10691077
}
1078+
assert(!(config.read_from_all && config.read_from_replicas_only) && "--rfa and --rfro cannot be enabled simultaneously");
10701079

10711080
reply = redisCommand(ctx, "CLUSTER SLOTS");
10721081
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
@@ -1084,7 +1093,8 @@ static int fetchClusterConfiguration(void) {
10841093
int to = r->element[1]->integer;
10851094
for (j = 2; j < r->elements; j++) {
10861095
int is_primary = (j == 2);
1087-
if (config.read_from_replicas == is_primary) continue;
1096+
int is_cluster_option_only = (!config.read_from_all && !config.read_from_replicas_only);
1097+
if ((config.read_from_replicas_only && is_primary) || (is_cluster_option_only && !is_primary)) continue;
10881098

10891099
redisReply *nr = r->element[j];
10901100
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
@@ -1093,6 +1103,8 @@ static int fetchClusterConfiguration(void) {
10931103
sds ip = sdsnew(nr->element[0]->str);
10941104
sds name = sdsnew(nr->element[2]->str);
10951105
int port = nr->element[1]->integer;
1106+
int slot_start = from;
1107+
int slot_end = to;
10961108

10971109
clusterNode *node = NULL;
10981110
dictEntry *entry = dictFind(nodes, name);
@@ -1106,11 +1118,11 @@ static int fetchClusterConfiguration(void) {
11061118
} else {
11071119
node = dictGetVal(entry);
11081120
}
1109-
if (from == to) {
1110-
node->slots[node->slots_count++] = from;
1121+
if (slot_start == slot_end) {
1122+
node->slots[node->slots_count++] = slot_start;
11111123
} else {
1112-
while (from <= to) {
1113-
int slot = from++;
1124+
while (slot_start <= slot_end) {
1125+
int slot = slot_start++;
11141126
node->slots[node->slots_count++] = slot;
11151127
}
11161128
}
@@ -1154,7 +1166,7 @@ static int fetchClusterSlotsConfiguration(client c) {
11541166
if (is_fetching_slots) return -1; // TODO: use other codes || errno ?
11551167
atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed);
11561168
fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n");
1157-
fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n");
1169+
fprintf(stderr, "If you are using the --rfa and --rfro option and sending write requests (set type commands),\nthe requests could not be processed properly.\n");
11581170

11591171
const char *errmsg = "Failed to update cluster slots configuration";
11601172

@@ -1194,7 +1206,10 @@ static int fetchClusterSlotsConfiguration(client c) {
11941206
from = r->element[0]->integer;
11951207
to = r->element[1]->integer;
11961208
size_t start, end;
1197-
if (config.read_from_replicas) {
1209+
if (config.read_from_all) {
1210+
start = 2;
1211+
end = r->elements;
1212+
} else if (config.read_from_replicas_only) {
11981213
start = 3;
11991214
end = r->elements;
12001215
} else {
@@ -1385,8 +1400,10 @@ int parseOptions(int argc, char **argv) {
13851400
config.num_threads = 0;
13861401
} else if (!strcmp(argv[i], "--cluster")) {
13871402
config.cluster_mode = 1;
1388-
} else if (!strcmp(argv[i], "--rfr")) {
1389-
config.read_from_replicas = 1;
1403+
} else if (!strcmp(argv[i], "--rfa")) {
1404+
config.read_from_all = 1;
1405+
} else if (!strcmp(argv[i], "--rfro")) {
1406+
config.read_from_replicas_only = 1;
13901407
} else if (!strcmp(argv[i], "--enable-tracking")) {
13911408
config.enable_tracking = 1;
13921409
} else if (!strcmp(argv[i], "--help")) {
@@ -1484,7 +1501,11 @@ int parseOptions(int argc, char **argv) {
14841501
" If the command is supplied on the command line in cluster\n"
14851502
" mode, the key must contain \"{tag}\". Otherwise, the\n"
14861503
" command will not be sent to the right cluster node.\n"
1487-
" --rfr Enable read from replicas in cluster mode.\n"
1504+
" --rfa Enable read from all nodes(primary and replica) in cluster mode.\n"
1505+
" This command must be used with the --cluster option.\n"
1506+
" When using this option, it is recommended to use only \n"
1507+
" the commands for read requests.\n"
1508+
" --rfro Enable read from replicas only in cluster mode.\n"
14881509
" This command must be used with the --cluster option.\n"
14891510
" When using this option, it is recommended to use only \n"
14901511
" the commands for read requests.\n"
@@ -1629,7 +1650,8 @@ int main(int argc, char **argv) {
16291650
config.num_threads = 0;
16301651
config.threads = NULL;
16311652
config.cluster_mode = 0;
1632-
config.read_from_replicas = 0;
1653+
config.read_from_all = 0;
1654+
config.read_from_replicas_only = 0;
16331655
config.cluster_node_count = 0;
16341656
config.cluster_nodes = NULL;
16351657
config.redis_config = NULL;
@@ -1674,7 +1696,14 @@ int main(int argc, char **argv) {
16741696
fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count);
16751697
exit(1);
16761698
}
1677-
char *node_prefix = config.read_from_replicas ? "replica" : "primary";
1699+
const char *node_prefix = NULL;
1700+
if (config.read_from_all) {
1701+
node_prefix = "all";
1702+
} else if (config.read_from_replicas_only) {
1703+
node_prefix = "replica";
1704+
} else {
1705+
node_prefix = "primary";
1706+
}
16781707
printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix);
16791708
int i = 0;
16801709
for (; i < config.cluster_node_count; i++) {

0 commit comments

Comments
 (0)