Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2210,8 +2210,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {

for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
if (dbHasNoKeys(j)) continue;
serverDb *db = server.db[j];

/* SELECT the new DB */
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db) {
unsigned int countKeysInSlot(unsigned int slot) {
unsigned int result = 0;
for (int i = 0; i < server.dbnum; i++) {
result += countKeysInSlotForDb(slot, server.db + i);
result += server.db[i] ? countKeysInSlotForDb(slot, server.db[i]) : 0;
}
return result;
}
Expand Down
23 changes: 12 additions & 11 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6548,29 +6548,30 @@ unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del,
for (int i = 0; i < server.dbnum; i++) {
kvstoreHashtableIterator *kvs_di = NULL;
void *next;
serverDb db = server.db[i];
kvs_di = kvstoreGetHashtableIterator(db.keys, hashslot, HASHTABLE_ITER_SAFE);
serverDb *db = server.db[i];
if (db == NULL) continue;
kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
if (lazy) {
dbAsyncDelete(&db, key);
dbAsyncDelete(db, key);
} else {
dbSyncDelete(&db, key);
dbSyncDelete(db, key);
}
// if is command, skip del propagate
if (propagate_del) propagateDeletion(&db, key, lazy);
signalModifiedKey(NULL, &db, key);
if (propagate_del) propagateDeletion(db, key, lazy);
signalModifiedKey(NULL, db, key);
if (send_del_event) {
/* In the `cluster flushslot` scenario, the keys are actually deleted so notify everyone. */
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
} else {
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
}
exitExecutionUnit();
postExecutionUnitOperations();
Expand Down Expand Up @@ -7048,7 +7049,7 @@ int clusterCommandSpecial(client *c) {
}
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (!dbHasNoKeys()) {
if (!dbsHaveNoKeys()) {
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
return 1;
}
Expand Down Expand Up @@ -7218,7 +7219,7 @@ int clusterCommandSpecial(client *c) {
/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbHasNoKeys())) {
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbsHaveNoKeys())) {
addReplyError(c, "To set a master the node must be empty and "
"without assigned slots.");
return 1;
Expand Down Expand Up @@ -7370,7 +7371,7 @@ int clusterCommandSpecial(client *c) {

/* Replicas can be reset while containing data, but not primary nodes
* that must be empty. */
if (clusterNodeIsPrimary(myself) && !dbHasNoKeys()) {
if (clusterNodeIsPrimary(myself) && !dbsHaveNoKeys()) {
addReplyError(c, "CLUSTER RESET can't be called with "
"master nodes containing keys");
return 1;
Expand Down
91 changes: 42 additions & 49 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
* The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long removed = 0;
int startdb, enddb;

Expand All @@ -583,18 +583,18 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
}

for (int j = startdb; j <= enddb; j++) {
if (kvstoreSize(dbarray[j].keys) == 0) continue;
if (dbarray[j] == NULL || kvstoreSize(dbarray[j]->keys) == 0) continue;

removed += kvstoreSize(dbarray[j].keys);
removed += kvstoreSize(dbarray[j]->keys);
if (async) {
emptyDbAsync(&dbarray[j]);
emptyDbAsync(dbarray[j]);
} else {
kvstoreEmpty(dbarray[j].keys, callback);
kvstoreEmpty(dbarray[j].expires, callback);
kvstoreEmpty(dbarray[j]->keys, callback);
kvstoreEmpty(dbarray[j]->expires, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
dbarray[j]->avg_ttl = 0;
dbarray[j]->expires_cursor = 0;
}

return removed;
Expand Down Expand Up @@ -652,47 +652,44 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
return removed;
}

/* Initialize temporary db on replica for use during diskless replication. */
serverDb *initTempDb(void) {
int slot_count_bits = 0;
int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
if (server.cluster_enabled) {
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
}
serverDb *tempDb = zcalloc(sizeof(serverDb) * server.dbnum);
for (int i = 0; i < server.dbnum; i++) {
tempDb[i].id = i;
tempDb[i].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
tempDb[i].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
}

return tempDb;
}

/* Discard tempDb, it's always async. */
void discardTempDb(serverDb *tempDb) {
/* Discard tempDb array. It's always async. */
void discardTempDb(serverDb **tempDb) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, 1, NULL);
for (int i = 0; i < server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
if (tempDb[i]) {
kvstoreRelease(tempDb[i]->keys);
kvstoreRelease(tempDb[i]->expires);

/* These are expected to be empty on temporary databases */
serverAssert(dictSize(tempDb[i]->blocking_keys) == 0);
serverAssert(dictSize(tempDb[i]->blocking_keys_unblock_on_nokey) == 0);
serverAssert(dictSize(tempDb[i]->ready_keys) == 0);
serverAssert(dictSize(tempDb[i]->watched_keys) == 0);

dictRelease(tempDb[i]->blocking_keys);
dictRelease(tempDb[i]->blocking_keys_unblock_on_nokey);
dictRelease(tempDb[i]->ready_keys);
dictRelease(tempDb[i]->watched_keys);
zfree(tempDb[i]);
tempDb[i] = NULL;
}
}

zfree(tempDb);
}

int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum) return C_ERR;
c->db = &server.db[id];
c->db = createDatabaseIfNeeded(id);
return C_OK;
}

long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += kvstoreSize(server.db[j].keys);
if (dbHasNoKeys(j)) continue;
total += kvstoreSize(server.db[j]->keys);
}
return total;
}
Expand Down Expand Up @@ -723,8 +720,9 @@ void signalFlushedDb(int dbid, int async) {
}

for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
if (server.db[j] == NULL) continue;
scanDatabaseForDeletedKeys(server.db[j], NULL);
touchAllWatchedKeysInDb(server.db[j], NULL);
}

trackingInvalidateKeysOnFlush(async);
Expand Down Expand Up @@ -1629,8 +1627,9 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
int dbSwapDatabases(int id1, int id2) {
if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
serverDb aux = server.db[id1];
serverDb *db1 = &server.db[id1], *db2 = &server.db[id2];
serverDb *db1 = createDatabaseIfNeeded(id1);
serverDb *db2 = createDatabaseIfNeeded(id2);
serverDb aux = *db1;

/* Swapdb should make transaction fail if there is any
* client watching keys */
Expand Down Expand Up @@ -1671,10 +1670,13 @@ int dbSwapDatabases(int id1, int id2) {
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
* database (temp) as the main (active) database, the actual freeing of old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(serverDb *tempDb) {
void swapMainDbWithTempDb(serverDb **tempDb) {
for (int i = 0; i < server.dbnum; i++) {
serverDb aux = server.db[i];
serverDb *activedb = &server.db[i], *newdb = &tempDb[i];
if (tempDb[i] == NULL && server.db[i] == NULL) continue;
if (tempDb[i] == NULL) tempDb[i] = createDatabase(i);
if (server.db[i] == NULL) server.db[i] = createDatabase(i);
serverDb aux = *server.db[i];
serverDb *activedb = server.db[i], *newdb = tempDb[i];

/* Swapping databases should make transaction fail if there is any
* client watching keys. */
Expand Down Expand Up @@ -2839,12 +2841,3 @@ int bitfieldGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysRes
}
return 1;
}

bool dbHasNoKeys(void) {
for (int i = 0; i < server.dbnum; i++) {
if (kvstoreSize(server.db[i].keys) != 0) {
return false;
}
}
return true;
}
18 changes: 12 additions & 6 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) {
memset(final, 0, 20); /* Start with a clean result */

for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
serverDb *db = server.db[j];
if (db == NULL || kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);

/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
Expand Down Expand Up @@ -914,14 +914,20 @@ void debugCommand(client *c) {
if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;

stats = sdscatprintf(stats, "[Dictionary HT]\n");
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
serverDb *db = server.db[dbid];
if (db) {
kvstoreGetStats(db->keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}

stats = sdscatprintf(stats, "[Expires HT]\n");
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
if (db) {
kvstoreGetStats(db->expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}

addReplyVerbatim(c, stats, sdslen(stats), "txt");

sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
int full = 0;
Expand Down
7 changes: 4 additions & 3 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = &server.db[ctx->dbid];
serverDb *db = server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
Expand Down Expand Up @@ -940,7 +940,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];

static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
Expand All @@ -958,7 +958,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];
return defragStageKvstoreHelper(endtime, db->expires,
scanHashtableCallbackCountScanned, NULL, NULL);
}
Expand Down Expand Up @@ -1227,6 +1227,7 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();

for (int dbid = 0; dbid < server.dbnum; dbid++) {
if (dbHasNoKeys(dbid)) continue;
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
}
Expand Down
12 changes: 7 additions & 5 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ int performEvictions(void) {
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db + i;
db = server.db[i];
if (db == NULL) continue;
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = db->keys;
Expand Down Expand Up @@ -601,9 +602,9 @@ int performEvictions(void) {

kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = server.db[bestdbid].keys;
kvs = server.db[bestdbid]->keys;
} else {
kvs = server.db[bestdbid].expires;
kvs = server.db[bestdbid]->expires;
}
void *entry = NULL;
int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry);
Expand Down Expand Up @@ -634,7 +635,8 @@ int performEvictions(void) {
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db + j;
db = server.db[j];
if (db == NULL) continue;
kvstore *kvs;
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
kvs = db->keys;
Expand All @@ -653,7 +655,7 @@ int performEvictions(void) {

/* Finally remove the selected key. */
if (bestkey) {
db = server.db + bestdbid;
db = server.db[bestdbid];
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
Expand Down
14 changes: 9 additions & 5 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ void activeExpireCycle(int type) {
data.ttl_sum = 0;
data.ttl_samples = 0;

serverDb *db = server.db + (current_db % server.dbnum);
serverDb *db = server.db[(current_db % server.dbnum)];
data.db = db;

int db_done = 0; /* The scan of the current DB is done? */
Expand All @@ -250,13 +250,17 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;

if (kvstoreSize(db->expires)) dbs_performed++;
if (db && kvstoreSize(db->expires)) dbs_performed++;

/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the configured "expire effort". */
do {
if (db == NULL) {
break; /* DB not allocated since it was never used */
}

unsigned long num;
iteration++;

Expand Down Expand Up @@ -426,11 +430,11 @@ void expireReplicaKeys(void) {
int dbid = 0;
while (dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
serverDb *db = server.db + dbid;
robj *expire = dbFindExpires(db, keyname);
serverDb *db = server.db[dbid];
robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname);
int expired = 0;

if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) {
if (expire && activeExpireCycleTryExpire(db, expire, start)) {
expired = 1;
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
* but they might propagate to AOF) and trigger module hooks. */
Expand Down
Loading
Loading