Skip to content
Closed
Show file tree
Hide file tree
Changes from 178 commits
Commits
Show all changes
179 commits
Select commit Hold shift + click to select a range
acc2398
active-expire: add field-level TTL expiry with alternating cycle prio…
xbasel Jun 23, 2025
85567b0
fix defrag
xbasel Jul 2, 2025
94829b2
format
xbasel Jul 2, 2025
ea4a25b
fix defrag bug + optimize
xbasel Jul 3, 2025
8e4bfcc
Batch propagation
xbasel Jul 6, 2025
e722f5a
formatting
xbasel Jul 7, 2025
95f597a
Defrag keys_with_volatile_items
xbasel Jul 7, 2025
82d8efa
use unused debugging function
xbasel Jul 7, 2025
2bf7a60
fix bug
xbasel Jul 7, 2025
22261cb
rename expired_fields to expired_subkeys to match redis info
xbasel Jul 7, 2025
536db24
notification
xbasel Jul 8, 2025
fc9701d
del main key notification
xbasel Jul 8, 2025
2f8ecc9
fix subkeys info
xbasel Jul 8, 2025
27f8825
unfinished
xbasel Jul 8, 2025
f8801a9
working
xbasel Jul 8, 2025
45b7e7c
handle pause
xbasel Jul 9, 2025
283b027
Revert "handle pause"
xbasel Jul 9, 2025
331eefa
pause
xbasel Jul 9, 2025
7ce2b04
Remove serverDb* params from hash functions
xbasel Jul 10, 2025
e26b94f
optmize
xbasel Jul 10, 2025
6a7f09f
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 10, 2025
09ea89f
fix an asser
xbasel Jul 10, 2025
4491819
change assertion logic and few other issues to prevent crashes
xbasel Jul 10, 2025
9af1239
fixes
xbasel Jul 13, 2025
9d41e39
format
xbasel Jul 13, 2025
cd04998
change design to not continue processing same hash object over multiple
xbasel Jul 13, 2025
4988b82
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 13, 2025
7a72bea
Added active expire tests in hashexpire.tcl
Jul 8, 2025
bee03bf
remove redundant declaration
xbasel Jul 13, 2025
e880cab
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 13, 2025
9ef4adb
Added HSETEX Active Expiry Test
Jul 13, 2025
9c6d99e
fix todos
xbasel Jul 13, 2025
ac1acc8
fixes
xbasel Jul 13, 2025
63aa12f
Merge remote-tracking branch 'xbasel/field_active_expiry' into field_…
xbasel Jul 13, 2025
8e9d64c
Merge branch 'field_activeexpiry_def' into field_activeexpiry
xbasel Jul 13, 2025
b1d1e2d
fix defrag
xbasel Jul 13, 2025
3e25aeb
fix
xbasel Jul 13, 2025
c23a3b9
comments
xbasel Jul 13, 2025
da48f3c
fixes
xbasel Jul 13, 2025
02e2157
format
xbasel Jul 13, 2025
2e3db4c
fixes
xbasel Jul 13, 2025
0beceeb
Stabilize flaky test
Jul 14, 2025
aff56ee
more fixes
xbasel Jul 14, 2025
5ce8785
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 14, 2025
930d5a9
fix
xbasel Jul 14, 2025
f3d5aca
format
xbasel Jul 14, 2025
5919e64
fix
xbasel Jul 14, 2025
2eef1a9
reduce logging
xbasel Jul 14, 2025
8ffdbdf
Fixed tests
Jul 14, 2025
67db439
fix iteration -> speed up
xbasel Jul 14, 2025
cea7a69
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 14, 2025
730df60
Removed print from test
Jul 15, 2025
36ac4c9
Adjust test timeout to accommodate longer expiry delays
Jul 15, 2025
24e7b45
Added 2 test scenarios that previously caused a crash
Jul 15, 2025
ac25557
fix no epxiry on vector
xbasel Jul 15, 2025
609cb4d
fix bug
xbasel Jul 15, 2025
7920b44
fix defgrag issues
xbasel Jul 15, 2025
d340ebe
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 15, 2025
f102c92
fix bug in vset leading to hashtable corruption
xbasel Jul 15, 2025
4031067
Revert "fix bug in vset leading to hashtable corruption"
xbasel Jul 15, 2025
b0bc2ad
Reapply "fix bug in vset leading to hashtable corruption"
xbasel Jul 15, 2025
dac3a02
fix bug
xbasel Jul 15, 2025
016a4a8
propagate db
xbasel Jul 15, 2025
db29fac
Update src/db.c
xbasel Jul 15, 2025
c2ec2ac
Update src/server.c
xbasel Jul 15, 2025
3a53195
code review
xbasel Jul 15, 2025
4ad9896
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 15, 2025
6f04c70
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 16, 2025
123fafb
formatting
xbasel Jul 16, 2025
d91bde6
fix comment
xbasel Jul 16, 2025
f800b1a
Make the itertaor static and moe the cursor to the serverDb struct
xbasel Jul 16, 2025
1734780
format
xbasel Jul 16, 2025
2f57736
release keys_with_vola_items
xbasel Jul 16, 2025
a7b6b02
fix bugs
xbasel Jul 16, 2025
6d3adf7
don't run cluster mode tests in hash field tests on external engines
xbasel Jul 16, 2025
cf005ac
comments
xbasel Jul 16, 2025
d855272
disable flaky tests
xbasel Jul 16, 2025
59e8ed4
tidy up
xbasel Jul 16, 2025
f9d462b
Update src/defrag.c
xbasel Jul 16, 2025
0fa2b8b
Update src/defrag.c
xbasel Jul 16, 2025
283f6e0
Update src/defrag.c
xbasel Jul 16, 2025
e01d3fb
tidy up
xbasel Jul 16, 2025
01970bb
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 16, 2025
d465f01
Update src/t_hash.c
xbasel Jul 16, 2025
3c50fe8
tidy up
xbasel Jul 16, 2025
e21b91e
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 16, 2025
f198640
Update src/t_hash.c
xbasel Jul 16, 2025
6330c07
tidy up
xbasel Jul 16, 2025
ae783ca
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 16, 2025
86b7068
remove redundant instruction
xbasel Jul 16, 2025
fc3de10
optimize
xbasel Jul 16, 2025
1e44a43
Added Changes from ttl-poc-new branch
Jul 17, 2025
019e9e7
Deflake monitor test
Jul 17, 2025
925c1c7
Added keys_with_volatile_items checks
Jul 17, 2025
fa95f92
code review
xbasel Jul 17, 2025
44ab3c6
use hashtableReplaceReallocatedEntry
xbasel Jul 17, 2025
eec6ed4
fixes
xbasel Jul 17, 2025
f2da2f2
fixes
xbasel Jul 17, 2025
c466290
remove return value
xbasel Jul 17, 2025
34cd30e
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 17, 2025
276327a
fix uninitialized variable
xbasel Jul 17, 2025
b46a465
add missing new line
xbasel Jul 17, 2025
f4f403b
Update src/db.c
xbasel Jul 17, 2025
89017c7
tidy up
xbasel Jul 17, 2025
eda8866
remove redundant code
xbasel Jul 17, 2025
80e2f27
refactor key notification
xbasel Jul 17, 2025
af74624
disable test step (we'll fix it later, its not essential)
xbasel Jul 17, 2025
be86a2c
remove redundant code
xbasel Jul 17, 2025
7ad0462
format
xbasel Jul 17, 2025
ccfcd44
code review fixes
xbasel Jul 20, 2025
db66b29
remove inline
xbasel Jul 20, 2025
5568d48
move db1->keys_with_volatile_items_cursor to activeExpireFieldIterator
xbasel Jul 20, 2025
6b48925
flat loop
xbasel Jul 20, 2025
50d9e54
tidy up
xbasel Jul 21, 2025
cf9bda8
fix slow epxiry bug, don't jump DBs if there's likely more entries to
xbasel Jul 21, 2025
a1e239a
Simplify expire.c loop
xbasel Jul 21, 2025
3ef99ab
defrag -> defrag volatile sets when itertaing on db->keys
xbasel Jul 21, 2025
5a4e887
remove redudant code
xbasel Jul 21, 2025
b8f2704
remove unused code
xbasel Jul 21, 2025
a5a7810
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 21, 2025
e2e5e79
dbAddInternal is called only when the hash object doesn't exist, so its
xbasel Jul 21, 2025
1e2b9b8
Remove redundant code
xbasel Jul 21, 2025
e8bb2c8
clean up
xbasel Jul 21, 2025
be5e704
don't case delta to unasigned
xbasel Jul 21, 2025
8e5fbc4
clean up
xbasel Jul 21, 2025
31082a9
code review
xbasel Jul 22, 2025
a8ad390
Remove db dependency from hash functions
xbasel Jul 22, 2025
43a0784
formatting
xbasel Jul 22, 2025
b340319
Refactor hash defrag and put outside defrag.c
xbasel Jul 22, 2025
6b040a8
More changes
xbasel Jul 22, 2025
07d0329
code review 2
xbasel Jul 23, 2025
8dd7493
relocate hashTypeReclaimExpiredFields to expire.c
xbasel Jul 23, 2025
4b1419d
refactor
xbasel Jul 23, 2025
8ad8175
code review
xbasel Jul 24, 2025
5dcbfec
Remove dbTrackKeyWithVolatileItemsIfNeeded
xbasel Jul 24, 2025
b7ec7b5
tidy up
xbasel Jul 24, 2025
2ce8777
review with Ran
xbasel Jul 24, 2025
d72d639
fixes
xbasel Jul 24, 2025
67a5eb1
format
xbasel Jul 24, 2025
68afa3f
format
xbasel Jul 24, 2025
2ab4785
format
xbasel Jul 24, 2025
6ebda3a
fix vset unit tests (shouldn't test defrag if defrag isn't enable)
xbasel Jul 24, 2025
9487798
Update src/expire.c
xbasel Jul 27, 2025
14fac35
code review
xbasel Jul 27, 2025
abceb06
Update src/expire.c
xbasel Jul 27, 2025
163c486
changes with Ran
xbasel Jul 27, 2025
9f7e18f
Merge remote-tracking branch 'xbasel/field_activeexpiry' into field_a…
xbasel Jul 27, 2025
07b3ca5
more changes
xbasel Jul 27, 2025
ba582ff
format
xbasel Jul 27, 2025
bfcf811
fix comments
xbasel Jul 27, 2025
dacefdb
fix bug in defrag
xbasel Jul 28, 2025
58aba79
code review
xbasel Jul 28, 2025
3561a47
more changes
xbasel Jul 28, 2025
bdc0b98
remove ifdef from unit test
xbasel Jul 28, 2025
b80e8e2
Merge remote-tracking branch 'ranshid/introduce-volatile-set' into fi…
xbasel Jul 28, 2025
7857eca
Apply suggestions from code review
ranshid Jul 28, 2025
3a1d65b
Update src/server.c
xbasel Jul 28, 2025
63337d2
Use getMonotonicUs in activeExpireCycle for faster timing (RDTSC vs g…
xbasel Jul 28, 2025
848ed75
find ./tests -type f -exec sed -i 's/\bexpired_subkeys\b/expired_fiel…
xbasel Jul 29, 2025
19efe45
small refactor following PR comments
ranshid Jul 30, 2025
1a6ff14
document getExpirationPolicyWithFlags
ranshid Jul 30, 2025
475a8e1
fix bug when deleting hash table entry at pos 6
xbasel Jul 30, 2025
0577eb5
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Jul 30, 2025
4a2d5b7
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Jul 30, 2025
651de13
address more PR comments
ranshid Jul 30, 2025
66f7fa9
fix format check
ranshid Jul 30, 2025
9f7cac1
fix yet another format issue
ranshid Jul 30, 2025
9fef365
2 fixes:
ranshid Jul 31, 2025
b813588
keep the original statistic name.
ranshid Jul 31, 2025
eb1d14e
place fast cycle back to back prevention in activeExpireCycle
ranshid Jul 31, 2025
df684f3
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 3, 2025
9054990
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 3, 2025
c938dbd
refactor active expiry:
ranshid Aug 3, 2025
cbb225f
small followup refactor
ranshid Aug 3, 2025
59223fc
fix comment
ranshid Aug 4, 2025
b712220
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 4, 2025
7b0ea56
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 4, 2025
77c0dca
fix active expiration after last merge
ranshid Aug 4, 2025
da5998b
Merge remote-tracking branch 'valkey-fork/introduce-volatile-set' int…
ranshid Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1957,7 +1957,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
hashTypeIterator hi;
long long count = 0, volatile_items = 0, non_volatile_items;
/* First serialize volatile items if exist */
if (hashTypeHasVolatileElements(o)) {
if (hashTypeHasVolatileFields(o)) {
hashTypeInitVolatileIterator(o, &hi);
while (hashTypeNext(&hi) != C_ERR) {
long long expiry = entryGetExpiry(hi.next);
Expand Down
171 changes: 161 additions & 10 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,18 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
return o;
}

/* For hash keys, checks if they contain volatile items and updates tracking accordingly.
* Always accesses the tracking kvstore, even if the tracking state doesn't change. */
void dbUpdateObjectWithVolatileItemsTracking(serverDb *db, robj *o) {
if (o->type == OBJ_HASH) {
if (hashTypeHasVolatileFields(o)) {
dbTrackKeyWithVolatileItems(db, o);
} else {
dbUntrackKeyWithVolatileItems(db, o);
}
}
}

/* Add a key-value entry to the DB.
*
* A copy of 'key' is stored in the database. The caller must ensure the
Expand Down Expand Up @@ -217,6 +229,9 @@ static void dbAddInternal(serverDb *db, robj *key, robj **valref, int update_if_
/* Not existing. Convert val to valkey object and insert. */
robj *val = *valref;
val = objectSetKeyAndExpire(val, key->ptr, -1);
/* Track hash object if it has volatile fields (for active expiry).
* For example, this is needed when a hash is moved to a new DB (e.g. MOVE). */
dbTrackKeyWithVolatileItems(db, val);
initObjectLRUOrLFU(val);
kvstoreHashtableAdd(db->keys, dict_index, val);
signalKeyAsReady(db, key, val->type);
Expand Down Expand Up @@ -284,6 +299,10 @@ int dbAddRDBLoad(serverDb *db, sds key, robj **valref) {
val = objectSetKeyAndExpire(val, key, -1);
kvstoreHashtableInsertAtPosition(db->keys, dict_index, val, &pos);
initObjectLRUOrLFU(val);

/* Track hash objects containing volatile items, created by rdbLoadObject (which lacks DB context). */
dbTrackKeyWithVolatileItems(db, val);

*valref = val;
return 1;
}
Expand Down Expand Up @@ -483,6 +502,11 @@ int dbGenericDeleteWithDictIndex(serverDb *db, robj *key, int async, int flags,
debugServerAssert(0 == kvstoreHashtableDelete(db->expires, dict_index, key->ptr));
}

/* If deleting a hash object, un-track it from the volatile items tracking if it contains volatile items.*/
if (val->type == OBJ_HASH && hashTypeHasVolatileFields(val)) {
dbUntrackKeyWithVolatileItems(db, val);
}

if (async) {
freeObjAsync(key, val, db->id);
} else {
Expand All @@ -501,6 +525,20 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
return dbGenericDeleteWithDictIndex(db, key, async, flags, dict_index);
}

/* Add a key with volatile items to the tracking kvstore. */
void dbTrackKeyWithVolatileItems(serverDb *db, robj *o) {
if (o->type == OBJ_HASH && hashTypeHasVolatileFields(o)) {
int dict_index = getKVStoreIndexForKey(objectGetKey(o));
kvstoreHashtableAdd(db->keys_with_volatile_items, dict_index, o);
}
}

/* Delete a key from the keys with volatile entries tracking kvstore */
void dbUntrackKeyWithVolatileItems(serverDb *db, robj *o) {
int dict_index = getKVStoreIndexForKey(objectGetKey(o));
kvstoreHashtableDelete(db->keys_with_volatile_items, dict_index, objectGetKey(o));
}

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(serverDb *db, robj *key) {
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
Expand Down Expand Up @@ -556,6 +594,17 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
return o;
}

/* Reset the expiry tracking state of a database.
*
* This clears the `expiry` array, which holds per-expiry-type
* data such as average TTL (for stats) and scan cursors used by
* the active expiration cycle.
*
* Should be called whenever the database is emptied or reinitialized. */
void resetDbExpiryState(serverDb *db) {
memset(db->expiry, 0, sizeof(db->expiry));
}

/* Remove all keys from the database(s) structure. The dbarray argument
* may not be the server main DBs (could be a temporary DB).
*
Expand All @@ -582,10 +631,10 @@ long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callba
} else {
kvstoreEmpty(dbarray[j]->keys, callback);
kvstoreEmpty(dbarray[j]->expires, callback);
kvstoreEmpty(dbarray[j]->keys_with_volatile_items, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j]->avg_ttl = 0;
dbarray[j]->expires_cursor = 0;
resetDbExpiryState(dbarray[j]);
}

return removed;
Expand Down Expand Up @@ -651,6 +700,7 @@ void discardTempDb(serverDb **tempDb) {
if (tempDb[i]) {
kvstoreRelease(tempDb[i]->keys);
kvstoreRelease(tempDb[i]->expires);
kvstoreRelease(tempDb[i]->keys_with_volatile_items);

/* These are expected to be empty on temporary databases */
serverAssert(dictSize(tempDb[i]->blocking_keys) == 0);
Expand Down Expand Up @@ -1628,6 +1678,15 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
dictReleaseIterator(di);
}

/* Copy expiry tracking state from one DB to another.
*
* This copies the `expiry` array, which contains per-expiry-type
* metadata such as the average TTL (for stats) and the active
* expiry scan cursor. */
static void copyDbExpiry(serverDb *target, const serverDb *source) {
memcpy(target->expiry, source->expiry, sizeof(target->expiry));
}

/* Swap two databases at runtime so that all clients will magically see
* the new database even if already connected. Note that the client
* structure c->db points to a given DB, so we need to be smarter and
Expand Down Expand Up @@ -1657,13 +1716,14 @@ int dbSwapDatabases(int id1, int id2) {
* remain in the same DB they were. */
db1->keys = db2->keys;
db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl;
db1->expires_cursor = db2->expires_cursor;
db1->keys_with_volatile_items = db2->keys_with_volatile_items;
copyDbExpiry(db1, db2);


db2->keys = aux.keys;
db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl;
db2->expires_cursor = aux.expires_cursor;
db2->keys_with_volatile_items = aux.keys_with_volatile_items;
copyDbExpiry(db2, &aux);

/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
Expand Down Expand Up @@ -1702,13 +1762,13 @@ void swapMainDbWithTempDb(serverDb **tempDb) {
* remain in the same DB they were. */
activedb->keys = newdb->keys;
activedb->expires = newdb->expires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
activedb->keys_with_volatile_items = newdb->keys_with_volatile_items;
copyDbExpiry(activedb, newdb);

newdb->keys = aux.keys;
newdb->expires = aux.expires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;
newdb->keys_with_volatile_items = aux.keys_with_volatile_items;
copyDbExpiry(newdb, &aux);

/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
Expand Down Expand Up @@ -1786,7 +1846,15 @@ robj *setExpire(client *c, serverDb *db, robj *key, long long when) {
serverAssertWithInfo(NULL, key, valref != NULL);
val = *valref;
long long old_when = objectGetExpire(val);

robj *newval = objectSetExpire(val, when);
if (newval->type == OBJ_HASH && hashTypeHasVolatileFields(newval)) {
/* Replace the pointer in the keys_with_volatile_items table without accessing the old pointer. */
int dict_index = getKVStoreIndexForKey(objectGetKey(newval));
hashtable *volatile_items_ht = kvstoreGetHashtable(db->keys_with_volatile_items, dict_index);
int replaced = hashtableReplaceReallocatedEntry(volatile_items_ht, val, newval);
serverAssert(replaced);
}
if (old_when != -1) {
/* Val already had an expire field, so it was not reallocated. */
serverAssert(newval == val);
Expand Down Expand Up @@ -1890,6 +1958,89 @@ void propagateDeletion(serverDb *db, robj *key, int lazy) {
server.replication_allowed = prev_replication_allowed;
}

static const size_t EXPIRE_BULK_LIMIT = 1024; /* Maximum number of fields to active-expire (per replicated HDEL command */

/* Propagate HDEL commands for deleted hash fields to AOF and replicas.
*
* This function builds and propagates a single HDEL command with multiple fields
* for the given hash object `o`. It temporarily enables replication (if needed),
* constructs the command using the field names, and sends it via alsoPropagate(). */
static void propagateFieldsDeletion(serverDb *db, robj *o, size_t n_fields, robj *fields[]) {
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = 1;

robj *argv[EXPIRE_BULK_LIMIT + 2]; /* HDEL + key + fields */
int argc = 0;
robj *keyobj = createStringObjectFromSds(objectGetKey(o));
argv[argc++] = shared.hdel; // HDEL command
argv[argc++] = keyobj; // key name
for (size_t i = 0; i < n_fields; i++) {
// field to delete
argv[argc++] = fields[i];
}

alsoPropagate(db->id, argv, argc, PROPAGATE_AOF | PROPAGATE_REPL);
server.replication_allowed = prev_replication_allowed;
for (int i = 0; i < argc; i++) {
decrRefCount(argv[i]);
}
}

/* Process expired fields for a hash delete them and propagate changes to replicas and AOF.
*
* This routine:
* - iteratively identifies expired hash fields from the volatile set (batching up to 1024 at a time)
* - deletes the expired fields
* - deletes the entire key if the hash becomes empty
* - propagates HDEL commands for deleted fields if the key remains, or DEL if the key is fully deleted
*
* Batching avoids large stack allocations while allowing max_entries to be arbitrarily large.
* Returns the total number of expired fields removed. */
size_t dbReclaimExpiredFields(robj *o, serverDb *db, mstime_t now, unsigned long max_entries) {
size_t total_expired = 0;
bool deleteKey = false;

while (max_entries > 0) {
/* Process in batches to avoid large stack allocations. */
unsigned long batch_size = max_entries > EXPIRE_BULK_LIMIT ? EXPIRE_BULK_LIMIT : max_entries;
robj *entries[EXPIRE_BULK_LIMIT];
size_t expired = hashTypeDeleteExpiredFields(o, now, batch_size, entries);
if (expired == 0) break;

/* Clean up volatile set if no more volatile fields remain */
if (!hashTypeHasVolatileFields(o)) {
dbUntrackKeyWithVolatileItems(db, o);
}

/* Check if key is now empty after removing expired fields */
deleteKey = hashTypeLength(o) == 0;

enterExecutionUnit(1, 0);
robj *keyobj = createStringObjectFromSds(objectGetKey(o));
/* Note that even though if might have been more efficient to only propagate del in case the key has no more items left,
* we must keep consistency in order to allow the replica to report hdel notifications before del. */
propagateFieldsDeletion(db, o, expired, entries);
notifyKeyspaceEvent(NOTIFY_EXPIRED, "hexpired", keyobj, db->id);
if (deleteKey) {
dbDelete(db, keyobj);
propagateDeletion(db, keyobj, server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyobj, db->id);
} else {
if (!hashTypeHasVolatileFields(o)) dbUntrackKeyWithVolatileItems(db, o);
}
signalModifiedKey(NULL, db, keyobj);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(keyobj);

total_expired += expired;
max_entries -= expired;
if (deleteKey) break; /* Stop if key was deleted */
}

return total_expired;
}

/* Use this instead of keyIsExpired if you already have the value object. */
static int objectIsExpired(robj *val) {
/* Don't expire anything while loading. It will be done later. */
Expand Down
Loading
Loading