Skip to content

Commit 12ef00e

Browse files
author
xbasel
committed
Switch from preallocating all databases to lazy allocation,
reducing overhead and improving scalability for large database counts.
1 parent d13aad4 commit 12ef00e

File tree

13 files changed

+157
-101
lines changed

13 files changed

+157
-101
lines changed

src/aof.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2210,7 +2210,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
22102210

22112211
for (j = 0; j < server.dbnum; j++) {
22122212
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
2213-
serverDb *db = server.db + j;
2213+
if (server.db[j] == NULL || kvstoreSize(server.db[j]->keys) == 0) continue;
2214+
serverDb *db = server.db[j];
22142215
if (kvstoreSize(db->keys) == 0) continue;
22152216

22162217
/* SELECT the new DB */

src/cluster.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ static int shouldReturnTlsInfo(void) {
812812
}
813813

814814
unsigned int countKeysInSlot(unsigned int slot) {
815-
return kvstoreHashtableSize(server.db->keys, slot);
815+
return kvstoreHashtableSize(server.db[0]->keys, slot);
816816
}
817817

818818
void clusterCommandHelp(client *c) {
@@ -910,7 +910,7 @@ void clusterCommand(client *c) {
910910
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
911911
addReplyArrayLen(c, numkeys);
912912
kvstoreHashtableIterator *kvs_di = NULL;
913-
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot);
913+
kvs_di = kvstoreGetHashtableIterator(server.db[0]->keys, slot);
914914
for (unsigned int i = 0; i < numkeys; i++) {
915915
void *next;
916916
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
@@ -1099,7 +1099,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
10991099
* NODE <node-id>. */
11001100
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
11011101
if ((migrating_slot || importing_slot) && !pubsubshard_included) {
1102-
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
1102+
if (lookupKeyReadWithFlags(server.db[0], thiskey, flags) == NULL)
11031103
missing_keys++;
11041104
else
11051105
existing_keys++;

src/cluster_legacy.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5704,7 +5704,7 @@ int verifyClusterConfigWithData(void) {
57045704

57055705
/* Make sure we only have keys in DB0. */
57065706
for (j = 1; j < server.dbnum; j++) {
5707-
if (kvstoreSize(server.db[j].keys)) return C_ERR;
5707+
if (server.db[j] && kvstoreSize(server.db[j]->keys)) return C_ERR;
57085708
}
57095709

57105710
/* Check that all the slots we see populated memory have a corresponding
@@ -6340,19 +6340,20 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
63406340

63416341
kvstoreHashtableIterator *kvs_di = NULL;
63426342
void *next;
6343-
kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot);
6343+
serverDb *db = server.db[0];
6344+
kvs_di = kvstoreGetHashtableSafeIterator(db->keys, hashslot);
63446345
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
63456346
robj *valkey = next;
63466347
enterExecutionUnit(1, 0);
63476348
sds sdskey = objectGetKey(valkey);
63486349
robj *key = createStringObject(sdskey, sdslen(sdskey));
6349-
dbDelete(&server.db[0], key);
6350-
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
6351-
signalModifiedKey(NULL, &server.db[0], key);
6350+
dbDelete(db, key);
6351+
propagateDeletion(db, key, server.lazyfree_lazy_server_del);
6352+
signalModifiedKey(NULL, db, key);
63526353
/* The keys are not actually logically deleted from the database, just moved to another node.
63536354
* The modules needs to know that these keys are no longer available locally, so just send the
63546355
* keyspace notification to the modules, but not to clients. */
6355-
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
6356+
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
63566357
exitExecutionUnit();
63576358
postExecutionUnitOperations();
63586359
decrRefCount(key);
@@ -6820,7 +6821,7 @@ int clusterCommandSpecial(client *c) {
68206821
}
68216822
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
68226823
/* CLUSTER FLUSHSLOTS */
6823-
if (kvstoreSize(server.db[0].keys) != 0) {
6824+
if (kvstoreSize(server.db[0]->keys) != 0) {
68246825
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
68256826
return 1;
68266827
}
@@ -6961,7 +6962,7 @@ int clusterCommandSpecial(client *c) {
69616962
/* If the instance is currently a primary, it should have no assigned
69626963
* slots nor keys to accept to replicate some other node.
69636964
* Replicas can switch to another primary without issues. */
6964-
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
6965+
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0]->keys) != 0)) {
69656966
addReplyError(c, "To set a master the node must be empty and "
69666967
"without assigned slots.");
69676968
return 1;

src/db.c

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
571571
* The dbnum can be -1 if all the DBs should be emptied, or the specified
572572
* DB index if we want to empty only a single database.
573573
* The function returns the number of keys removed from the database(s). */
574-
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) {
574+
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) {
575575
long long removed = 0;
576576
int startdb, enddb;
577577

@@ -583,16 +583,17 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
583583
}
584584

585585
for (int j = startdb; j <= enddb; j++) {
586-
removed += kvstoreSize(dbarray[j].keys);
586+
if (dbarray[j] == NULL) continue;
587+
removed += kvstoreSize(dbarray[j]->keys);
587588
if (async) {
588-
emptyDbAsync(&dbarray[j]);
589+
emptyDbAsync(dbarray[j]);
589590
} else {
590-
kvstoreEmpty(dbarray[j].keys, callback);
591-
kvstoreEmpty(dbarray[j].expires, callback);
591+
kvstoreEmpty(dbarray[j]->keys, callback);
592+
kvstoreEmpty(dbarray[j]->expires, callback);
592593
}
593594
/* Because all keys of database are removed, reset average ttl. */
594-
dbarray[j].avg_ttl = 0;
595-
dbarray[j].expires_cursor = 0;
595+
dbarray[j]->avg_ttl = 0;
596+
dbarray[j]->expires_cursor = 0;
596597
}
597598

598599
return removed;
@@ -668,29 +669,36 @@ serverDb *initTempDb(void) {
668669
return tempDb;
669670
}
670671

672+
/* Release a single database object: both of its kvstores (keys and expires)
 * and then the serverDb allocation itself. A NULL pointer is accepted and
 * ignored, so callers may pass slots that were never allocated. */
void freeServerdb(serverDb *db) {
673+
if (db == NULL) return;
674+
kvstoreRelease(db->keys);
675+
kvstoreRelease(db->expires);
676+
zfree(db);
677+
}
678+
671679
/* Discard tempDb, it's always async. */
672-
void discardTempDb(serverDb *tempDb) {
680+
void discardTempDb(serverDb **tempDb) {
673681
/* Release temp DBs. */
674682
emptyDbStructure(tempDb, -1, 1, NULL);
675683
for (int i = 0; i < server.dbnum; i++) {
676-
kvstoreRelease(tempDb[i].keys);
677-
kvstoreRelease(tempDb[i].expires);
684+
freeServerdb(tempDb[i]);
678685
}
679-
680686
zfree(tempDb);
681687
}
682688

683689
int selectDb(client *c, int id) {
684690
if (id < 0 || id >= server.dbnum) return C_ERR;
685-
c->db = &server.db[id];
691+
initDatabase(id);
692+
c->db = server.db[id];
686693
return C_OK;
687694
}
688695

689696
long long dbTotalServerKeyCount(void) {
690697
long long total = 0;
691698
int j;
692699
for (j = 0; j < server.dbnum; j++) {
693-
total += kvstoreSize(server.db[j].keys);
700+
if (server.db[j] == NULL) continue;
701+
total += kvstoreSize(server.db[j]->keys);
694702
}
695703
return total;
696704
}
@@ -721,8 +729,9 @@ void signalFlushedDb(int dbid, int async) {
721729
}
722730

723731
for (int j = startdb; j <= enddb; j++) {
724-
scanDatabaseForDeletedKeys(&server.db[j], NULL);
725-
touchAllWatchedKeysInDb(&server.db[j], NULL);
732+
if (server.db[j] == NULL) continue;
733+
scanDatabaseForDeletedKeys(server.db[j], NULL);
734+
touchAllWatchedKeysInDb(server.db[j], NULL);
726735
}
727736

728737
trackingInvalidateKeysOnFlush(async);
@@ -1641,8 +1650,10 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
16411650
int dbSwapDatabases(int id1, int id2) {
16421651
if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR;
16431652
if (id1 == id2) return C_OK;
1644-
serverDb aux = server.db[id1];
1645-
serverDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1653+
initDatabase(id1);
1654+
initDatabase(id2);
1655+
serverDb aux = *server.db[id1];
1656+
serverDb *db1 = server.db[id1], *db2 = server.db[id2];
16461657

16471658
/* Swapdb should make transaction fail if there is any
16481659
* client watching keys */
@@ -1683,10 +1694,13 @@ int dbSwapDatabases(int id1, int id2) {
16831694
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
16841695
* database (temp) as the main (active) database, the actual freeing of old database
16851696
* (which will now be placed in the temp one) is done later. */
1686-
void swapMainDbWithTempDb(serverDb *tempDb) {
1697+
void swapMainDbWithTempDb(serverDb **tempDb) {
16871698
for (int i = 0; i < server.dbnum; i++) {
1688-
serverDb aux = server.db[i];
1689-
serverDb *activedb = &server.db[i], *newdb = &tempDb[i];
1699+
/* Nothing to swap when neither side was ever allocated. */
if (tempDb[i] == NULL && server.db[i] == NULL) continue;
1700+
/* Otherwise materialize whichever side is missing, so both pointers can be dereferenced below. */
if (tempDb[i] == NULL) tempDb[i] = createDatabase(i);
1701+
if (server.db[i] == NULL) server.db[i] = createDatabase(i);
1702+
serverDb aux = *server.db[i];
1703+
serverDb *activedb = server.db[i], *newdb = tempDb[i];
16901704

16911705
/* Swapping databases should make transaction fail if there is any
16921706
* client watching keys. */

src/debug.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) {
289289
memset(final, 0, 20); /* Start with a clean result */
290290

291291
for (int j = 0; j < server.dbnum; j++) {
292-
serverDb *db = server.db + j;
293-
if (kvstoreSize(db->keys) == 0) continue;
292+
serverDb *db = server.db[j];
293+
if (db == NULL || kvstoreSize(db->keys) == 0) continue;
294294
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
295295

296296
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
@@ -907,14 +907,19 @@ void debugCommand(client *c) {
907907
if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;
908908

909909
stats = sdscatprintf(stats, "[Dictionary HT]\n");
910-
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
910+
/* Databases are allocated lazily: report on the live DB when it exists, otherwise on a throwaway empty one. */
serverDb *db = server.db[dbid] != NULL ? server.db[dbid] : createDatabase(dbid);
911+
kvstoreGetStats(db->keys, buf, sizeof(buf), full);
911912
stats = sdscat(stats, buf);
912913

913914
stats = sdscatprintf(stats, "[Expires HT]\n");
914-
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
915+
kvstoreGetStats(db->expires, buf, sizeof(buf), full);
915916
stats = sdscat(stats, buf);
916917

917918
addReplyVerbatim(c, stats, sdslen(stats), "txt");
919+
if (server.db[dbid] == NULL) {
920+
/* This dbid wasn't allocated, so db is the temporary empty DB created above; free it (not the NULL slot). */
921+
freeServerdb(db);
922+
}
918923
sdsfree(stats);
919924
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
920925
int full = 0;

src/defrag.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ static void defragModule(serverDb *db, robj *obj) {
726726
/* for each key we scan in the main dict, this function will attempt to defrag
727727
* all the various pointers it has. */
728728
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
729-
serverDb *db = &server.db[ctx->dbid];
729+
serverDb *db = server.db[ctx->dbid];
730730
int slot = ctx->kvstate.slot;
731731
robj *newob, *ob;
732732
unsigned char *newzl;
@@ -987,7 +987,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
987987
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
988988
UNUSED(privdata);
989989
int dbid = (uintptr_t)target;
990-
serverDb *db = &server.db[dbid];
990+
serverDb *db = server.db[dbid];
991991

992992
static defragKeysCtx ctx; // STATIC - this persists
993993
if (endtime == 0) {
@@ -1005,7 +1005,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda
10051005
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
10061006
UNUSED(privdata);
10071007
int dbid = (uintptr_t)target;
1008-
serverDb *db = &server.db[dbid];
1008+
serverDb *db = server.db[dbid];
10091009
return defragStageKvstoreHelper(endtime, db->expires,
10101010
scanHashtableCallbackCountScanned, NULL, NULL);
10111011
}
@@ -1273,6 +1273,7 @@ static void beginDefragCycle(void) {
12731273
defrag.remaining_stages = listCreate();
12741274

12751275
for (int dbid = 0; dbid < server.dbnum; dbid++) {
1276+
if (server.db[dbid] == NULL) continue;
12761277
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
12771278
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
12781279
}

src/evict.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,8 @@ int performEvictions(void) {
568568
* so to start populate the eviction pool sampling keys from
569569
* every DB. */
570570
for (i = 0; i < server.dbnum; i++) {
571-
db = server.db + i;
571+
db = server.db[i];
572+
if (db == NULL) continue; /* DB was never allocated: nothing to sample. */
572573
kvstore *kvs;
573574
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
574575
kvs = db->keys;
@@ -601,9 +602,9 @@ int performEvictions(void) {
601602

602603
kvstore *kvs;
603604
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
604-
kvs = server.db[bestdbid].keys;
605+
kvs = server.db[bestdbid]->keys;
605606
} else {
606-
kvs = server.db[bestdbid].expires;
607+
kvs = server.db[bestdbid]->expires;
607608
}
608609
void *entry = NULL;
609610
int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry);
@@ -634,7 +635,8 @@ int performEvictions(void) {
634635
* incrementally visit all DBs. */
635636
for (i = 0; i < server.dbnum; i++) {
636637
j = (++next_db) % server.dbnum;
637-
db = server.db + j;
638+
db = server.db[j];
639+
if (db == NULL) continue; /* DB was never allocated: no key to pick from it. */
638640
kvstore *kvs;
639641
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
640642
kvs = db->keys;
@@ -653,7 +655,7 @@ int performEvictions(void) {
653655

654656
/* Finally remove the selected key. */
655657
if (bestkey) {
656-
db = server.db + bestdbid;
658+
db = server.db[bestdbid];
657659
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
658660
/* We compute the amount of memory freed by db*Delete() alone.
659661
* It is possible that actually the memory needed to propagate

src/expire.c

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ void activeExpireCycle(int type) {
234234
data.ttl_sum = 0;
235235
data.ttl_samples = 0;
236236

237-
serverDb *db = server.db + (current_db % server.dbnum);
237+
serverDb *db = server.db[(current_db % server.dbnum)];
238238
data.db = db;
239239

240240
int db_done = 0; /* The scan of the current DB is done? */
@@ -245,13 +245,17 @@ void activeExpireCycle(int type) {
245245
* distribute the time evenly across DBs. */
246246
current_db++;
247247

248-
if (kvstoreSize(db->expires)) dbs_performed++;
248+
if (db && kvstoreSize(db->expires)) dbs_performed++;
249249

250250
/* Continue to expire if at the end of the cycle there are still
251251
* a big percentage of keys to expire, compared to the number of keys
252252
* we scanned. The percentage, stored in config_cycle_acceptable_stale
253253
* is not fixed, but depends on the configured "expire effort". */
254254
do {
255+
if (db == NULL) {
256+
break; /* DB not allocated since it was never used */
257+
}
258+
255259
unsigned long num;
256260
iteration++;
257261

@@ -421,11 +425,11 @@ void expireReplicaKeys(void) {
421425
int dbid = 0;
422426
while (dbids && dbid < server.dbnum) {
423427
if ((dbids & 1) != 0) {
424-
serverDb *db = server.db + dbid;
425-
robj *expire = dbFindExpires(db, keyname);
428+
serverDb *db = server.db[dbid];
429+
robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname);
426430
int expired = 0;
427431

428-
if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) {
432+
if (expire && activeExpireCycleTryExpire(db, expire, start)) {
429433
expired = 1;
430434
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
431435
* but they might propagate to AOF) and trigger module hooks. */

src/object.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,8 +1357,8 @@ struct serverMemOverhead *getMemoryOverheadData(void) {
13571357
mem_total += mh->functions_caches;
13581358

13591359
for (j = 0; j < server.dbnum; j++) {
1360-
serverDb *db = server.db + j;
1361-
if (!kvstoreNumAllocatedHashtables(db->keys)) continue;
1360+
serverDb *db = server.db[j];
1361+
if (db == NULL || !kvstoreNumAllocatedHashtables(db->keys)) continue;
13621362

13631363
unsigned long long keyscount = kvstoreSize(db->keys);
13641364

0 commit comments

Comments
 (0)