Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .config/typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ exat = "exat"
optin = "optin"
smove = "smove"
Parth = "Parth" # seems like the spellchecker does not like it is similar to "Path"
nd = "nd"

[type.c]
extend-ignore-re = [
Expand All @@ -33,7 +34,6 @@ extend-ignore-re = [
advices = "advices"
clen = "clen"
fle = "fle"
nd = "nd"
ot = "ot"

[type.tcl.extend-identifiers]
Expand Down
206 changes: 136 additions & 70 deletions src/valkey-benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@

#define CLIENT_GET_EVENTLOOP(c) (c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)

#define PLACEHOLDER_COUNT 10
static const size_t PLACEHOLDER_LEN = 12; // length of BENCHMARK_PLACEHOLDERS strings
static const char *PLACEHOLDERS[PLACEHOLDER_COUNT] = {
"__rand_int__", "__rand_1st__", "__rand_2nd__", "__rand_3rd__", "__rand_4th__",
"__rand_5th__", "__rand_6th__", "__rand_7th__", "__rand_8th__", "__rand_9th__"};

struct benchmarkThread;
struct clusterNode;
struct serverConfig;
Expand Down Expand Up @@ -103,7 +109,7 @@ static struct config {
long long previous_tick;
int keysize;
int datasize;
int replacekeys;
int replace_placeholders;
int keyspacelen;
int sequential_replacement;
int keepalive;
Expand Down Expand Up @@ -145,12 +151,18 @@ static struct config {
uint64_t time_per_burst;
} config;

/* Locations of the placeholders __rand_int__, __rand_1st__,
* __rand_2nd, etc. within the RESP encoded command buffer. */
static struct placeholders {
size_t cmd_len; /* length of the command */
size_t count[PLACEHOLDER_COUNT]; /* number of each placeholder in the command */
size_t *indices[PLACEHOLDER_COUNT]; /* pointer to indices for each placeholder */
size_t *index_data; /* allocation holding all index data */
} placeholders;

typedef struct _client {
valkeyContext *context;
sds obuf;
char **randptr; /* Pointers to :rand: strings inside the command buf */
size_t randlen; /* Number of pointers in client->randptr */
size_t randfree; /* Number of unused pointers in client->randptr */
char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
size_t staglen; /* Number of pointers in client->stagptr */
size_t stagfree; /* Number of unused pointers in client->stagptr */
Expand Down Expand Up @@ -366,6 +378,108 @@ static void freeServerConfig(serverConfig *cfg) {
zfree(cfg);
}

void resetPlaceholders(void) {
if (placeholders.index_data)
zfree(placeholders.index_data); /* indices are a single contiguous allocation */
memset(&placeholders, 0, sizeof(placeholders));
}

void initPlaceholders(const char *cmd, size_t cmd_len) {
resetPlaceholders();
placeholders.cmd_len = cmd_len;

/* store placeholder locations in temp arrays */
size_t total_count = 0;
size_t *temp_indices[PLACEHOLDER_COUNT];
for (size_t placeholder = 0; placeholder < PLACEHOLDER_COUNT; placeholder++) {
size_t *count = &placeholders.count[placeholder];
*count = 0;

size_t temp_size = RANDPTR_INITIAL_SIZE;
temp_indices[placeholder] = zmalloc(sizeof(size_t) * temp_size);
const char *p = cmd;
const char *end = cmd + cmd_len;
while ((p = strstr(p, PLACEHOLDERS[placeholder])) != NULL && p < end) {
if (*count == temp_size) {
temp_size *= 2;
temp_indices[placeholder] = zrealloc(temp_indices[placeholder], sizeof(size_t) * temp_size);
}
size_t index = p - cmd;
temp_indices[placeholder][*count] = index;
(*count)++;
total_count++;
p += PLACEHOLDER_LEN; // Move past the placeholder
}
}

/* consolidate temp data into contiguous allocation */
placeholders.index_data = zmalloc(sizeof(size_t) * total_count);
size_t overall_index = 0;
for (size_t placeholder = 0; placeholder < PLACEHOLDER_COUNT; placeholder++) {
placeholders.indices[placeholder] = placeholders.index_data + overall_index;

const size_t count = placeholders.count[placeholder];
memcpy(placeholders.indices[placeholder], temp_indices[placeholder],
sizeof(size_t) * count);
overall_index += count;

zfree(temp_indices[placeholder]);
}
return;
}

static void replacePlaceholder(const size_t *indices, const size_t count, char *cmd, _Atomic uint64_t *key_counter) {
if (count == 0) return;

uint64_t key = 0;
if (config.keyspacelen != 0) {
if (config.sequential_replacement) {
key = atomic_fetch_add_explicit(key_counter, 1, memory_order_relaxed);
} else {
key = random();
}
key %= config.keyspacelen;
}

/* convert key to string at first location */
char *p = cmd + indices[0] + PLACEHOLDER_LEN - 1;
for (size_t j = 0; j < PLACEHOLDER_LEN; j++) {
*p = '0' + key % 10;
key /= 10;
p--;
}

/* copy the first instance to the other locations */
for (size_t i = 1; i < count; i++) {
char *placeholder = cmd + indices[i];
memcpy(placeholder, cmd + indices[0], PLACEHOLDER_LEN);
}
}

static void replacePlaceholders(char *cmd_data, int cmd_count) {
static _Atomic uint64_t seq_key[PLACEHOLDER_COUNT] = {0};

for (int cmd_index = 0; cmd_index < cmd_count; cmd_index++) {
char *cmd = cmd_data + cmd_index * placeholders.cmd_len;

/* for __rand_int__, multiple instances will have different values */
size_t *indices = placeholders.indices[0];
_Atomic uint64_t *key_counter = &seq_key[0];
for (size_t i = 0; i < placeholders.count[0]; i++) {
replacePlaceholder(indices + i, 1, cmd, key_counter);
}

/* For other placeholders, multiple occurrences within the command will
* have the same value */
for (size_t placeholder = 1; placeholder < PLACEHOLDER_COUNT; placeholder++) {
size_t *indices = placeholders.indices[placeholder];
size_t count = placeholders.count[placeholder];
_Atomic uint64_t *key_counter = &seq_key[placeholder];
replacePlaceholder(indices, count, cmd, key_counter);
}
}
}

static void releasePausedClient(client c) {
if (c->thread_id >= 0) {
benchmarkThread *thread = config.threads[c->thread_id];
Expand Down Expand Up @@ -395,7 +509,6 @@ static void freeClient(client c) {
valkeyFree(c->context);
if (c->paused) releasePausedClient(c);
sdsfree(c->obuf);
zfree(c->randptr);
zfree(c->stagptr);
zfree(c);
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
Expand Down Expand Up @@ -429,28 +542,6 @@ static void resetClient(client c) {
c->pending = config.pipeline * c->seqlen;
}

static void generateClientKey(client c) {
static _Atomic size_t seq_key = 0;
for (size_t i = 0; i < c->randlen; i++) {
char *p = c->randptr[i] + 11;
size_t key = 0;
if (config.keyspacelen != 0) {
if (config.sequential_replacement) {
key = atomic_fetch_add_explicit(&seq_key, 1, memory_order_relaxed);
} else {
key = random();
}
key %= config.keyspacelen;
}

for (size_t j = 0; j < 12; j++) {
*p = '0' + key % 10;
key /= 10;
p--;
}
}
}

static void setClusterKeyHashTag(client c) {
assert(c->thread_id >= 0);
clusterNode *node = c->cluster_node;
Expand Down Expand Up @@ -619,9 +710,6 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (c->prefixlen > 0) {
size_t j;
sdsrange(c->obuf, c->prefixlen, -1);
/* We also need to fix the pointers to the strings
* we need to randomize. */
for (j = 0; j < c->randlen; j++) c->randptr[j] -= c->prefixlen;
/* Fix the pointers to the slot hash tags */
for (j = 0; j < c->staglen; j++) c->stagptr[j] -= c->prefixlen;
c->prefixlen = 0;
Expand Down Expand Up @@ -752,7 +840,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}

/* Really initialize: replace keys and set start time. */
if (config.replacekeys) generateClientKey(c);
if (config.replace_placeholders) replacePlaceholders(c->obuf + c->prefixlen, config.pipeline);
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed);
c->start = ustime();
Expand Down Expand Up @@ -806,7 +894,6 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
*
* Even when cloning another client, prefix commands are applied if needed.*/
static client createClient(char *cmd, int len, int seqlen, client from, int thread_id) {
int j;
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
client c = zmalloc(sizeof(struct _client));

Expand Down Expand Up @@ -914,54 +1001,23 @@ static client createClient(char *cmd, int len, int seqlen, client from, int thre
c->obuf = sdscatlen(c->obuf, from->obuf + from->prefixlen, sdslen(from->obuf) - from->prefixlen);
seqlen = from->seqlen;
} else {
for (j = 0; j < config.pipeline; j++) c->obuf = sdscatlen(c->obuf, cmd, len);
for (int j = 0; j < config.pipeline; j++) c->obuf = sdscatlen(c->obuf, cmd, len);
}

c->written = 0;
c->seqlen = seqlen;
c->pending = config.pipeline * seqlen + c->prefix_pending;
c->randptr = NULL;
c->randlen = 0;
c->stagptr = NULL;
c->staglen = 0;

/* Find substrings in the output buffer that need to be replaced. */
if (config.replacekeys) {
if (from) {
c->randlen = from->randlen;
c->randfree = 0;
c->randptr = zmalloc(sizeof(char *) * c->randlen);
/* copy the offsets. */
for (j = 0; j < (int)c->randlen; j++) {
c->randptr[j] = c->obuf + (from->randptr[j] - from->obuf);
/* Adjust for the different select prefix length. */
c->randptr[j] += c->prefixlen - from->prefixlen;
}
} else {
char *p = c->obuf;

c->randlen = 0;
c->randfree = RANDPTR_INITIAL_SIZE;
c->randptr = zmalloc(sizeof(char *) * c->randfree);
while ((p = strstr(p, "__rand_int__")) != NULL) {
if (c->randfree == 0) {
c->randptr = zrealloc(c->randptr, sizeof(char *) * c->randlen * 2);
c->randfree += c->randlen;
}
c->randptr[c->randlen++] = p;
c->randfree--;
p += 12; /* 12 is strlen("__rand_int__). */
}
}
}
/* If cluster mode is enabled, set slot hashtags pointers. */
if (config.cluster_mode) {
if (from) {
c->staglen = from->staglen;
c->stagfree = 0;
c->stagptr = zmalloc(sizeof(char *) * c->staglen);
/* copy the offsets. */
for (j = 0; j < (int)c->staglen; j++) {
for (size_t j = 0; j < c->staglen; j++) {
c->stagptr[j] = c->obuf + (from->stagptr[j] - from->obuf);
/* Adjust for the different select prefix length. */
c->stagptr[j] += c->prefixlen - from->prefixlen;
Expand Down Expand Up @@ -1156,6 +1212,7 @@ static void benchmarkSequence(const char *title, char *cmd, int len, int seqlen)
config.precision, // Number of significant figures
&config.current_sec_latency_histogram); // Pointer to initialise

initPlaceholders(cmd, len);
if (config.num_threads) initBenchmarkThreads();

if (config.rps > 0) {
Expand Down Expand Up @@ -1567,7 +1624,7 @@ int parseOptions(int argc, char **argv) {
p++;
if (*p < '0' || *p > '9') goto invalid;
}
config.replacekeys = 1;
config.replace_placeholders = 1;
config.keyspacelen = atoi(next);
if (config.keyspacelen < 0) config.keyspacelen = 0;
} else if (!strcmp(argv[i], "--sequential")) {
Expand Down Expand Up @@ -1738,7 +1795,10 @@ int parseOptions(int argc, char **argv) {
"a number N to repeat the command N times. In command arguments, the following\n"
"placeholders are substituted:\n\n"
" __rand_int__ Replaced with a zero-padded random integer in the range\n"
" selected using the -r option.\n"
" selected using the -r option. Multiple occurrences within the\n"
" command will have different values.\n"
"__rand_1st__ Like __rand_int__ but multiple occurrences will have the same\n"
" value. __rand_2nd__ through __rand_9th__ are also available.\n"
" __data__ Replaced with data of the size specified by the -d option.\n"
" {tag} Replaced with a tag that routes the command to each node in\n"
" a cluster. Include this in key names when running in cluster\n"
Expand Down Expand Up @@ -1790,6 +1850,10 @@ int parseOptions(int argc, char **argv) {
" use the same key.\n"
" --sequential Modifies the -r argument to replace the string __rand_int__\n"
" with 12 digit numbers sequentially instead of randomly.\n"
" __rand_1st__ through __rand_9th__ are available with independent\n"
" counters. Used to create expected number of elements with multiple\n"
" replacements.\n"
" example: ZADD myzset __rand_int__ element:__rand_1st__\n"
" -P <numreq> Pipeline <numreq> requests. That is, send multiple requests\n"
" before waiting for the replies. Default 1 (no pipeline).\n"
" When multiple commands are specified on the command line,\n"
Expand Down Expand Up @@ -1954,7 +2018,7 @@ int main(int argc, char **argv) {
config.keepalive = 1;
config.datasize = 3;
config.pipeline = 1;
config.replacekeys = 0;
config.replace_placeholders = 0;
config.keyspacelen = 0;
config.sequential_replacement = 0;
config.quiet = 0;
Expand Down Expand Up @@ -1985,6 +2049,7 @@ int main(int argc, char **argv) {
config.num_functions = 10;
config.num_keys_in_fcall = 1;
config.resp3 = 0;
resetPlaceholders();

i = parseOptions(argc, argv);
argc -= i;
Expand Down Expand Up @@ -2253,8 +2318,8 @@ int main(int argc, char **argv) {

if (test_is_selected("zadd")) {
char *score = "0";
if (config.replacekeys) score = "__rand_int__";
len = valkeyFormatCommand(&cmd, "ZADD myzset%s %s element:__rand_int__", tag, score);
if (config.replace_placeholders) score = "__rand_int__";
len = valkeyFormatCommand(&cmd, "ZADD myzset%s %s element:__rand_1st__", tag, score);
benchmark("ZADD", cmd, len);
free(cmd);
}
Expand Down Expand Up @@ -2380,6 +2445,7 @@ int main(int argc, char **argv) {
zfree(data);
freeCliConnInfo(config.conn_info);
if (config.server_config != NULL) freeServerConfig(config.server_config);
resetPlaceholders();

return 0;
}
Loading
Loading