Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
179 commits
Select commit Hold shift + click to select a range
acc2398
active-expire: add field-level TTL expiry with alternating cycle prio…
xbasel Jun 23, 2025
85567b0
fix defrag
xbasel Jul 2, 2025
94829b2
format
xbasel Jul 2, 2025
ea4a25b
fix defrag bug + optimize
xbasel Jul 3, 2025
8e4bfcc
Batch propagation
xbasel Jul 6, 2025
e722f5a
formatting
xbasel Jul 7, 2025
95f597a
Defrag keys_with_volatile_items
xbasel Jul 7, 2025
82d8efa
use unused debugging function
xbasel Jul 7, 2025
2bf7a60
fix bug
xbasel Jul 7, 2025
22261cb
rename expired_fields to expired_subkeys to match redis info
xbasel Jul 7, 2025
536db24
notification
xbasel Jul 8, 2025
fc9701d
del main key notification
xbasel Jul 8, 2025
2f8ecc9
fix subkeys info
xbasel Jul 8, 2025
27f8825
unfinished
xbasel Jul 8, 2025
f8801a9
working
xbasel Jul 8, 2025
45b7e7c
handle pause
xbasel Jul 9, 2025
283b027
Revert "handle pause"
xbasel Jul 9, 2025
331eefa
pause
xbasel Jul 9, 2025
7ce2b04
Remove serverDb* params from hash functions
xbasel Jul 10, 2025
e26b94f
optmize
xbasel Jul 10, 2025
6a7f09f
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 10, 2025
09ea89f
fix an asser
xbasel Jul 10, 2025
4491819
change assertion logic and few other issues to prevent crashes
xbasel Jul 10, 2025
9af1239
fixes
xbasel Jul 13, 2025
9d41e39
format
xbasel Jul 13, 2025
cd04998
change design to not continue processing same hash object over multiple
xbasel Jul 13, 2025
4988b82
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 13, 2025
7a72bea
Added active expire tests in hashexpire.tcl
Jul 8, 2025
bee03bf
remove redundant declaration
xbasel Jul 13, 2025
e880cab
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 13, 2025
9ef4adb
Added HSETEX Active Expiry Test
Jul 13, 2025
9c6d99e
fix todos
xbasel Jul 13, 2025
ac1acc8
fixes
xbasel Jul 13, 2025
63aa12f
Merge remote-tracking branch 'xbasel/field_active_expiry' into field_…
xbasel Jul 13, 2025
8e9d64c
Merge branch 'field_activeexpiry_def' into field_activeexpiry
xbasel Jul 13, 2025
b1d1e2d
fix defrag
xbasel Jul 13, 2025
3e25aeb
fix
xbasel Jul 13, 2025
c23a3b9
comments
xbasel Jul 13, 2025
da48f3c
fixes
xbasel Jul 13, 2025
02e2157
format
xbasel Jul 13, 2025
2e3db4c
fixes
xbasel Jul 13, 2025
0beceeb
Stabilize flaky test
Jul 14, 2025
aff56ee
more fixes
xbasel Jul 14, 2025
5ce8785
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 14, 2025
930d5a9
fix
xbasel Jul 14, 2025
f3d5aca
format
xbasel Jul 14, 2025
5919e64
fix
xbasel Jul 14, 2025
2eef1a9
reduce logging
xbasel Jul 14, 2025
8ffdbdf
Fixed tests
Jul 14, 2025
67db439
fix iteration -> speed up
xbasel Jul 14, 2025
cea7a69
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 14, 2025
730df60
Removed print from test
Jul 15, 2025
36ac4c9
Adjust test timeout to accommodate longer expiry delays
Jul 15, 2025
24e7b45
Added 2 test scenarios that previously caused a crash
Jul 15, 2025
ac25557
fix no epxiry on vector
xbasel Jul 15, 2025
609cb4d
fix bug
xbasel Jul 15, 2025
7920b44
fix defgrag issues
xbasel Jul 15, 2025
d340ebe
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 15, 2025
f102c92
fix bug in vset leading to hashtable corruption
xbasel Jul 15, 2025
4031067
Revert "fix bug in vset leading to hashtable corruption"
xbasel Jul 15, 2025
b0bc2ad
Reapply "fix bug in vset leading to hashtable corruption"
xbasel Jul 15, 2025
dac3a02
fix bug
xbasel Jul 15, 2025
016a4a8
propagate db
xbasel Jul 15, 2025
db29fac
Update src/db.c
xbasel Jul 15, 2025
c2ec2ac
Update src/server.c
xbasel Jul 15, 2025
3a53195
code review
xbasel Jul 15, 2025
4ad9896
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 15, 2025
6f04c70
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 16, 2025
123fafb
formatting
xbasel Jul 16, 2025
d91bde6
fix comment
xbasel Jul 16, 2025
f800b1a
Make the itertaor static and moe the cursor to the serverDb struct
xbasel Jul 16, 2025
1734780
format
xbasel Jul 16, 2025
2f57736
release keys_with_vola_items
xbasel Jul 16, 2025
a7b6b02
fix bugs
xbasel Jul 16, 2025
6d3adf7
don't run cluster mode tests in hash field tests on external engines
xbasel Jul 16, 2025
cf005ac
comments
xbasel Jul 16, 2025
d855272
disable flaky tests
xbasel Jul 16, 2025
59e8ed4
tidy up
xbasel Jul 16, 2025
f9d462b
Update src/defrag.c
xbasel Jul 16, 2025
0fa2b8b
Update src/defrag.c
xbasel Jul 16, 2025
283f6e0
Update src/defrag.c
xbasel Jul 16, 2025
e01d3fb
tidy up
xbasel Jul 16, 2025
01970bb
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 16, 2025
d465f01
Update src/t_hash.c
xbasel Jul 16, 2025
3c50fe8
tidy up
xbasel Jul 16, 2025
e21b91e
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 16, 2025
f198640
Update src/t_hash.c
xbasel Jul 16, 2025
6330c07
tidy up
xbasel Jul 16, 2025
ae783ca
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 16, 2025
86b7068
remove redundant instruction
xbasel Jul 16, 2025
fc3de10
optimize
xbasel Jul 16, 2025
1e44a43
Added Changes from ttl-poc-new branch
Jul 17, 2025
019e9e7
Deflake monitor test
Jul 17, 2025
925c1c7
Added keys_with_volatile_items checks
Jul 17, 2025
fa95f92
code review
xbasel Jul 17, 2025
44ab3c6
use hashtableReplaceReallocatedEntry
xbasel Jul 17, 2025
eec6ed4
fixes
xbasel Jul 17, 2025
f2da2f2
fixes
xbasel Jul 17, 2025
c466290
remove return value
xbasel Jul 17, 2025
34cd30e
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 17, 2025
276327a
fix uninitialized variable
xbasel Jul 17, 2025
b46a465
add missing new line
xbasel Jul 17, 2025
f4f403b
Update src/db.c
xbasel Jul 17, 2025
89017c7
tidy up
xbasel Jul 17, 2025
eda8866
remove redundant code
xbasel Jul 17, 2025
80e2f27
refactor key notification
xbasel Jul 17, 2025
af74624
disable test step (we'll fix it later, its not essential)
xbasel Jul 17, 2025
be86a2c
remove redundant code
xbasel Jul 17, 2025
7ad0462
format
xbasel Jul 17, 2025
ccfcd44
code review fixes
xbasel Jul 20, 2025
db66b29
remove inline
xbasel Jul 20, 2025
5568d48
move db1->keys_with_volatile_items_cursor to activeExpireFieldIterator
xbasel Jul 20, 2025
6b48925
flat loop
xbasel Jul 20, 2025
50d9e54
tidy up
xbasel Jul 21, 2025
cf9bda8
fix slow epxiry bug, don't jump DBs if there's likely more entries to
xbasel Jul 21, 2025
a1e239a
Simplify expire.c loop
xbasel Jul 21, 2025
3ef99ab
defrag -> defrag volatile sets when itertaing on db->keys
xbasel Jul 21, 2025
5a4e887
remove redudant code
xbasel Jul 21, 2025
b8f2704
remove unused code
xbasel Jul 21, 2025
a5a7810
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 21, 2025
e2e5e79
dbAddInternal is called only when the hash object doesn't exist, so its
xbasel Jul 21, 2025
1e2b9b8
Remove redundant code
xbasel Jul 21, 2025
e8bb2c8
clean up
xbasel Jul 21, 2025
be5e704
don't case delta to unasigned
xbasel Jul 21, 2025
8e5fbc4
clean up
xbasel Jul 21, 2025
31082a9
code review
xbasel Jul 22, 2025
a8ad390
Remove db dependency from hash functions
xbasel Jul 22, 2025
43a0784
formatting
xbasel Jul 22, 2025
b340319
Refactor hash defrag and put outside defrag.c
xbasel Jul 22, 2025
6b040a8
More changes
xbasel Jul 22, 2025
07d0329
code review 2
xbasel Jul 23, 2025
8dd7493
relocate hashTypeReclaimExpiredFields to expire.c
xbasel Jul 23, 2025
4b1419d
refactor
xbasel Jul 23, 2025
8ad8175
code review
xbasel Jul 24, 2025
5dcbfec
Remove dbTrackKeyWithVolatileItemsIfNeeded
xbasel Jul 24, 2025
b7ec7b5
tidy up
xbasel Jul 24, 2025
2ce8777
review with Ran
xbasel Jul 24, 2025
d72d639
fixes
xbasel Jul 24, 2025
67a5eb1
format
xbasel Jul 24, 2025
68afa3f
format
xbasel Jul 24, 2025
2ab4785
format
xbasel Jul 24, 2025
6ebda3a
fix vset unit tests (shouldn't test defrag if defrag isn't enable)
xbasel Jul 24, 2025
9487798
Update src/expire.c
xbasel Jul 27, 2025
14fac35
code review
xbasel Jul 27, 2025
abceb06
Update src/expire.c
xbasel Jul 27, 2025
163c486
changes with Ran
xbasel Jul 27, 2025
9f7e18f
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 27, 2025
07b3ca5
more changes
xbasel Jul 27, 2025
ba582ff
format
xbasel Jul 27, 2025
bfcf811
fix comments
xbasel Jul 27, 2025
dacefdb
fix bug in defrag
xbasel Jul 28, 2025
58aba79
code review
xbasel Jul 28, 2025
3561a47
more changes
xbasel Jul 28, 2025
bdc0b98
remove ifdef from unit test
xbasel Jul 28, 2025
b80e8e2
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 28, 2025
7857eca
Apply suggestions from code review
ranshid Jul 28, 2025
3a1d65b
Update src/server.c
xbasel Jul 28, 2025
63337d2
Use getMonotonicUs in activeExpireCycle for faster timing (RDTSC vs g…
xbasel Jul 28, 2025
848ed75
find ./tests -type f -exec sed -i 's/\bexpired_subkeys\b/expired_fiel…
xbasel Jul 29, 2025
19efe45
small refactor following PR comments
ranshid Jul 30, 2025
1a6ff14
document getExpirationPolicyWithFlags
ranshid Jul 30, 2025
475a8e1
fix bug when deleting hash table entry at pos 6
xbasel Jul 30, 2025
0577eb5
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Jul 30, 2025
4a2d5b7
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Jul 30, 2025
651de13
address more PR comments
ranshid Jul 30, 2025
66f7fa9
fix format check
ranshid Jul 30, 2025
9f7cac1
fix yet another format issue
ranshid Jul 30, 2025
9fef365
2 fixes:
ranshid Jul 31, 2025
b813588
keep the original statistic name.
ranshid Jul 31, 2025
eb1d14e
place fast cycle back to back prevention in activeExpireCycle
ranshid Jul 31, 2025
df684f3
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 3, 2025
9054990
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 3, 2025
c938dbd
refactor active expiry:
ranshid Aug 3, 2025
cbb225f
small followup refactor
ranshid Aug 3, 2025
59223fc
fix comment
ranshid Aug 4, 2025
b712220
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 4, 2025
7b0ea56
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 4, 2025
77c0dca
fix active expiration after last merge
ranshid Aug 4, 2025
da5998b
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ gcov:
$(MAKE) SERVER_CFLAGS="-fprofile-arcs -ftest-coverage -DCOVERAGE_TEST" SERVER_LDFLAGS="-fprofile-arcs -ftest-coverage"

noopt:
$(MAKE) OPTIMIZATION="-O0"
$(MAKE) OPTIMIZATION="-O0 -fno-lto"

valgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc"
Expand Down
22 changes: 21 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,13 @@ int dbGenericDeleteWithDictIndex(serverDb *db, robj *key, int async, int flags,
debugServerAssert(0 == kvstoreHashtableDelete(db->expires, dict_index, key->ptr));
}

/* If deleting a hash object, untrack the object if it contains volatile items. */
if (val->type == OBJ_HASH && val->encoding == OBJ_ENCODING_HASHTABLE && hashTypeHasVolatileElements(val)) {
dbUntrackKeyWithVolaItems(db, val); // TODO xbasel, dbUntrackKeyWithVolaItems should accept optional dict_index (it's available here).
} else {
debugServerAssert(0 == dbUntrackKeyWithVolaItems(db, val));
}

if (async) {
freeObjAsync(key, val, db->id);
} else {
Expand All @@ -501,6 +508,18 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
return dbGenericDeleteWithDictIndex(db, key, async, flags, dict_index);
}

/* Add a key with volatile items to the tracking kvstore. */
int dbTrackKeyWithVolaItems(serverDb *db, robj *o) {
int dict_index = getKVStoreIndexForKey(objectGetKey(o));
return kvstoreHashtableAdd(db->keys_with_volatile_items, dict_index, o);
}

/* Delete a key from the keys with volatile entries tracking kvstore */
int dbUntrackKeyWithVolaItems(serverDb *db, robj *o) {
int dict_index = getKVStoreIndexForKey(objectGetKey(o));
return kvstoreHashtableDelete(db->keys_with_volatile_items, dict_index, objectGetKey(o));
}

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(serverDb *db, robj *key) {
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
Expand Down Expand Up @@ -582,6 +601,7 @@ long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callba
} else {
kvstoreEmpty(dbarray[j]->keys, callback);
kvstoreEmpty(dbarray[j]->expires, callback);
kvstoreEmpty(dbarray[j]->keys_with_volatile_items, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j]->avg_ttl = 0;
Expand Down Expand Up @@ -1550,7 +1570,7 @@ void copyCommand(client *c) {
case OBJ_LIST: newobj = listTypeDup(o); break;
case OBJ_SET: newobj = setTypeDup(o); break;
case OBJ_ZSET: newobj = zsetDup(o); break;
case OBJ_HASH: newobj = hashTypeDup(o); break;
case OBJ_HASH: newobj = hashTypeDup(c->db, o); break;
case OBJ_STREAM: newobj = streamDup(o); break;
case OBJ_MODULE:
newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
Expand Down
38 changes: 29 additions & 9 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ typedef struct {
} defragPubSubCtx;
static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

/* Context for defragmenting hash objects: holds the DB and the hash key object. */
typedef struct {
serverDb *db;
robj *o;
} objectDbContext;

/* When scanning a main kvstore, large elements are queued for later handling rather than
* causing a large latency spike while processing a hash table bucket. This list is only used
Expand Down Expand Up @@ -452,18 +457,21 @@ static void activeDefragEntry(void *privdata, void *element_ref) {
if (new_entry) {
/* In case the entry is tracked we need to update it in the volatile set */
if (entryHasExpiry(new_entry)) {
robj *obj = (robj *)privdata;
serverAssert(obj);
hashTypeTrackUpdateEntry(obj, old_entry, new_entry, old_expiry, entryGetExpiry(new_entry));
objectDbContext *ctx = privdata;
serverAssert(ctx->o);
serverAssert(ctx->db);
hashTypeTrackUpdateEntry(ctx->db, ctx->o, old_entry, new_entry, old_expiry, entryGetExpiry(new_entry));
}
*entry_ref = new_entry;
}
}

static void scanLaterHash(robj *ob, unsigned long *cursor) {
static void scanLaterHash(robj *ob, unsigned long *cursor, int dbid) {
serverDb *db = server.db[dbid];
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragEntry, ob, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
objectDbContext ctx = {db, ob};
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragEntry, &ctx, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void defragQuicklist(robj *ob) {
Expand Down Expand Up @@ -500,15 +508,16 @@ static void defragZsetSkiplist(robj *ob) {
}
}

static void defragHash(robj *ob) {
static void defragHash(serverDb *db, robj *ob) {
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
} else {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragEntry, ob, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
objectDbContext ctx = {db, ob};
cursor = hashtableScanDefrag(ht, cursor, activeDefragEntry, &ctx, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
Expand Down Expand Up @@ -744,7 +753,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
defragHash(ob);
defragHash(db, ob);
} else {
serverPanic("Unknown hash encoding");
}
Expand Down Expand Up @@ -813,7 +822,7 @@ static int defragLaterItem(robj *ob, unsigned long *cursor, monotime endtime, in
} else if (ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST) {
scanLaterZset(ob, cursor);
} else if (ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE) {
scanLaterHash(ob, cursor);
scanLaterHash(ob, cursor, dbid);
} else if (ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM) {
return scanLaterStreamListpacks(ob, cursor, endtime);
} else if (ob->type == OBJ_MODULE) {
Expand Down Expand Up @@ -973,6 +982,16 @@ static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void
scanHashtableCallbackCountScanned, NULL, NULL);
}

// Target is a DBID
static doneStatus defragStageKeysWithvolaItemsKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = server.db[dbid];
return defragStageKvstoreHelper(endtime, db->keys_with_volatile_items,
scanHashtableCallbackCountScanned, NULL, NULL);
}



static doneStatus defragStagePubsubKvstore(monotime endtime, void *target, void *privdata) {
// target is server.pubsub_channels or server.pubsubshard_channels
Expand Down Expand Up @@ -1240,6 +1259,7 @@ static void beginDefragCycle(void) {
if (dbHasNoKeys(dbid)) continue;
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageKeysWithvolaItemsKvstore, (void *)(uintptr_t)dbid, NULL);
}

static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels};
Expand Down
Loading
Loading