Skip to content

Commit 3a6c455

Browse files
xbaselzuiderkwast
authored andcommitted
On-demand database allocation instead of preallocation (valkey-io#1609)
Allocate database structures lazily to prevent excessive memory usage when a large number of databases is configured but not actually used. fixes valkey-io#1597 --------- Signed-off-by: xbasel <[email protected]> Signed-off-by: Viktor Söderqvist <[email protected]> Co-authored-by: Viktor Söderqvist <[email protected]> Signed-off-by: chzhoo <[email protected]>
1 parent b4966b3 commit 3a6c455

File tree

13 files changed

+186
-138
lines changed

13 files changed

+186
-138
lines changed

src/aof.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,8 +2216,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
22162216

22172217
for (j = 0; j < server.dbnum; j++) {
22182218
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
2219-
serverDb *db = server.db + j;
2220-
if (kvstoreSize(db->keys) == 0) continue;
2219+
if (dbHasNoKeys(j)) continue;
2220+
serverDb *db = server.db[j];
22212221

22222222
/* SELECT the new DB */
22232223
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;

src/cluster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,7 @@ unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db) {
824824
unsigned int countKeysInSlot(unsigned int slot) {
825825
unsigned int result = 0;
826826
for (int i = 0; i < server.dbnum; i++) {
827-
result += countKeysInSlotForDb(slot, server.db + i);
827+
result += server.db[i] ? countKeysInSlotForDb(slot, server.db[i]) : 0;
828828
}
829829
return result;
830830
}

src/cluster_legacy.c

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6565,29 +6565,30 @@ unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del,
65656565
for (int i = 0; i < server.dbnum; i++) {
65666566
kvstoreHashtableIterator *kvs_di = NULL;
65676567
void *next;
6568-
serverDb db = server.db[i];
6569-
kvs_di = kvstoreGetHashtableIterator(db.keys, hashslot, HASHTABLE_ITER_SAFE);
6568+
serverDb *db = server.db[i];
6569+
if (db == NULL) continue;
6570+
kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
65706571
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
65716572
robj *valkey = next;
65726573
enterExecutionUnit(1, 0);
65736574
sds sdskey = objectGetKey(valkey);
65746575
robj *key = createStringObject(sdskey, sdslen(sdskey));
65756576
if (lazy) {
6576-
dbAsyncDelete(&db, key);
6577+
dbAsyncDelete(db, key);
65776578
} else {
6578-
dbSyncDelete(&db, key);
6579+
dbSyncDelete(db, key);
65796580
}
65806581
// if is command, skip del propagate
6581-
if (propagate_del) propagateDeletion(&db, key, lazy);
6582-
signalModifiedKey(NULL, &db, key);
6582+
if (propagate_del) propagateDeletion(db, key, lazy);
6583+
signalModifiedKey(NULL, db, key);
65836584
if (send_del_event) {
65846585
/* In the `cluster flushslot` scenario, the keys are actually deleted so notify everyone. */
6585-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
6586+
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
65866587
} else {
65876588
/* The keys are not actually logically deleted from the database, just moved to another node.
65886589
* The modules needs to know that these keys are no longer available locally, so just send the
65896590
* keyspace notification to the modules, but not to clients. */
6590-
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
6591+
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
65916592
}
65926593
exitExecutionUnit();
65936594
postExecutionUnitOperations();
@@ -7065,7 +7066,7 @@ int clusterCommandSpecial(client *c) {
70657066
}
70667067
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
70677068
/* CLUSTER FLUSHSLOTS */
7068-
if (!dbHasNoKeys()) {
7069+
if (!dbsHaveNoKeys()) {
70697070
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
70707071
return 1;
70717072
}
@@ -7235,7 +7236,7 @@ int clusterCommandSpecial(client *c) {
72357236
/* If the instance is currently a primary, it should have no assigned
72367237
* slots nor keys to accept to replicate some other node.
72377238
* Replicas can switch to another primary without issues. */
7238-
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbHasNoKeys())) {
7239+
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbsHaveNoKeys())) {
72397240
addReplyError(c, "To set a master the node must be empty and "
72407241
"without assigned slots.");
72417242
return 1;
@@ -7387,7 +7388,7 @@ int clusterCommandSpecial(client *c) {
73877388

73887389
/* Replicas can be reset while containing data, but not primary nodes
73897390
* that must be empty. */
7390-
if (clusterNodeIsPrimary(myself) && !dbHasNoKeys()) {
7391+
if (clusterNodeIsPrimary(myself) && !dbsHaveNoKeys()) {
73917392
addReplyError(c, "CLUSTER RESET can't be called with "
73927393
"master nodes containing keys");
73937394
return 1;

src/db.c

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
572572
* The dbnum can be -1 if all the DBs should be emptied, or the specified
573573
* DB index if we want to empty only a single database.
574574
* The function returns the number of keys removed from the database(s). */
575-
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) {
575+
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) {
576576
long long removed = 0;
577577
int startdb, enddb;
578578

@@ -584,18 +584,18 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
584584
}
585585

586586
for (int j = startdb; j <= enddb; j++) {
587-
if (kvstoreSize(dbarray[j].keys) == 0) continue;
587+
if (dbarray[j] == NULL || kvstoreSize(dbarray[j]->keys) == 0) continue;
588588

589-
removed += kvstoreSize(dbarray[j].keys);
589+
removed += kvstoreSize(dbarray[j]->keys);
590590
if (async) {
591-
emptyDbAsync(&dbarray[j]);
591+
emptyDbAsync(dbarray[j]);
592592
} else {
593-
kvstoreEmpty(dbarray[j].keys, callback);
594-
kvstoreEmpty(dbarray[j].expires, callback);
593+
kvstoreEmpty(dbarray[j]->keys, callback);
594+
kvstoreEmpty(dbarray[j]->expires, callback);
595595
}
596596
/* Because all keys of database are removed, reset average ttl. */
597-
dbarray[j].avg_ttl = 0;
598-
dbarray[j].expires_cursor = 0;
597+
dbarray[j]->avg_ttl = 0;
598+
dbarray[j]->expires_cursor = 0;
599599
}
600600

601601
return removed;
@@ -653,47 +653,44 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
653653
return removed;
654654
}
655655

656-
/* Initialize temporary db on replica for use during diskless replication. */
657-
serverDb *initTempDb(void) {
658-
int slot_count_bits = 0;
659-
int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
660-
if (server.cluster_enabled) {
661-
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
662-
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
663-
}
664-
serverDb *tempDb = zcalloc(sizeof(serverDb) * server.dbnum);
665-
for (int i = 0; i < server.dbnum; i++) {
666-
tempDb[i].id = i;
667-
tempDb[i].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
668-
tempDb[i].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
669-
}
670-
671-
return tempDb;
672-
}
673-
674-
/* Discard tempDb, it's always async. */
675-
void discardTempDb(serverDb *tempDb) {
656+
/* Discard tempDb array. It's always async. */
657+
void discardTempDb(serverDb **tempDb) {
676658
/* Release temp DBs. */
677659
emptyDbStructure(tempDb, -1, 1, NULL);
678660
for (int i = 0; i < server.dbnum; i++) {
679-
kvstoreRelease(tempDb[i].keys);
680-
kvstoreRelease(tempDb[i].expires);
661+
if (tempDb[i]) {
662+
kvstoreRelease(tempDb[i]->keys);
663+
kvstoreRelease(tempDb[i]->expires);
664+
665+
/* These are expected to be empty on temporary databases */
666+
serverAssert(dictSize(tempDb[i]->blocking_keys) == 0);
667+
serverAssert(dictSize(tempDb[i]->blocking_keys_unblock_on_nokey) == 0);
668+
serverAssert(dictSize(tempDb[i]->ready_keys) == 0);
669+
serverAssert(dictSize(tempDb[i]->watched_keys) == 0);
670+
671+
dictRelease(tempDb[i]->blocking_keys);
672+
dictRelease(tempDb[i]->blocking_keys_unblock_on_nokey);
673+
dictRelease(tempDb[i]->ready_keys);
674+
dictRelease(tempDb[i]->watched_keys);
675+
zfree(tempDb[i]);
676+
tempDb[i] = NULL;
677+
}
681678
}
682-
683679
zfree(tempDb);
684680
}
685681

686682
int selectDb(client *c, int id) {
687683
if (id < 0 || id >= server.dbnum) return C_ERR;
688-
c->db = &server.db[id];
684+
c->db = createDatabaseIfNeeded(id);
689685
return C_OK;
690686
}
691687

692688
long long dbTotalServerKeyCount(void) {
693689
long long total = 0;
694690
int j;
695691
for (j = 0; j < server.dbnum; j++) {
696-
total += kvstoreSize(server.db[j].keys);
692+
if (dbHasNoKeys(j)) continue;
693+
total += kvstoreSize(server.db[j]->keys);
697694
}
698695
return total;
699696
}
@@ -724,8 +721,9 @@ void signalFlushedDb(int dbid, int async) {
724721
}
725722

726723
for (int j = startdb; j <= enddb; j++) {
727-
scanDatabaseForDeletedKeys(&server.db[j], NULL);
728-
touchAllWatchedKeysInDb(&server.db[j], NULL);
724+
if (server.db[j] == NULL) continue;
725+
scanDatabaseForDeletedKeys(server.db[j], NULL);
726+
touchAllWatchedKeysInDb(server.db[j], NULL);
729727
}
730728

731729
trackingInvalidateKeysOnFlush(async);
@@ -1642,8 +1640,9 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
16421640
int dbSwapDatabases(int id1, int id2) {
16431641
if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR;
16441642
if (id1 == id2) return C_OK;
1645-
serverDb aux = server.db[id1];
1646-
serverDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1643+
serverDb *db1 = createDatabaseIfNeeded(id1);
1644+
serverDb *db2 = createDatabaseIfNeeded(id2);
1645+
serverDb aux = *db1;
16471646

16481647
/* Swapdb should make transaction fail if there is any
16491648
* client watching keys */
@@ -1684,10 +1683,13 @@ int dbSwapDatabases(int id1, int id2) {
16841683
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
16851684
* database (temp) as the main (active) database, the actual freeing of old database
16861685
* (which will now be placed in the temp one) is done later. */
1687-
void swapMainDbWithTempDb(serverDb *tempDb) {
1686+
void swapMainDbWithTempDb(serverDb **tempDb) {
16881687
for (int i = 0; i < server.dbnum; i++) {
1689-
serverDb aux = server.db[i];
1690-
serverDb *activedb = &server.db[i], *newdb = &tempDb[i];
1688+
if (tempDb[i] == NULL && server.db[i] == NULL) continue;
1689+
if (tempDb[i] == NULL) tempDb[i] = createDatabase(i);
1690+
if (server.db[i] == NULL) server.db[i] = createDatabase(i);
1691+
serverDb aux = *server.db[i];
1692+
serverDb *activedb = server.db[i], *newdb = tempDb[i];
16911693

16921694
/* Swapping databases should make transaction fail if there is any
16931695
* client watching keys. */
@@ -2848,12 +2850,3 @@ int bitfieldGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysRes
28482850
}
28492851
return 1;
28502852
}
2851-
2852-
bool dbHasNoKeys(void) {
2853-
for (int i = 0; i < server.dbnum; i++) {
2854-
if (kvstoreSize(server.db[i].keys) != 0) {
2855-
return false;
2856-
}
2857-
}
2858-
return true;
2859-
}

src/debug.c

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) {
289289
memset(final, 0, 20); /* Start with a clean result */
290290

291291
for (int j = 0; j < server.dbnum; j++) {
292-
serverDb *db = server.db + j;
293-
if (kvstoreSize(db->keys) == 0) continue;
292+
serverDb *db = server.db[j];
293+
if (db == NULL || kvstoreSize(db->keys) == 0) continue;
294294
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
295295

296296
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
@@ -914,14 +914,20 @@ void debugCommand(client *c) {
914914
if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;
915915

916916
stats = sdscatprintf(stats, "[Dictionary HT]\n");
917-
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
918-
stats = sdscat(stats, buf);
917+
serverDb *db = server.db[dbid];
918+
if (db) {
919+
kvstoreGetStats(db->keys, buf, sizeof(buf), full);
920+
stats = sdscat(stats, buf);
921+
}
919922

920923
stats = sdscatprintf(stats, "[Expires HT]\n");
921-
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
922-
stats = sdscat(stats, buf);
924+
if (db) {
925+
kvstoreGetStats(db->expires, buf, sizeof(buf), full);
926+
stats = sdscat(stats, buf);
927+
}
923928

924929
addReplyVerbatim(c, stats, sdslen(stats), "txt");
930+
925931
sdsfree(stats);
926932
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
927933
int full = 0;

src/defrag.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ static void defragModule(serverDb *db, robj *obj) {
684684
/* for each key we scan in the main dict, this function will attempt to defrag
685685
* all the various pointers it has. */
686686
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
687-
serverDb *db = &server.db[ctx->dbid];
687+
serverDb *db = server.db[ctx->dbid];
688688
int slot = ctx->kvstate.slot;
689689
robj *newob, *ob;
690690
unsigned char *newzl;
@@ -940,7 +940,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
940940
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
941941
UNUSED(privdata);
942942
int dbid = (uintptr_t)target;
943-
serverDb *db = &server.db[dbid];
943+
serverDb *db = server.db[dbid];
944944

945945
static defragKeysCtx ctx; // STATIC - this persists
946946
if (endtime == 0) {
@@ -958,7 +958,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda
958958
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
959959
UNUSED(privdata);
960960
int dbid = (uintptr_t)target;
961-
serverDb *db = &server.db[dbid];
961+
serverDb *db = server.db[dbid];
962962
return defragStageKvstoreHelper(endtime, db->expires,
963963
scanHashtableCallbackCountScanned, NULL, NULL);
964964
}
@@ -1227,6 +1227,7 @@ static void beginDefragCycle(void) {
12271227
defrag.remaining_stages = listCreate();
12281228

12291229
for (int dbid = 0; dbid < server.dbnum; dbid++) {
1230+
if (dbHasNoKeys(dbid)) continue;
12301231
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
12311232
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
12321233
}

src/evict.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,8 @@ int performEvictions(void) {
568568
* so to start populate the eviction pool sampling keys from
569569
* every DB. */
570570
for (i = 0; i < server.dbnum; i++) {
571-
db = server.db + i;
571+
db = server.db[i];
572+
if (db == NULL) continue;
572573
kvstore *kvs;
573574
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
574575
kvs = db->keys;
@@ -601,9 +602,9 @@ int performEvictions(void) {
601602

602603
kvstore *kvs;
603604
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
604-
kvs = server.db[bestdbid].keys;
605+
kvs = server.db[bestdbid]->keys;
605606
} else {
606-
kvs = server.db[bestdbid].expires;
607+
kvs = server.db[bestdbid]->expires;
607608
}
608609
void *entry = NULL;
609610
int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry);
@@ -634,7 +635,8 @@ int performEvictions(void) {
634635
* incrementally visit all DBs. */
635636
for (i = 0; i < server.dbnum; i++) {
636637
j = (++next_db) % server.dbnum;
637-
db = server.db + j;
638+
db = server.db[j];
639+
if (db == NULL) continue;
638640
kvstore *kvs;
639641
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
640642
kvs = db->keys;
@@ -653,7 +655,7 @@ int performEvictions(void) {
653655

654656
/* Finally remove the selected key. */
655657
if (bestkey) {
656-
db = server.db + bestdbid;
658+
db = server.db[bestdbid];
657659
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
658660
/* We compute the amount of memory freed by db*Delete() alone.
659661
* It is possible that actually the memory needed to propagate

src/expire.c

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ void activeExpireCycle(int type) {
239239
data.ttl_sum = 0;
240240
data.ttl_samples = 0;
241241

242-
serverDb *db = server.db + (current_db % server.dbnum);
242+
serverDb *db = server.db[(current_db % server.dbnum)];
243243
data.db = db;
244244

245245
int db_done = 0; /* The scan of the current DB is done? */
@@ -250,13 +250,17 @@ void activeExpireCycle(int type) {
250250
* distribute the time evenly across DBs. */
251251
current_db++;
252252

253-
if (kvstoreSize(db->expires)) dbs_performed++;
253+
if (db && kvstoreSize(db->expires)) dbs_performed++;
254254

255255
/* Continue to expire if at the end of the cycle there are still
256256
* a big percentage of keys to expire, compared to the number of keys
257257
* we scanned. The percentage, stored in config_cycle_acceptable_stale
258258
* is not fixed, but depends on the configured "expire effort". */
259259
do {
260+
if (db == NULL) {
261+
break; /* DB not allocated since it was never used */
262+
}
263+
260264
unsigned long num;
261265
iteration++;
262266

@@ -427,11 +431,11 @@ void expireReplicaKeys(void) {
427431
int dbid = 0;
428432
while (dbids && dbid < server.dbnum) {
429433
if ((dbids & 1) != 0) {
430-
serverDb *db = server.db + dbid;
431-
robj *expire = dbFindExpires(db, keyname);
434+
serverDb *db = server.db[dbid];
435+
robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname);
432436
int expired = 0;
433437

434-
if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) {
438+
if (expire && activeExpireCycleTryExpire(db, expire, start)) {
435439
expired = 1;
436440
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
437441
* but they might propagate to AOF) and trigger module hooks. */

0 commit comments

Comments
 (0)