|
75 | 75 |
|
76 | 76 | #define CLIENT_GET_EVENTLOOP(c) (c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el) |
77 | 77 |
|
| 78 | +#define PLACEHOLDER_COUNT 10 |
| 79 | +static const size_t PLACEHOLDER_LEN = 12; // length of BENCHMARK_PLACEHOLDERS strings |
| 80 | +static const char *PLACEHOLDERS[PLACEHOLDER_COUNT] = { |
| 81 | + "__rand_int__", "__rand_1st__", "__rand_2nd__", "__rand_3rd__", "__rand_4th__", |
| 82 | + "__rand_5th__", "__rand_6th__", "__rand_7th__", "__rand_8th__", "__rand_9th__"}; |
| 83 | + |
78 | 84 | struct benchmarkThread; |
79 | 85 | struct clusterNode; |
80 | 86 | struct serverConfig; |
@@ -103,7 +109,7 @@ static struct config { |
103 | 109 | long long previous_tick; |
104 | 110 | int keysize; |
105 | 111 | int datasize; |
106 | | - int replacekeys; |
| 112 | + int replace_placeholders; |
107 | 113 | int keyspacelen; |
108 | 114 | int sequential_replacement; |
109 | 115 | int keepalive; |
@@ -145,12 +151,18 @@ static struct config { |
145 | 151 | uint64_t time_per_burst; |
146 | 152 | } config; |
147 | 153 |
|
| 154 | +/* Locations of the placeholders __rand_int__, __rand_1st__, |
| 155 | + * __rand_2nd, etc. within the RESP encoded command buffer. */ |
| 156 | +static struct placeholders { |
| 157 | + size_t cmd_len; /* length of the command */ |
| 158 | + size_t count[PLACEHOLDER_COUNT]; /* number of each placeholder in the command */ |
| 159 | + size_t *indices[PLACEHOLDER_COUNT]; /* pointer to indices for each placeholder */ |
| 160 | + size_t *index_data; /* allocation holding all index data */ |
| 161 | +} placeholders; |
| 162 | + |
148 | 163 | typedef struct _client { |
149 | 164 | valkeyContext *context; |
150 | 165 | sds obuf; |
151 | | - char **randptr; /* Pointers to :rand: strings inside the command buf */ |
152 | | - size_t randlen; /* Number of pointers in client->randptr */ |
153 | | - size_t randfree; /* Number of unused pointers in client->randptr */ |
154 | 166 | char **stagptr; /* Pointers to slot hashtags (cluster mode only) */ |
155 | 167 | size_t staglen; /* Number of pointers in client->stagptr */ |
156 | 168 | size_t stagfree; /* Number of unused pointers in client->stagptr */ |
@@ -366,6 +378,108 @@ static void freeServerConfig(serverConfig *cfg) { |
366 | 378 | zfree(cfg); |
367 | 379 | } |
368 | 380 |
|
| 381 | +void resetPlaceholders(void) { |
| 382 | + if (placeholders.index_data) |
| 383 | + zfree(placeholders.index_data); /* indices are a single contiguous allocation */ |
| 384 | + memset(&placeholders, 0, sizeof(placeholders)); |
| 385 | +} |
| 386 | + |
| 387 | +void initPlaceholders(const char *cmd, size_t cmd_len) { |
| 388 | + resetPlaceholders(); |
| 389 | + placeholders.cmd_len = cmd_len; |
| 390 | + |
| 391 | + /* store placeholder locations in temp arrays */ |
| 392 | + size_t total_count = 0; |
| 393 | + size_t *temp_indices[PLACEHOLDER_COUNT]; |
| 394 | + for (size_t placeholder = 0; placeholder < PLACEHOLDER_COUNT; placeholder++) { |
| 395 | + size_t *count = &placeholders.count[placeholder]; |
| 396 | + *count = 0; |
| 397 | + |
| 398 | + size_t temp_size = RANDPTR_INITIAL_SIZE; |
| 399 | + temp_indices[placeholder] = zmalloc(sizeof(size_t) * temp_size); |
| 400 | + const char *p = cmd; |
| 401 | + const char *end = cmd + cmd_len; |
| 402 | + while ((p = strstr(p, PLACEHOLDERS[placeholder])) != NULL && p < end) { |
| 403 | + if (*count == temp_size) { |
| 404 | + temp_size *= 2; |
| 405 | + temp_indices[placeholder] = zrealloc(temp_indices[placeholder], sizeof(size_t) * temp_size); |
| 406 | + } |
| 407 | + size_t index = p - cmd; |
| 408 | + temp_indices[placeholder][*count] = index; |
| 409 | + (*count)++; |
| 410 | + total_count++; |
| 411 | + p += PLACEHOLDER_LEN; // Move past the placeholder |
| 412 | + } |
| 413 | + } |
| 414 | + |
| 415 | + /* consolidate temp data into contiguous allocation */ |
| 416 | + placeholders.index_data = zmalloc(sizeof(size_t) * total_count); |
| 417 | + size_t overall_index = 0; |
| 418 | + for (size_t placeholder = 0; placeholder < PLACEHOLDER_COUNT; placeholder++) { |
| 419 | + placeholders.indices[placeholder] = placeholders.index_data + overall_index; |
| 420 | + |
| 421 | + const size_t count = placeholders.count[placeholder]; |
| 422 | + memcpy(placeholders.indices[placeholder], temp_indices[placeholder], |
| 423 | + sizeof(size_t) * count); |
| 424 | + overall_index += count; |
| 425 | + |
| 426 | + zfree(temp_indices[placeholder]); |
| 427 | + } |
| 428 | + return; |
| 429 | +} |
| 430 | + |
| 431 | +static void replacePlaceholder(const size_t *indices, const size_t count, char *cmd, _Atomic uint64_t *key_counter) { |
| 432 | + if (count == 0) return; |
| 433 | + |
| 434 | + uint64_t key = 0; |
| 435 | + if (config.keyspacelen != 0) { |
| 436 | + if (config.sequential_replacement) { |
| 437 | + key = atomic_fetch_add_explicit(key_counter, 1, memory_order_relaxed); |
| 438 | + } else { |
| 439 | + key = random(); |
| 440 | + } |
| 441 | + key %= config.keyspacelen; |
| 442 | + } |
| 443 | + |
| 444 | + /* convert key to string at first location */ |
| 445 | + char *p = cmd + indices[0] + PLACEHOLDER_LEN - 1; |
| 446 | + for (size_t j = 0; j < PLACEHOLDER_LEN; j++) { |
| 447 | + *p = '0' + key % 10; |
| 448 | + key /= 10; |
| 449 | + p--; |
| 450 | + } |
| 451 | + |
| 452 | + /* copy the first instance to the other locations */ |
| 453 | + for (size_t i = 1; i < count; i++) { |
| 454 | + char *placeholder = cmd + indices[i]; |
| 455 | + memcpy(placeholder, cmd + indices[0], PLACEHOLDER_LEN); |
| 456 | + } |
| 457 | +} |
| 458 | + |
| 459 | +static void replacePlaceholders(char *cmd_data, int cmd_count) { |
| 460 | + static _Atomic uint64_t seq_key[PLACEHOLDER_COUNT] = {0}; |
| 461 | + |
| 462 | + for (int cmd_index = 0; cmd_index < cmd_count; cmd_index++) { |
| 463 | + char *cmd = cmd_data + cmd_index * placeholders.cmd_len; |
| 464 | + |
| 465 | + /* for __rand_int__, multiple instances will have different values */ |
| 466 | + size_t *indices = placeholders.indices[0]; |
| 467 | + _Atomic uint64_t *key_counter = &seq_key[0]; |
| 468 | + for (size_t i = 0; i < placeholders.count[0]; i++) { |
| 469 | + replacePlaceholder(indices + i, 1, cmd, key_counter); |
| 470 | + } |
| 471 | + |
| 472 | + /* For other placeholders, multiple occurrences within the command will |
| 473 | + * have the same value */ |
| 474 | + for (size_t placeholder = 1; placeholder < PLACEHOLDER_COUNT; placeholder++) { |
| 475 | + size_t *indices = placeholders.indices[placeholder]; |
| 476 | + size_t count = placeholders.count[placeholder]; |
| 477 | + _Atomic uint64_t *key_counter = &seq_key[placeholder]; |
| 478 | + replacePlaceholder(indices, count, cmd, key_counter); |
| 479 | + } |
| 480 | + } |
| 481 | +} |
| 482 | + |
369 | 483 | static void releasePausedClient(client c) { |
370 | 484 | if (c->thread_id >= 0) { |
371 | 485 | benchmarkThread *thread = config.threads[c->thread_id]; |
@@ -395,7 +509,6 @@ static void freeClient(client c) { |
395 | 509 | valkeyFree(c->context); |
396 | 510 | if (c->paused) releasePausedClient(c); |
397 | 511 | sdsfree(c->obuf); |
398 | | - zfree(c->randptr); |
399 | 512 | zfree(c->stagptr); |
400 | 513 | zfree(c); |
401 | 514 | if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); |
@@ -429,28 +542,6 @@ static void resetClient(client c) { |
429 | 542 | c->pending = config.pipeline * c->seqlen; |
430 | 543 | } |
431 | 544 |
|
432 | | -static void generateClientKey(client c) { |
433 | | - static _Atomic size_t seq_key = 0; |
434 | | - for (size_t i = 0; i < c->randlen; i++) { |
435 | | - char *p = c->randptr[i] + 11; |
436 | | - size_t key = 0; |
437 | | - if (config.keyspacelen != 0) { |
438 | | - if (config.sequential_replacement) { |
439 | | - key = atomic_fetch_add_explicit(&seq_key, 1, memory_order_relaxed); |
440 | | - } else { |
441 | | - key = random(); |
442 | | - } |
443 | | - key %= config.keyspacelen; |
444 | | - } |
445 | | - |
446 | | - for (size_t j = 0; j < 12; j++) { |
447 | | - *p = '0' + key % 10; |
448 | | - key /= 10; |
449 | | - p--; |
450 | | - } |
451 | | - } |
452 | | -} |
453 | | - |
454 | 545 | static void setClusterKeyHashTag(client c) { |
455 | 546 | assert(c->thread_id >= 0); |
456 | 547 | clusterNode *node = c->cluster_node; |
@@ -619,9 +710,6 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
619 | 710 | if (c->prefixlen > 0) { |
620 | 711 | size_t j; |
621 | 712 | sdsrange(c->obuf, c->prefixlen, -1); |
622 | | - /* We also need to fix the pointers to the strings |
623 | | - * we need to randomize. */ |
624 | | - for (j = 0; j < c->randlen; j++) c->randptr[j] -= c->prefixlen; |
625 | 713 | /* Fix the pointers to the slot hash tags */ |
626 | 714 | for (j = 0; j < c->staglen; j++) c->stagptr[j] -= c->prefixlen; |
627 | 715 | c->prefixlen = 0; |
@@ -752,7 +840,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
752 | 840 | } |
753 | 841 |
|
754 | 842 | /* Really initialize: replace keys and set start time. */ |
755 | | - if (config.replacekeys) generateClientKey(c); |
| 843 | + if (config.replace_placeholders) replacePlaceholders(c->obuf + c->prefixlen, config.pipeline); |
756 | 844 | if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); |
757 | 845 | c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); |
758 | 846 | c->start = ustime(); |
@@ -806,7 +894,6 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
806 | 894 | * |
807 | 895 | * Even when cloning another client, prefix commands are applied if needed.*/ |
808 | 896 | static client createClient(char *cmd, int len, int seqlen, client from, int thread_id) { |
809 | | - int j; |
810 | 897 | int is_cluster_client = (config.cluster_mode && thread_id >= 0); |
811 | 898 | client c = zmalloc(sizeof(struct _client)); |
812 | 899 |
|
@@ -914,54 +1001,23 @@ static client createClient(char *cmd, int len, int seqlen, client from, int thre |
914 | 1001 | c->obuf = sdscatlen(c->obuf, from->obuf + from->prefixlen, sdslen(from->obuf) - from->prefixlen); |
915 | 1002 | seqlen = from->seqlen; |
916 | 1003 | } else { |
917 | | - for (j = 0; j < config.pipeline; j++) c->obuf = sdscatlen(c->obuf, cmd, len); |
| 1004 | + for (int j = 0; j < config.pipeline; j++) c->obuf = sdscatlen(c->obuf, cmd, len); |
918 | 1005 | } |
919 | 1006 |
|
920 | 1007 | c->written = 0; |
921 | 1008 | c->seqlen = seqlen; |
922 | 1009 | c->pending = config.pipeline * seqlen + c->prefix_pending; |
923 | | - c->randptr = NULL; |
924 | | - c->randlen = 0; |
925 | 1010 | c->stagptr = NULL; |
926 | 1011 | c->staglen = 0; |
927 | 1012 |
|
928 | | - /* Find substrings in the output buffer that need to be replaced. */ |
929 | | - if (config.replacekeys) { |
930 | | - if (from) { |
931 | | - c->randlen = from->randlen; |
932 | | - c->randfree = 0; |
933 | | - c->randptr = zmalloc(sizeof(char *) * c->randlen); |
934 | | - /* copy the offsets. */ |
935 | | - for (j = 0; j < (int)c->randlen; j++) { |
936 | | - c->randptr[j] = c->obuf + (from->randptr[j] - from->obuf); |
937 | | - /* Adjust for the different select prefix length. */ |
938 | | - c->randptr[j] += c->prefixlen - from->prefixlen; |
939 | | - } |
940 | | - } else { |
941 | | - char *p = c->obuf; |
942 | | - |
943 | | - c->randlen = 0; |
944 | | - c->randfree = RANDPTR_INITIAL_SIZE; |
945 | | - c->randptr = zmalloc(sizeof(char *) * c->randfree); |
946 | | - while ((p = strstr(p, "__rand_int__")) != NULL) { |
947 | | - if (c->randfree == 0) { |
948 | | - c->randptr = zrealloc(c->randptr, sizeof(char *) * c->randlen * 2); |
949 | | - c->randfree += c->randlen; |
950 | | - } |
951 | | - c->randptr[c->randlen++] = p; |
952 | | - c->randfree--; |
953 | | - p += 12; /* 12 is strlen("__rand_int__). */ |
954 | | - } |
955 | | - } |
956 | | - } |
957 | 1013 | /* If cluster mode is enabled, set slot hashtags pointers. */ |
958 | 1014 | if (config.cluster_mode) { |
959 | 1015 | if (from) { |
960 | 1016 | c->staglen = from->staglen; |
961 | 1017 | c->stagfree = 0; |
962 | 1018 | c->stagptr = zmalloc(sizeof(char *) * c->staglen); |
963 | 1019 | /* copy the offsets. */ |
964 | | - for (j = 0; j < (int)c->staglen; j++) { |
| 1020 | + for (size_t j = 0; j < c->staglen; j++) { |
965 | 1021 | c->stagptr[j] = c->obuf + (from->stagptr[j] - from->obuf); |
966 | 1022 | /* Adjust for the different select prefix length. */ |
967 | 1023 | c->stagptr[j] += c->prefixlen - from->prefixlen; |
@@ -1156,6 +1212,7 @@ static void benchmarkSequence(const char *title, char *cmd, int len, int seqlen) |
1156 | 1212 | config.precision, // Number of significant figures |
1157 | 1213 | &config.current_sec_latency_histogram); // Pointer to initialise |
1158 | 1214 |
|
| 1215 | + initPlaceholders(cmd, len); |
1159 | 1216 | if (config.num_threads) initBenchmarkThreads(); |
1160 | 1217 |
|
1161 | 1218 | if (config.rps > 0) { |
@@ -1567,7 +1624,7 @@ int parseOptions(int argc, char **argv) { |
1567 | 1624 | p++; |
1568 | 1625 | if (*p < '0' || *p > '9') goto invalid; |
1569 | 1626 | } |
1570 | | - config.replacekeys = 1; |
| 1627 | + config.replace_placeholders = 1; |
1571 | 1628 | config.keyspacelen = atoi(next); |
1572 | 1629 | if (config.keyspacelen < 0) config.keyspacelen = 0; |
1573 | 1630 | } else if (!strcmp(argv[i], "--sequential")) { |
@@ -1738,7 +1795,10 @@ int parseOptions(int argc, char **argv) { |
1738 | 1795 | "a number N to repeat the command N times. In command arguments, the following\n" |
1739 | 1796 | "placeholders are substituted:\n\n" |
1740 | 1797 | " __rand_int__ Replaced with a zero-padded random integer in the range\n" |
1741 | | - " selected using the -r option.\n" |
| 1798 | + " selected using the -r option. Multiple occurrences within the\n" |
| 1799 | + " command will have different values.\n" |
| 1800 | + "__rand_1st__ Like __rand_int__ but multiple occurrences will have the same\n" |
| 1801 | + " value. __rand_2nd__ through __rand_9th__ are also available.\n" |
1742 | 1802 | " __data__ Replaced with data of the size specified by the -d option.\n" |
1743 | 1803 | " {tag} Replaced with a tag that routes the command to each node in\n" |
1744 | 1804 | " a cluster. Include this in key names when running in cluster\n" |
@@ -1790,6 +1850,10 @@ int parseOptions(int argc, char **argv) { |
1790 | 1850 | " use the same key.\n" |
1791 | 1851 | " --sequential Modifies the -r argument to replace the string __rand_int__\n" |
1792 | 1852 | " with 12 digit numbers sequentially instead of randomly.\n" |
| 1853 | + " __rand_1st__ through __rand_9th__ are available with independent\n" |
| 1854 | + " counters. Used to create expected number of elements with multiple\n" |
| 1855 | + " replacements.\n" |
| 1856 | + " example: ZADD myzset __rand_int__ element:__rand_1st__\n" |
1793 | 1857 | " -P <numreq> Pipeline <numreq> requests. That is, send multiple requests\n" |
1794 | 1858 | " before waiting for the replies. Default 1 (no pipeline).\n" |
1795 | 1859 | " When multiple commands are specified on the command line,\n" |
@@ -1954,7 +2018,7 @@ int main(int argc, char **argv) { |
1954 | 2018 | config.keepalive = 1; |
1955 | 2019 | config.datasize = 3; |
1956 | 2020 | config.pipeline = 1; |
1957 | | - config.replacekeys = 0; |
| 2021 | + config.replace_placeholders = 0; |
1958 | 2022 | config.keyspacelen = 0; |
1959 | 2023 | config.sequential_replacement = 0; |
1960 | 2024 | config.quiet = 0; |
@@ -1985,6 +2049,7 @@ int main(int argc, char **argv) { |
1985 | 2049 | config.num_functions = 10; |
1986 | 2050 | config.num_keys_in_fcall = 1; |
1987 | 2051 | config.resp3 = 0; |
| 2052 | + resetPlaceholders(); |
1988 | 2053 |
|
1989 | 2054 | i = parseOptions(argc, argv); |
1990 | 2055 | argc -= i; |
@@ -2253,8 +2318,8 @@ int main(int argc, char **argv) { |
2253 | 2318 |
|
2254 | 2319 | if (test_is_selected("zadd")) { |
2255 | 2320 | char *score = "0"; |
2256 | | - if (config.replacekeys) score = "__rand_int__"; |
2257 | | - len = valkeyFormatCommand(&cmd, "ZADD myzset%s %s element:__rand_int__", tag, score); |
| 2321 | + if (config.replace_placeholders) score = "__rand_int__"; |
| 2322 | + len = valkeyFormatCommand(&cmd, "ZADD myzset%s %s element:__rand_1st__", tag, score); |
2258 | 2323 | benchmark("ZADD", cmd, len); |
2259 | 2324 | free(cmd); |
2260 | 2325 | } |
@@ -2380,6 +2445,7 @@ int main(int argc, char **argv) { |
2380 | 2445 | zfree(data); |
2381 | 2446 | freeCliConnInfo(config.conn_info); |
2382 | 2447 | if (config.server_config != NULL) freeServerConfig(config.server_config); |
| 2448 | + resetPlaceholders(); |
2383 | 2449 |
|
2384 | 2450 | return 0; |
2385 | 2451 | } |
0 commit comments