Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2210,8 +2210,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {

for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
if (databaseEmpty(j)) continue;
serverDb *db = server.db[j];

/* SELECT the new DB */
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
Expand Down
6 changes: 3 additions & 3 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreHashtableSize(server.db->keys, slot);
return kvstoreHashtableSize(server.db[0]->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -910,7 +910,7 @@ void clusterCommand(client *c) {
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreHashtableIterator *kvs_di = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
kvs_di = kvstoreGetHashtableIterator(server.db[0]->keys, slot, 0);
for (unsigned int i = 0; i < numkeys; i++) {
void *next;
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
Expand Down Expand Up @@ -1099,7 +1099,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !pubsubshard_included) {
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
if (lookupKeyReadWithFlags(server.db[0], thiskey, flags) == NULL)
missing_keys++;
else
existing_keys++;
Expand Down
17 changes: 9 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5711,7 +5711,7 @@ int verifyClusterConfigWithData(void) {

/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (kvstoreSize(server.db[j].keys)) return C_ERR;
if (!databaseEmpty(j)) return C_ERR;
}

/* Check that all the slots we see populated memory have a corresponding
Expand Down Expand Up @@ -6347,19 +6347,20 @@ unsigned int delKeysInSlot(unsigned int hashslot) {

kvstoreHashtableIterator *kvs_di = NULL;
void *next;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
serverDb *db = server.db[0];
kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, &server.db[0], key);
dbDelete(db, key);
propagateDeletion(db, key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, db, key);
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(key);
Expand Down Expand Up @@ -6827,7 +6828,7 @@ int clusterCommandSpecial(client *c) {
}
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (kvstoreSize(server.db[0].keys) != 0) {
if (!databaseEmpty(0)) {
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
return 1;
}
Expand Down Expand Up @@ -6968,7 +6969,7 @@ int clusterCommandSpecial(client *c) {
/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !databaseEmpty(0))) {
addReplyError(c, "To set a master the node must be empty and "
"without assigned slots.");
return 1;
Expand Down
81 changes: 43 additions & 38 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
* The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long removed = 0;
int startdb, enddb;

Expand All @@ -583,16 +583,17 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
}

for (int j = startdb; j <= enddb; j++) {
removed += kvstoreSize(dbarray[j].keys);
if (dbarray[j] == NULL) continue;
removed += kvstoreSize(dbarray[j]->keys);
if (async) {
emptyDbAsync(&dbarray[j]);
emptyDbAsync(dbarray[j]);
} else {
kvstoreEmpty(dbarray[j].keys, callback);
kvstoreEmpty(dbarray[j].expires, callback);
kvstoreEmpty(dbarray[j]->keys, callback);
kvstoreEmpty(dbarray[j]->expires, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
dbarray[j]->avg_ttl = 0;
dbarray[j]->expires_cursor = 0;
}

return removed;
Expand Down Expand Up @@ -650,47 +651,45 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
return removed;
}

/* Initialize temporary db on replica for use during diskless replication. */
serverDb *initTempDb(void) {
int slot_count_bits = 0;
int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
if (server.cluster_enabled) {
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
}
serverDb *tempDb = zcalloc(sizeof(serverDb) * server.dbnum);
for (int i = 0; i < server.dbnum; i++) {
tempDb[i].id = i;
tempDb[i].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
tempDb[i].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
}

return tempDb;
}

/* Discard tempDb, it's always async. */
void discardTempDb(serverDb *tempDb) {
void discardTempDb(serverDb **tempDb) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, 1, NULL);
for (int i = 0; i < server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
if (tempDb[i]) {
kvstoreRelease(tempDb[i]->keys);
kvstoreRelease(tempDb[i]->expires);

/* These are expected to be empty on temporary databases */
serverAssert(dictSize(tempDb[i]->blocking_keys) == 0);
serverAssert(dictSize(tempDb[i]->blocking_keys_unblock_on_nokey) == 0);
serverAssert(dictSize(tempDb[i]->ready_keys) == 0);
serverAssert(dictSize(tempDb[i]->watched_keys) == 0);

dictRelease(tempDb[i]->blocking_keys);
dictRelease(tempDb[i]->blocking_keys_unblock_on_nokey);
dictRelease(tempDb[i]->ready_keys);
dictRelease(tempDb[i]->watched_keys);
zfree(tempDb[i]);
tempDb[i] = NULL;
}
}

zfree(tempDb);
}

int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum) return C_ERR;
c->db = &server.db[id];
initDatabase(id);
c->db = server.db[id];
return C_OK;
}

long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += kvstoreSize(server.db[j].keys);
if (databaseEmpty(j)) continue;
total += kvstoreSize(server.db[j]->keys);
}
return total;
}
Expand Down Expand Up @@ -721,8 +720,9 @@ void signalFlushedDb(int dbid, int async) {
}

for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
if (server.db[j] == NULL) continue;
scanDatabaseForDeletedKeys(server.db[j], NULL);
touchAllWatchedKeysInDb(server.db[j], NULL);
}

trackingInvalidateKeysOnFlush(async);
Expand Down Expand Up @@ -1641,8 +1641,10 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
int dbSwapDatabases(int id1, int id2) {
if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
serverDb aux = server.db[id1];
serverDb *db1 = &server.db[id1], *db2 = &server.db[id2];
initDatabase(id1);
initDatabase(id2);
serverDb aux = *server.db[id1];
serverDb *db1 = server.db[id1], *db2 = server.db[id2];

/* Swapdb should make transaction fail if there is any
* client watching keys */
Expand Down Expand Up @@ -1683,10 +1685,13 @@ int dbSwapDatabases(int id1, int id2) {
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
* database (temp) as the main (active) database, the actual freeing of old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(serverDb *tempDb) {
void swapMainDbWithTempDb(serverDb **tempDb) {
for (int i = 0; i < server.dbnum; i++) {
serverDb aux = server.db[i];
serverDb *activedb = &server.db[i], *newdb = &tempDb[i];
if (tempDb[i] == NULL && server.db[i] == NULL) continue;
if (tempDb[i] == NULL) tempDb[i] = createDatabase(i);
if (server.db[i] == NULL) server.db[i] = createDatabase(i);
serverDb aux = *server.db[i];
serverDb *activedb = server.db[i], *newdb = tempDb[i];

/* Swapping databases should make transaction fail if there is any
* client watching keys. */
Expand Down
18 changes: 12 additions & 6 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) {
memset(final, 0, 20); /* Start with a clean result */

for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
serverDb *db = server.db[j];
if (db == NULL || kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);

/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
Expand Down Expand Up @@ -907,14 +907,20 @@ void debugCommand(client *c) {
if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;

stats = sdscatprintf(stats, "[Dictionary HT]\n");
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
serverDb *db = server.db[dbid];
if (db) {
kvstoreGetStats(db->keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}

stats = sdscatprintf(stats, "[Expires HT]\n");
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
if (db) {
kvstoreGetStats(db->expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}

addReplyVerbatim(c, stats, sdslen(stats), "txt");

sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
int full = 0;
Expand Down
7 changes: 4 additions & 3 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = &server.db[ctx->dbid];
serverDb *db = server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
Expand Down Expand Up @@ -987,7 +987,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];

static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
Expand All @@ -1005,7 +1005,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];
return defragStageKvstoreHelper(endtime, db->expires,
scanHashtableCallbackCountScanned, NULL, NULL);
}
Expand Down Expand Up @@ -1273,6 +1273,7 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();

for (int dbid = 0; dbid < server.dbnum; dbid++) {
if (databaseEmpty(dbid)) continue;
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
}
Expand Down
14 changes: 9 additions & 5 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,9 @@ int performEvictions(void) {
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db + i;
db = server.db[i];
if (db == NULL) continue;
;
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = db->keys;
Expand Down Expand Up @@ -601,9 +603,9 @@ int performEvictions(void) {

kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = server.db[bestdbid].keys;
kvs = server.db[bestdbid]->keys;
} else {
kvs = server.db[bestdbid].expires;
kvs = server.db[bestdbid]->expires;
}
void *entry = NULL;
int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry);
Expand Down Expand Up @@ -634,7 +636,9 @@ int performEvictions(void) {
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db + j;
db = server.db[j];
if (db == NULL) continue;
;
kvstore *kvs;
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
kvs = db->keys;
Expand All @@ -653,7 +657,7 @@ int performEvictions(void) {

/* Finally remove the selected key. */
if (bestkey) {
db = server.db + bestdbid;
db = server.db[bestdbid];
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
Expand Down
14 changes: 9 additions & 5 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void activeExpireCycle(int type) {
data.ttl_sum = 0;
data.ttl_samples = 0;

serverDb *db = server.db + (current_db % server.dbnum);
serverDb *db = server.db[(current_db % server.dbnum)];
data.db = db;

int db_done = 0; /* The scan of the current DB is done? */
Expand All @@ -245,13 +245,17 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;

if (kvstoreSize(db->expires)) dbs_performed++;
if (db && kvstoreSize(db->expires)) dbs_performed++;

/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the configured "expire effort". */
do {
if (db == NULL) {
break; /* DB not allocated since it was never used */
}

unsigned long num;
iteration++;

Expand Down Expand Up @@ -421,11 +425,11 @@ void expireReplicaKeys(void) {
int dbid = 0;
while (dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
serverDb *db = server.db + dbid;
robj *expire = dbFindExpires(db, keyname);
serverDb *db = server.db[dbid];
robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname);
int expired = 0;

if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) {
if (expire && activeExpireCycleTryExpire(db, expire, start)) {
expired = 1;
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
* but they might propagate to AOF) and trigger module hooks. */
Expand Down
Loading
Loading