diff --git a/src/aof.c b/src/aof.c index 70791daa0f..8d302a3d39 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2210,8 +2210,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; - serverDb *db = server.db + j; - if (kvstoreSize(db->keys) == 0) continue; + if (dbHasNoKeys(j)) continue; + serverDb *db = server.db[j]; /* SELECT the new DB */ if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr; diff --git a/src/cluster.c b/src/cluster.c index 7397b3cd5b..cc4dcc6fa3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -824,7 +824,7 @@ unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db) { unsigned int countKeysInSlot(unsigned int slot) { unsigned int result = 0; for (int i = 0; i < server.dbnum; i++) { - result += countKeysInSlotForDb(slot, server.db + i); + result += server.db[i] ? countKeysInSlotForDb(slot, server.db[i]) : 0; } return result; } diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 87a58965cb..ce665ebdc6 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6548,29 +6548,30 @@ unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, for (int i = 0; i < server.dbnum; i++) { kvstoreHashtableIterator *kvs_di = NULL; void *next; - serverDb db = server.db[i]; - kvs_di = kvstoreGetHashtableIterator(db.keys, hashslot, HASHTABLE_ITER_SAFE); + serverDb *db = server.db[i]; + if (db == NULL) continue; + kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE); while (kvstoreHashtableIteratorNext(kvs_di, &next)) { robj *valkey = next; enterExecutionUnit(1, 0); sds sdskey = objectGetKey(valkey); robj *key = createStringObject(sdskey, sdslen(sdskey)); if (lazy) { - dbAsyncDelete(&db, key); + dbAsyncDelete(db, key); } else { - dbSyncDelete(&db, key); + dbSyncDelete(db, key); } // if is command, skip del propagate - if (propagate_del) propagateDeletion(&db, key, lazy); - signalModifiedKey(NULL, &db, key); + if (propagate_del) propagateDeletion(db, key, lazy); + signalModifiedKey(NULL, db, key); if (send_del_event) { /* In the `cluster flushslot` scenario, the keys are actually deleted so notify everyone. */ - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id); } else { /* The keys are not actually logically deleted from the database, just moved to another node. * The modules needs to know that these keys are no longer available locally, so just send the * keyspace notification to the modules, but not to clients. */ - moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id); + moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id); } exitExecutionUnit(); postExecutionUnitOperations(); @@ -7048,7 +7049,7 @@ int clusterCommandSpecial(client *c) { } } else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ - if (!dbHasNoKeys()) { + if (!dbsHaveNoKeys()) { addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS."); return 1; } @@ -7218,7 +7219,7 @@ int clusterCommandSpecial(client *c) { /* If the instance is currently a primary, it should have no assigned * slots nor keys to accept to replicate some other node. * Replicas can switch to another primary without issues. */ - if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbHasNoKeys())) { + if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbsHaveNoKeys())) { addReplyError(c, "To set a master the node must be empty and " "without assigned slots."); return 1; @@ -7370,7 +7371,7 @@ int clusterCommandSpecial(client *c) { /* Replicas can be reset while containing data, but not primary nodes * that must be empty. */ - if (clusterNodeIsPrimary(myself) && !dbHasNoKeys()) { + if (clusterNodeIsPrimary(myself) && !dbsHaveNoKeys()) { addReplyError(c, "CLUSTER RESET can't be called with " "master nodes containing keys"); return 1; diff --git a/src/db.c b/src/db.c index 7ba6104a78..d890c4bb97 100644 --- a/src/db.c +++ b/src/db.c @@ -571,7 +571,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) { * The dbnum can be -1 if all the DBs should be emptied, or the specified * DB index if we want to empty only a single database. * The function returns the number of keys removed from the database(s). */ -long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) { +long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) { long long removed = 0; int startdb, enddb; @@ -583,18 +583,18 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac } for (int j = startdb; j <= enddb; j++) { - if (kvstoreSize(dbarray[j].keys) == 0) continue; + if (dbarray[j] == NULL || kvstoreSize(dbarray[j]->keys) == 0) continue; - removed += kvstoreSize(dbarray[j].keys); + removed += kvstoreSize(dbarray[j]->keys); if (async) { - emptyDbAsync(&dbarray[j]); + emptyDbAsync(dbarray[j]); } else { - kvstoreEmpty(dbarray[j].keys, callback); - kvstoreEmpty(dbarray[j].expires, callback); + kvstoreEmpty(dbarray[j]->keys, callback); + kvstoreEmpty(dbarray[j]->expires, callback); } /* Because all keys of database are removed, reset average ttl. */ - dbarray[j].avg_ttl = 0; - dbarray[j].expires_cursor = 0; + dbarray[j]->avg_ttl = 0; + dbarray[j]->expires_cursor = 0; } return removed; @@ -652,39 +652,35 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) { return removed; } -/* Initialize temporary db on replica for use during diskless replication. */ -serverDb *initTempDb(void) { - int slot_count_bits = 0; - int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND; - if (server.cluster_enabled) { - slot_count_bits = CLUSTER_SLOT_MASK_BITS; - flags |= KVSTORE_FREE_EMPTY_HASHTABLES; - } - serverDb *tempDb = zcalloc(sizeof(serverDb) * server.dbnum); - for (int i = 0; i < server.dbnum; i++) { - tempDb[i].id = i; - tempDb[i].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags); - tempDb[i].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags); - } - - return tempDb; -} - -/* Discard tempDb, it's always async. */ -void discardTempDb(serverDb *tempDb) { +/* Discard tempDb array. It's always async. */ +void discardTempDb(serverDb **tempDb) { /* Release temp DBs. */ emptyDbStructure(tempDb, -1, 1, NULL); for (int i = 0; i < server.dbnum; i++) { - kvstoreRelease(tempDb[i].keys); - kvstoreRelease(tempDb[i].expires); + if (tempDb[i]) { + kvstoreRelease(tempDb[i]->keys); + kvstoreRelease(tempDb[i]->expires); + + /* These are expected to be empty on temporary databases */ + serverAssert(dictSize(tempDb[i]->blocking_keys) == 0); + serverAssert(dictSize(tempDb[i]->blocking_keys_unblock_on_nokey) == 0); + serverAssert(dictSize(tempDb[i]->ready_keys) == 0); + serverAssert(dictSize(tempDb[i]->watched_keys) == 0); + + dictRelease(tempDb[i]->blocking_keys); + dictRelease(tempDb[i]->blocking_keys_unblock_on_nokey); + dictRelease(tempDb[i]->ready_keys); + dictRelease(tempDb[i]->watched_keys); + zfree(tempDb[i]); + tempDb[i] = NULL; + } } - zfree(tempDb); } int selectDb(client *c, int id) { if (id < 0 || id >= server.dbnum) return C_ERR; - c->db = &server.db[id]; + c->db = createDatabaseIfNeeded(id); return C_OK; } @@ -692,7 +688,8 @@ long long dbTotalServerKeyCount(void) { long long total = 0; int j; for (j = 0; j < server.dbnum; j++) { - total += kvstoreSize(server.db[j].keys); + if (dbHasNoKeys(j)) continue; + total += kvstoreSize(server.db[j]->keys); } return total; } @@ -723,8 +720,9 @@ void signalFlushedDb(int dbid, int async) { } for (int j = startdb; j <= enddb; j++) { - scanDatabaseForDeletedKeys(&server.db[j], NULL); - touchAllWatchedKeysInDb(&server.db[j], NULL); + if (server.db[j] == NULL) continue; + scanDatabaseForDeletedKeys(server.db[j], NULL); + touchAllWatchedKeysInDb(server.db[j], NULL); } trackingInvalidateKeysOnFlush(async); @@ -1629,8 +1627,9 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) { int dbSwapDatabases(int id1, int id2) { if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR; if (id1 == id2) return C_OK; - serverDb aux = server.db[id1]; - serverDb *db1 = &server.db[id1], *db2 = &server.db[id2]; + serverDb *db1 = createDatabaseIfNeeded(id1); + serverDb *db2 = createDatabaseIfNeeded(id2); + serverDb aux = *db1; /* Swapdb should make transaction fail if there is any * client watching keys */ @@ -1671,10 +1670,13 @@ int dbSwapDatabases(int id1, int id2) { /* Logically, this discards (flushes) the old main database, and apply the newly loaded * database (temp) as the main (active) database, the actual freeing of old database * (which will now be placed in the temp one) is done later. */ -void swapMainDbWithTempDb(serverDb *tempDb) { +void swapMainDbWithTempDb(serverDb **tempDb) { for (int i = 0; i < server.dbnum; i++) { - serverDb aux = server.db[i]; - serverDb *activedb = &server.db[i], *newdb = &tempDb[i]; + if (tempDb[i] == NULL && server.db[i] == NULL) continue; + if (tempDb[i] == NULL) tempDb[i] = createDatabase(i); + if (server.db[i] == NULL) server.db[i] = createDatabase(i); + serverDb aux = *server.db[i]; + serverDb *activedb = server.db[i], *newdb = tempDb[i]; /* Swapping databases should make transaction fail if there is any * client watching keys. */ @@ -2839,12 +2841,3 @@ int bitfieldGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysRes } return 1; } - -bool dbHasNoKeys(void) { - for (int i = 0; i < server.dbnum; i++) { - if (kvstoreSize(server.db[i].keys) != 0) { - return false; - } - } - return true; -} diff --git a/src/debug.c b/src/debug.c index d24e69543c..6dfd6b10ec 100644 --- a/src/debug.c +++ b/src/debug.c @@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) { memset(final, 0, 20); /* Start with a clean result */ for (int j = 0; j < server.dbnum; j++) { - serverDb *db = server.db + j; - if (kvstoreSize(db->keys) == 0) continue; + serverDb *db = server.db[j]; + if (db == NULL || kvstoreSize(db->keys) == 0) continue; kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); /* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */ @@ -914,14 +914,20 @@ void debugCommand(client *c) { if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1; stats = sdscatprintf(stats, "[Dictionary HT]\n"); - kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full); - stats = sdscat(stats, buf); + serverDb *db = server.db[dbid]; + if (db) { + kvstoreGetStats(db->keys, buf, sizeof(buf), full); + stats = sdscat(stats, buf); + } stats = sdscatprintf(stats, "[Expires HT]\n"); - kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full); - stats = sdscat(stats, buf); + if (db) { + kvstoreGetStats(db->expires, buf, sizeof(buf), full); + stats = sdscat(stats, buf); + } addReplyVerbatim(c, stats, sdslen(stats), "txt"); + sdsfree(stats); } else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) { int full = 0; diff --git a/src/defrag.c b/src/defrag.c index 7824deebc2..bb0508f8a2 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -684,7 +684,7 @@ static void defragModule(serverDb *db, robj *obj) { /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. */ static void defragKey(defragKeysCtx *ctx, robj **elemref) { - serverDb *db = &server.db[ctx->dbid]; + serverDb *db = server.db[ctx->dbid]; int slot = ctx->kvstate.slot; robj *newob, *ob; unsigned char *newzl; @@ -940,7 +940,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime, static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) { UNUSED(privdata); int dbid = (uintptr_t)target; - serverDb *db = &server.db[dbid]; + serverDb *db = server.db[dbid]; static defragKeysCtx ctx; // STATIC - this persists if (endtime == 0) { @@ -958,7 +958,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) { UNUSED(privdata); int dbid = (uintptr_t)target; - serverDb *db = &server.db[dbid]; + serverDb *db = server.db[dbid]; return defragStageKvstoreHelper(endtime, db->expires, scanHashtableCallbackCountScanned, NULL, NULL); } @@ -1227,6 +1227,7 @@ static void beginDefragCycle(void) { defrag.remaining_stages = listCreate(); for (int dbid = 0; dbid < server.dbnum; dbid++) { + if (dbHasNoKeys(dbid)) continue; addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL); addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL); } diff --git a/src/evict.c b/src/evict.c index d4bfade4fc..eb94d93b8f 100644 --- a/src/evict.c +++ b/src/evict.c @@ -568,7 +568,8 @@ int performEvictions(void) { * so to start populate the eviction pool sampling keys from * every DB. */ for (i = 0; i < server.dbnum; i++) { - db = server.db + i; + db = server.db[i]; + if (db == NULL) continue; kvstore *kvs; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { kvs = db->keys; @@ -601,9 +602,9 @@ int performEvictions(void) { kvstore *kvs; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - kvs = server.db[bestdbid].keys; + kvs = server.db[bestdbid]->keys; } else { - kvs = server.db[bestdbid].expires; + kvs = server.db[bestdbid]->expires; } void *entry = NULL; int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry); @@ -634,7 +635,8 @@ int performEvictions(void) { * incrementally visit all DBs. */ for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; - db = server.db + j; + db = server.db[j]; + if (db == NULL) continue; kvstore *kvs; if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { kvs = db->keys; @@ -653,7 +655,7 @@ int performEvictions(void) { /* Finally remove the selected key. */ if (bestkey) { - db = server.db + bestdbid; + db = server.db[bestdbid]; robj *keyobj = createStringObject(bestkey, sdslen(bestkey)); /* We compute the amount of memory freed by db*Delete() alone. * It is possible that actually the memory needed to propagate diff --git a/src/expire.c b/src/expire.c index 93b26724e9..bae62ccc21 100644 --- a/src/expire.c +++ b/src/expire.c @@ -239,7 +239,7 @@ void activeExpireCycle(int type) { data.ttl_sum = 0; data.ttl_samples = 0; - serverDb *db = server.db + (current_db % server.dbnum); + serverDb *db = server.db[(current_db % server.dbnum)]; data.db = db; int db_done = 0; /* The scan of the current DB is done? */ @@ -250,13 +250,17 @@ void activeExpireCycle(int type) { * distribute the time evenly across DBs. */ current_db++; - if (kvstoreSize(db->expires)) dbs_performed++; + if (db && kvstoreSize(db->expires)) dbs_performed++; /* Continue to expire if at the end of the cycle there are still * a big percentage of keys to expire, compared to the number of keys * we scanned. The percentage, stored in config_cycle_acceptable_stale * is not fixed, but depends on the configured "expire effort". */ do { + if (db == NULL) { + break; /* DB not allocated since it was never used */ + } + unsigned long num; iteration++; @@ -426,11 +430,11 @@ void expireReplicaKeys(void) { int dbid = 0; while (dbids && dbid < server.dbnum) { if ((dbids & 1) != 0) { - serverDb *db = server.db + dbid; - robj *expire = dbFindExpires(db, keyname); + serverDb *db = server.db[dbid]; + robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname); int expired = 0; - if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) { + if (expire && activeExpireCycleTryExpire(db, expire, start)) { expired = 1; /* Propagate the DEL (writable replicas do not propagate anything to other replicas, * but they might propagate to AOF) and trigger module hooks. */ diff --git a/src/object.c b/src/object.c index 1b740aba94..36715429b5 100644 --- a/src/object.c +++ b/src/object.c @@ -1364,8 +1364,8 @@ struct serverMemOverhead *getMemoryOverheadData(void) { mem_total += mh->functions_caches; for (j = 0; j < server.dbnum; j++) { - serverDb *db = server.db + j; - if (!kvstoreNumAllocatedHashtables(db->keys)) continue; + serverDb *db = server.db[j]; + if (db == NULL || !kvstoreNumAllocatedHashtables(db->keys)) continue; unsigned long long keyscount = kvstoreSize(db->keys); diff --git a/src/rdb.c b/src/rdb.c index 67dd72fc05..8dd429d322 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1340,7 +1340,8 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { static long long info_updated_time = 0; char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; - serverDb *db = server.db + dbid; + serverDb *db = server.db[dbid]; + if (db == NULL) return 0; unsigned long long int db_size = kvstoreSize(db->keys); if (db_size == 0) return 0; @@ -3029,7 +3030,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int type, rdbver; uint64_t db_size = 0, expires_size = 0; int should_expand_db = 0; - serverDb *db = rdb_loading_ctx->dbarray + 0; + if (rdb_loading_ctx->dbarray[0] == NULL) { + rdb_loading_ctx->dbarray[0] = createDatabase(0); + } + serverDb *db = rdb_loading_ctx->dbarray[0]; char buf[1024]; int error; long long empty_keys_skipped = 0; @@ -3106,7 +3110,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin SERVER_TITLE, server.dbnum); exit(1); } - db = rdb_loading_ctx->dbarray + dbid; + if (rdb_loading_ctx->dbarray[dbid] == NULL) { + rdb_loading_ctx->dbarray[dbid] = createDatabase(dbid); + } + db = rdb_loading_ctx->dbarray[dbid]; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently diff --git a/src/replication.c b/src/replication.c index 85ddc9ea96..acaffb3669 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2087,13 +2087,13 @@ int replicationSupportSkipRDBChecksum(connection *conn, int is_replica_stream_ve /* Helper function for readSyncBulkPayload() to initialize tempDb * before socket-loading the new db from primary. The tempDb may be populated * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */ -serverDb *disklessLoadInitTempDb(void) { - return initTempDb(); +serverDb **disklessLoadInitTempDb(void) { + return zcalloc(sizeof(serverDb *) * server.dbnum); } /* Helper function for readSyncBulkPayload() to discard our tempDb * when the loading succeeded or failed. */ -void disklessLoadDiscardTempDb(serverDb *tempDb) { +void disklessLoadDiscardTempDb(serverDb **tempDb) { discardTempDb(tempDb); } @@ -2130,7 +2130,7 @@ void readSyncBulkPayload(connection *conn) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); - serverDb *diskless_load_tempDb = NULL; + serverDb **diskless_load_tempDb = NULL; functionsLibCtx *temp_functions_lib_ctx = NULL; int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; @@ -2326,7 +2326,7 @@ void readSyncBulkPayload(connection *conn) { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (use_diskless_load) { rio rdb; - serverDb *dbarray; + serverDb **dbarray; functionsLibCtx *functions_lib_ctx; int asyncLoading = 0; @@ -2701,7 +2701,7 @@ int sendCurrentOffsetToReplica(client *replica) { char buf[128]; int buflen; buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.primary_repl_offset, server.replid, - server.db->id, (long long unsigned int)replica->id); + server.db[0]->id, (long long unsigned int)replica->id); dualChannelServerLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu", replicationGetReplicaName(replica), server.primary_repl_offset, (long long unsigned int)replica->id); diff --git a/src/server.c b/src/server.c index 73da30685a..1641c4cb2a 100644 --- a/src/server.c +++ b/src/server.c @@ -1287,10 +1287,11 @@ void databasesCron(void) { if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; for (j = 0; j < dbs_per_call; j++) { - serverDb *db = &server.db[resize_db % server.dbnum]; + serverDb *db = server.db[resize_db % server.dbnum]; + resize_db++; + if (db == NULL) continue; kvstoreTryResizeHashtables(db->keys, CRON_DICTS_PER_DB); kvstoreTryResizeHashtables(db->expires, CRON_DICTS_PER_DB); - resize_db++; } /* Rehash */ @@ -1298,11 +1299,13 @@ void databasesCron(void) { uint64_t elapsed_us = 0; uint64_t threshold_us = 1 * 1000000 / server.hz / 100; for (j = 0; j < dbs_per_call; j++) { - serverDb *db = &server.db[rehash_db % server.dbnum]; - elapsed_us += kvstoreIncrementallyRehash(db->keys, threshold_us - elapsed_us); - if (elapsed_us >= threshold_us) break; - elapsed_us += kvstoreIncrementallyRehash(db->expires, threshold_us - elapsed_us); - if (elapsed_us >= threshold_us) break; + serverDb *db = server.db[rehash_db % server.dbnum]; + if (db != NULL) { + elapsed_us += kvstoreIncrementallyRehash(db->keys, threshold_us - elapsed_us); + if (elapsed_us >= threshold_us) break; + elapsed_us += kvstoreIncrementallyRehash(db->expires, threshold_us - elapsed_us); + if (elapsed_us >= threshold_us) break; + } rehash_db++; } } @@ -1541,11 +1544,13 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa if (server.verbosity <= LL_VERBOSE) { run_with_period(5000) { for (j = 0; j < server.dbnum; j++) { + serverDb *db = server.db[j]; + if (db == NULL) continue; long long size, used, vkeys; - size = kvstoreBuckets(server.db[j].keys) * hashtableEntriesPerBucket(); - used = kvstoreSize(server.db[j].keys); - vkeys = kvstoreSize(server.db[j].expires); + size = kvstoreBuckets(server.db[j]->keys) * hashtableEntriesPerBucket(); + used = kvstoreSize(server.db[j]->keys); + vkeys = kvstoreSize(server.db[j]->expires); if (used || vkeys) { serverLog(LL_VERBOSE, "DB %d: %lld keys (%lld volatile) in %lld slots HT.", j, used, vkeys, size); } @@ -2740,9 +2745,49 @@ void makeThreadKillable(void) { pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); } -void initServer(void) { - int j; +/* Return non-zero if the database is empty */ +int dbHasNoKeys(int dbid) { + return dbid < 0 || dbid >= server.dbnum || !server.db[dbid] || kvstoreSize(server.db[dbid]->keys) == 0; +} + +bool dbsHaveNoKeys(void) { + for (int i = 0; i < server.dbnum; i++) { + if (server.db[i] && kvstoreSize(server.db[i]->keys) != 0) { + return false; + } + } + return true; +} + +serverDb *createDatabase(int id) { + int slot_count_bits = 0; + int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND; + if (server.cluster_enabled) { + flags |= KVSTORE_FREE_EMPTY_HASHTABLES; + slot_count_bits = CLUSTER_SLOT_MASK_BITS; + } + + serverDb *db = zmalloc(sizeof(serverDb)); + db->keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags); + db->expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags); + db->expires_cursor = 0; + db->blocking_keys = dictCreate(&keylistDictType); + db->blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType); + db->ready_keys = dictCreate(&objectKeyPointerValueDictType); + db->watched_keys = dictCreate(&keylistDictType); + db->id = id; + db->avg_ttl = 0; + return db; +} +serverDb *createDatabaseIfNeeded(int id) { + if (server.db[id] == NULL) { + server.db[id] = createDatabase(id); + } + return server.db[id]; +} + +void initServer(void) { signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); @@ -2815,33 +2860,16 @@ void initServer(void) { } server.dbnum = server.cluster_enabled ? server.config_databases_cluster : server.config_databases; - server.db = zmalloc(sizeof(serverDb) * server.dbnum); + server.db = zcalloc(sizeof(serverDb *) * server.dbnum); + createDatabaseIfNeeded(0); /* The default database should always exist */ - /* Create the databases, and initialize other internal state. */ - int slot_count_bits = 0; - int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND; - if (server.cluster_enabled) { - slot_count_bits = CLUSTER_SLOT_MASK_BITS; - flags |= KVSTORE_FREE_EMPTY_HASHTABLES; - } - for (j = 0; j < server.dbnum; j++) { - server.db[j].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags); - server.db[j].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags); - server.db[j].expires_cursor = 0; - server.db[j].blocking_keys = dictCreate(&keylistDictType); - server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType); - server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType); - server.db[j].watched_keys = dictCreate(&keylistDictType); - server.db[j].id = j; - server.db[j].avg_ttl = 0; - } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which * seems odd) just to make the code cleaner by making it be the same type as server.pubsubshard_channels * (which has to be kvstore), see pubsubtype.serverPubSubChannels */ server.pubsub_channels = kvstoreCreate(&kvstoreChannelHashtableType, 0, KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND); server.pubsub_patterns = dictCreate(&objToHashtableDictType); - server.pubsubshard_channels = kvstoreCreate(&kvstoreChannelHashtableType, slot_count_bits, + server.pubsubshard_channels = kvstoreCreate(&kvstoreChannelHashtableType, server.cluster_enabled ? CLUSTER_SLOT_MASK_BITS : 0, KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND | KVSTORE_FREE_EMPTY_HASHTABLES); server.pubsub_clients = 0; server.watching_clients = 0; @@ -5636,9 +5664,10 @@ void totalNumberOfStatefulKeys(unsigned long *blocking_keys, unsigned long *watched_keys) { unsigned long bkeys = 0, bkeys_on_nokey = 0, wkeys = 0; for (int j = 0; j < server.dbnum; j++) { - bkeys += dictSize(server.db[j].blocking_keys); - bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey); - wkeys += dictSize(server.db[j].watched_keys); + if (server.db[j] == NULL) continue; + bkeys += dictSize(server.db[j]->blocking_keys); + bkeys_on_nokey += dictSize(server.db[j]->blocking_keys_unblock_on_nokey); + wkeys += dictSize(server.db[j]->watched_keys); } if (blocking_keys) *blocking_keys = bkeys; if (blocking_keys_on_nokey) *blocking_keys_on_nokey = bkeys_on_nokey; @@ -6238,13 +6267,15 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { if (sections++) info = sdscat(info, "\r\n"); info = sdscatprintf(info, "# Keyspace\r\n"); for (j = 0; j < server.dbnum; j++) { + serverDb *db = server.db[j]; + if (db == NULL) continue; long long keys, vkeys; - keys = kvstoreSize(server.db[j].keys); - vkeys = kvstoreSize(server.db[j].expires); + keys = kvstoreSize(db->keys); + vkeys = kvstoreSize(db->expires); if (keys || vkeys) { info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", j, keys, vkeys, - server.db[j].avg_ttl); + db->avg_ttl); } } } diff --git a/src/server.h b/src/server.h index f4bef339a0..c37b9988f2 100644 --- a/src/server.h +++ b/src/server.h @@ -845,7 +845,7 @@ typedef struct functionsLibCtx functionsLibCtx; * For example: dbarray need to be set as main database on * successful loading and dropped on failure. */ typedef struct rdbLoadingCtx { - serverDb *dbarray; + serverDb **dbarray; functionsLibCtx *functions_lib_ctx; } rdbLoadingCtx; @@ -1579,7 +1579,7 @@ struct valkeyServer { int hz; /* serverCron() calls frequency in hertz */ int clients_hz; /* clientsTimeProc() frequency in hertz */ int in_fork_child; /* indication that this is a fork child */ - serverDb *db; + serverDb **db; /* each db created when it's first used */ hashtable *commands; /* Command table */ hashtable *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; @@ -3459,11 +3459,11 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o); #define EMPTYDB_ASYNC (1 << 0) /* Reclaim memory in another thread. */ #define EMPTYDB_NOFUNCTIONS (1 << 1) /* Indicate not to flush the functions. */ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)); -long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)); +long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)); void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(void); -serverDb *initTempDb(void); -void discardTempDb(serverDb *tempDb); +serverDb *initTempDb(int id); +void discardTempDb(serverDb **tempDb); int selectDb(client *c, int id); void signalModifiedKey(client *c, serverDb *db, robj *key); void signalFlushedDb(int dbid, int async); @@ -3509,7 +3509,6 @@ int zmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult int bzmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result); int setGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result); int bitfieldGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result); -bool dbHasNoKeys(void); unsigned short crc16(const char *buf, int len); @@ -3933,7 +3932,11 @@ void commandAddSubcommand(struct serverCommand *parent, struct serverCommand *su void debugDelay(int usec); void killThreads(void); void makeThreadKillable(void); -void swapMainDbWithTempDb(serverDb *tempDb); +serverDb *createDatabase(int id); +int dbHasNoKeys(int dbid); +bool dbsHaveNoKeys(void); +serverDb *createDatabaseIfNeeded(int id); +void swapMainDbWithTempDb(serverDb **tempDb); sds getVersion(void); void debugPauseProcess(void);