Skip to content

Commit b3bcf4e

Browse files
committed
active-expire: add field-level TTL expiry with alternating cycle prioritization
This change introduces an active expiration mechanism for hash fields with TTLs, in addition to the existing key-level expiry logic. Field TTL information is tracked in volatile sets attached to each hash. A new activeExpireCycleFields() function incrementally scans these volatile sets to remove expired fields, reclaiming memory proactively.

Instead of unifying key and field expiry into a single scanning loop, we chose to keep them as separate mechanisms. The primary reason is that their iteration models and expiry pacing are fundamentally different:
- key expiry operates on the db->key space with randomized sampling;
- field expiry operates on db->key->volatile_set with per-key iteration.

Because of this structural and pacing difference, a unified algorithm would be complex and could introduce subtle correctness and fairness bugs. By alternating between key expiry and field expiry each cycle, we maintain fairness while staying within the existing effort budget, ensuring consistent latency and predictable CPU usage.

    +-----------------+
    |       DB        |
    +-----------------+
             |
             v
    +---------------------+
    |       myhash        |  (key with TTL)
    +---------------------+
             |
             v
    +------------------------------------+
    |         fields (hashType)          |
    |  - field1                          |
    |  - field2                          |
    |  - fieldN                          |
    +------------------------------------+
             |
             v
    +------------------------------------+
    |   volatile set (field-level TTL)   |
    |  - field1 expires at T1            |
    |  - field5 expires at T5            |
    +------------------------------------+

Key expiry operates at the DB->key level, while field expiry operates at the DB->key->volatile_set level, justifying their separate logic.

No new configuration was introduced; the existing active-expire-effort and time budget are reused for both key and field expiry.

Signed-off-by: xbasel <[email protected]>
1 parent 17bea7f commit b3bcf4e

File tree

12 files changed

+412
-97
lines changed

12 files changed

+412
-97
lines changed

src/db.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,14 @@ int dbGenericDeleteWithDictIndex(serverDb *db, robj *key, int async, int flags,
483483
debugServerAssert(0 == kvstoreHashtableDelete(db->expires, dict_index, key->ptr));
484484
}
485485

486+
/* If deleting a hash object, also remove it from the keys_with_volatile_items table. */
487+
if (val->type == OBJ_HASH && val->encoding == OBJ_ENCODING_HASHTABLE) {
488+
kvstoreHashtableDelete(db->keys_with_volatile_items, dict_index, key->ptr);
489+
} else {
490+
debugServerAssert(0 == kvstoreHashtableDelete(db->keys_with_volatile_items, dict_index, key->ptr));
491+
}
492+
493+
486494
if (async) {
487495
freeObjAsync(key, val, db->id);
488496
} else {
@@ -501,6 +509,18 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
501509
return dbGenericDeleteWithDictIndex(db, key, async, flags, dict_index);
502510
}
503511

512+
/* Add a volatile key for a hashtable with volatile fields */
513+
int dbAddVolatileKey(serverDb *db, robj *key) {
514+
int dict_index = getKVStoreIndexForKey(key->ptr);
515+
return kvstoreHashtableAdd(db->keys_with_volatile_items, dict_index, key);
516+
}
517+
518+
/* Delete a volatile key for a hash key which no longer has field with ttl attached */
519+
int dbDeleteVolatileKey(serverDb *db, robj *key) {
520+
int dict_index = getKVStoreIndexForKey(key->ptr);
521+
return kvstoreHashtableDelete(db->keys_with_volatile_items, dict_index, objectGetKey(key));
522+
}
523+
504524
/* Delete a key, value, and associated expiration entry if any, from the DB */
505525
int dbSyncDelete(serverDb *db, robj *key) {
506526
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
@@ -582,6 +602,7 @@ long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callba
582602
} else {
583603
kvstoreEmpty(dbarray[j]->keys, callback);
584604
kvstoreEmpty(dbarray[j]->expires, callback);
605+
kvstoreEmpty(dbarray[j]->keys_with_volatile_items, callback);
585606
}
586607
/* Because all keys of database are removed, reset average ttl. */
587608
dbarray[j]->avg_ttl = 0;
@@ -1550,7 +1571,7 @@ void copyCommand(client *c) {
15501571
case OBJ_LIST: newobj = listTypeDup(o); break;
15511572
case OBJ_SET: newobj = setTypeDup(o); break;
15521573
case OBJ_ZSET: newobj = zsetDup(o); break;
1553-
case OBJ_HASH: newobj = hashTypeDup(o); break;
1574+
case OBJ_HASH: newobj = hashTypeDup(c->db, o); break;
15541575
case OBJ_STREAM: newobj = streamDup(o); break;
15551576
case OBJ_MODULE:
15561577
newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);

src/expire.c

Lines changed: 199 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
* SPDX-License-Identifier: BSD-3-Clause
3636
*/
3737

38+
#include <assert.h>
39+
3840
#include "server.h"
3941

4042
/*-----------------------------------------------------------------------------
@@ -149,6 +151,17 @@ void expireScanCallback(void *privdata, void *entry) {
149151
data->sampled++;
150152
}
151153

154+
int hashTypeExpireEntry(void *db, void *o, void *entry);
155+
156+
void fieldExpireScanCallback(void *privdata, void *volaKey) {
157+
activeExpireFieldIterator *iter = privdata;
158+
serverAssert(volaKey);
159+
serverAssert(hashTypeHasVolatileElements(volaKey));
160+
iter->current_key = volaKey;
161+
incrRefCount(iter->current_key);
162+
assert(hashTypeHasVolatileElements(iter->current_key));
163+
}
164+
152165
static inline int isExpiryTableValidForSamplingCb(hashtable *ht) {
153166
long long numkeys = hashtableSize(ht);
154167
unsigned long buckets = hashtableBuckets(ht);
@@ -161,16 +174,116 @@ static inline int isExpiryTableValidForSamplingCb(hashtable *ht) {
161174
return C_OK;
162175
}
163176

164-
void activeExpireCycle(int type) {
177+
/* Tell whether the time budget of the field expire cycle is used up.
 *
 * '*iterations' counts the calls made so far and is incremented on every
 * call. The clock is sampled into '*now_us' only when the low four bits of
 * the previous count are zero (the first call and every 16th one after it);
 * the other calls compare against the last sampled value.
 *
 * Returns non-zero when '*now_us - start_us' has reached 'limit_us'. */
static inline int activeExpireFieldsCheckTimeLimitReached(unsigned int *iterations,
                                                          uint64_t start_us,
                                                          uint64_t limit_us,
                                                          uint64_t *now_us) {
    unsigned int call_index = *iterations;
    *iterations = call_index + 1;

    if ((call_index & 0xf) == 0) *now_us = ustime();

    uint64_t elapsed_us = *now_us - start_us;
    return elapsed_us >= limit_us;
}
188+
189+
190+
/* Move the field expire iterator to the next database, wrapping around to
 * database 0 after the last one.
 *
 * A key still parked in the iterator has its reference released here;
 * clearing the pointer without doing so would leave that reference held
 * forever. The scan cursor always restarts from 0, because a cursor obtained
 * from one database's table is not meaningful for another one. */
void advanceDb(activeExpireFieldIterator *it) {
    if (it->current_key) {
        decrRefCount(it->current_key);
        it->current_key = NULL;
    }

    it->current_db++;
    if (it->current_db >= server.dbnum) {
        it->current_db = 0;
    }
    it->db_cursor = 0;
}
198+
199+
static inline int effort(void) {
200+
return server.active_expire_effort - 1;
201+
}
202+
203+
/* Finish with the key currently parked in the iterator: drop the reference
 * the iterator holds on it and clear the slot. A key must be parked when
 * this is called. */
void hashKeyDone(activeExpireFieldIterator *it) {
    robj *parked = it->current_key;

    serverAssert(parked);
    serverAssert(parked->refcount >= 1);

    decrRefCount(parked);
    it->current_key = NULL;
}
209+
210+
vset *hashTypeGetVolatileSet(robj *o);
211+
212+
/*
213+
* activeExpireCycleFields
214+
*
215+
* This function incrementally expires hash fields that use field-level TTL
216+
* stored in volatile sets. It traverses all databases, scanning keys
217+
* known to hold volatile fields, and then iterates those fields to reclaim
218+
* memory for logically expired elements that were not accessed by clients.
219+
*
220+
* Field expiry is performed within a strict time budget and an entries-per-loop
221+
* limit to protect latency and CPU usage. An activeExpireFieldIterator tracks
222+
* which key and volatile set are currently being processed. Expired fields are
223+
* removed, and if the hash becomes empty, the parent key is deleted as well.
224+
*
225+
*/
226+
void activeExpireCycleFields(int type, unsigned long entries_per_call, long long time_limit_us) {
227+
if (type != ACTIVE_EXPIRE_CYCLE_SLOW) return;
228+
if (!server.active_expire_enabled || !iAmPrimary() || server.dbnum == 0) return;
229+
230+
unsigned int iterations = 0;
231+
uint64_t start = ustime();
232+
uint64_t now = start;
233+
activeExpireFieldIterator *it = &server.active_expire_field_iterator;
234+
int dbs_performed = 0;
235+
236+
while (dbs_performed < CRON_DBS_PER_CALL && !activeExpireFieldsCheckTimeLimitReached(
237+
&iterations, start, time_limit_us, &now)) {
238+
serverDb *db = server.db[it->current_db];
239+
if (!db || kvstoreSize(db->keys_with_volatile_items) == 0) {
240+
advanceDb(it);
241+
dbs_performed++;
242+
continue;
243+
}
244+
245+
size_t entries_processed = 0;
246+
while (entries_processed < entries_per_call && !activeExpireFieldsCheckTimeLimitReached(
247+
&iterations, start, time_limit_us, &now)) {
248+
if (!it->current_key) {
249+
it->db_cursor = kvstoreScan(db->keys_with_volatile_items, it->db_cursor, -1, fieldExpireScanCallback,
250+
isExpiryTableValidForSamplingCb, it);
251+
} else if (it->current_key->refcount == 1) {
252+
hashKeyDone(it);
253+
}
254+
255+
if (it->current_key) {
256+
size_t expired = activeExpireFieldProcessKey(it->current_key, db, (mstime_t) (now / 1000),
257+
entries_per_call);
258+
entries_processed += expired;
259+
bool hasMore = hashTypeHasVolatileElements(it->current_key);
260+
if (!hasMore || expired < entries_per_call) {
261+
hashKeyDone(it);
262+
}
263+
}
264+
265+
if (!it->current_key && it->db_cursor == 0) {
266+
advanceDb(it);
267+
dbs_performed++;
268+
break;
269+
}
270+
}
271+
}
272+
273+
if (activeExpireFieldsCheckTimeLimitReached(&iterations, start, time_limit_us, &now)) {
274+
server.stat_expired_time_cap_reached_count++;
275+
}
276+
}
277+
278+
279+
void activeExpireCycleKeys(int type, unsigned long config_keys_per_loop, long long timelimit) {
165280
/* Adjust the running parameters according to the configured expire
166281
* effort. The default effort is 1, and the maximum configurable effort
167282
* is 10. */
168-
unsigned long effort = server.active_expire_effort - 1, /* Rescale from 0 to 9. */
169-
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP / 4 * effort,
170-
config_cycle_fast_duration =
171-
ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION / 4 * effort,
172-
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2 * effort,
173-
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE - effort;
283+
284+
unsigned long config_cycle_fast_duration =
285+
ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION / 4 * effort();
286+
unsigned long config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE - effort();
174287

175288
/* This function has some global state in order to continue the work
176289
* incrementally across calls. */
@@ -181,7 +294,7 @@ void activeExpireCycle(int type) {
181294
int j, iteration = 0;
182295
int dbs_per_call = CRON_DBS_PER_CALL;
183296
int dbs_performed = 0;
184-
long long start = ustime(), timelimit, elapsed;
297+
long long start = ustime(), elapsed;
185298

186299
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
187300
* Typically, at the end of the pause we will properly expire the key OR we
@@ -209,13 +322,8 @@ void activeExpireCycle(int type) {
209322
* expired keys to use memory for too much time. */
210323
if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum;
211324

212-
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
213-
* time per iteration. Since this function gets called with a frequency of
214-
* server.hz times per second, the following is the max amount of
215-
* microseconds we can spend in this function. */
216-
timelimit = config_cycle_slow_time_perc * 1000000 / server.hz / 100;
325+
217326
timelimit_exit = 0;
218-
if (timelimit <= 0) timelimit = 1;
219327

220328
if (type == ACTIVE_EXPIRE_CYCLE_FAST) timelimit = config_cycle_fast_duration; /* in microseconds. */
221329

@@ -376,6 +484,83 @@ void activeExpireCycle(int type) {
376484
server.stat_expired_stale_perc = (current_perc * 0.05) + (server.stat_expired_stale_perc * 0.95);
377485
}
378486

487+
/* expiryDriver abstracts expiry routines with a unified signature,
488+
* allowing activeExpireCycle to alternate keys and fields cleanly. */
489+
typedef void expiryDriver(int type, unsigned long entries_per_loop, long long timelimit);
490+
491+
/*
492+
* activeExpireCycle
493+
*
494+
* This function performs active expiration of both normal keys (with TTL)
495+
* and hash fields (with field-level TTL via volatile sets). Its purpose is to
496+
* reclaim memory from logically expired entries.
497+
*
498+
* The expiry is performed incrementally over multiple databases, respecting
499+
* a CPU time budget derived from the configured active-expire-effort.
500+
*
501+
* There are two separate expiry mechanisms for keys and for hash fields
502+
* because their iteration models are fundamentally different:
503+
* - key expiry operates on db->key entries, scanning random keys
504+
* with attached TTL entries.
505+
* - field expiry operates on db->key->volatile_set entries, scanning
506+
* fields within a hash that each have their own TTL.
507+
* This hierarchy and lookup pattern are entirely different, requiring
508+
* separate cursors, iteration logic, and data structure handling.
509+
*
510+
* The function uses an alternating scheme across event loop cycles: on one
511+
* cycle it will prioritize key expiry first, then hash field expiry if time
512+
* permits; on the next cycle, it will prioritize hash field expiry first,
513+
* then key expiry if time permits. This ensures fairness and prevents
514+
* starvation of either mechanism. Since the memory reclaim pace and iteration
515+
* model of keys versus hash fields are different and unpredictable,
516+
* alternating naturally balances the overall expiry effort when both are
517+
* fully consuming their available time budget.
518+
*
519+
* Note that field expiry is only performed during the slow iteration cycles,
520+
* as it is not scheduled to run in the fast cycle.
521+
*/
522+
void activeExpireCycle(int type) {
523+
524+
/* Adjust the running parameters according to the configured expire
525+
* effort. The default effort is 1, and the maximum configurable effort
526+
* is 10. */
527+
unsigned long config_keys_per_loop =
528+
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP / 4 * effort();
529+
unsigned long config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2 * effort();
530+
531+
532+
static int expireCycleStartWithFields = 0;
533+
534+
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
535+
* time per iteration. Since this function gets called with a frequency of
536+
* server.hz times per second, the following is the max amount of
537+
* microseconds we can spend in this function. */
538+
long long timelimit = config_cycle_slow_time_perc * 1000000 / server.hz / 100;
539+
540+
if (timelimit <= 0) timelimit = 1;
541+
542+
expiryDriver *first, *second;
543+
544+
if (expireCycleStartWithFields) {
545+
first = activeExpireCycleFields;
546+
second = activeExpireCycleKeys;
547+
} else {
548+
first = activeExpireCycleKeys;
549+
second = activeExpireCycleFields;
550+
}
551+
552+
long long start = ustime();
553+
first(type, config_keys_per_loop, timelimit);
554+
long long elapsed = ustime() - start;
555+
556+
if (elapsed < timelimit) {
557+
second(type, config_keys_per_loop, timelimit);
558+
}
559+
560+
expireCycleStartWithFields = !expireCycleStartWithFields;
561+
562+
}
563+
379564
/*-----------------------------------------------------------------------------
380565
* Expires of keys created in writable replicas
381566
*

src/lazyfree.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ void lazyfreeFreeObject(void *args[]) {
2424
void lazyfreeFreeDatabase(void *args[]) {
2525
kvstore *da1 = args[0];
2626
kvstore *da2 = args[1];
27+
kvstore *da3 = args[2];
2728

2829
size_t numkeys = kvstoreSize(da1);
2930
kvstoreRelease(da1);
3031
kvstoreRelease(da2);
32+
kvstoreRelease(da3);
3133
atomic_fetch_sub_explicit(&lazyfree_objects, numkeys, memory_order_relaxed);
3234
atomic_fetch_add_explicit(&lazyfreed_objects, numkeys, memory_order_relaxed);
3335
}
@@ -192,11 +194,12 @@ void emptyDbAsync(serverDb *db) {
192194
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
193195
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
194196
}
195-
kvstore *oldkeys = db->keys, *oldexpires = db->expires;
197+
kvstore *oldkeys = db->keys, *oldexpires = db->expires, *oldkeyswithexpires = db->keys_with_volatile_items;
196198
db->keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
197199
db->expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
200+
db->keys_with_volatile_items = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
198201
atomic_fetch_add_explicit(&lazyfree_objects, kvstoreSize(oldkeys), memory_order_relaxed);
199-
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires);
202+
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldkeys, oldexpires, oldkeyswithexpires);
200203
}
201204

202205
/* Free the key tracking table.

src/module.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5341,7 +5341,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
53415341

53425342
/* Handle deletion if value is VALKEYMODULE_HASH_DELETE. */
53435343
if (value == VALKEYMODULE_HASH_DELETE) {
5344-
count += hashTypeDelete(key->value, field->ptr);
5344+
count += hashTypeDelete(key->db, key->value, field->ptr);
53455345
if (flags & VALKEYMODULE_HASH_CFIELDS) decrRefCount(field);
53465346
continue;
53475347
}
@@ -5354,7 +5354,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
53545354

53555355
robj *argv[2] = {field, value};
53565356
hashTypeTryConversion(key->value, argv, 0, 1);
5357-
int updated = hashTypeSet(key->value, field->ptr, value->ptr, EXPIRY_NONE, low_flags);
5357+
int updated = hashTypeSet(key->db, key->value, field->ptr, value->ptr, EXPIRY_NONE, low_flags);
53585358
count += (flags & VALKEYMODULE_HASH_COUNT_ALL) ? 1 : updated;
53595359

53605360
/* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),

src/rdb.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2211,7 +2211,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
22112211
}
22122212

22132213
if (rdbtype == RDB_TYPE_HASH_2 && itemexpiry > 0) {
2214-
hashTypeTrackEntry(o, entry);
2214+
// TODO xbasel: we need to add the hashtable object to serverDb->keys_with_volatile_items.
2215+
// The following invocation will blow up because o doesn't have the key yet; it is created outside this function, see dbAddRDBLoad.
2216+
// hashTypeTrackEntry(&server.db[dbid], o, entry);
22152217
}
22162218
}
22172219

@@ -3371,6 +3373,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
33713373

33723374
/* Add the new object in the hash table */
33733375
int added = dbAddRDBLoad(db, key, &val);
3376+
if (type == RDB_TYPE_HASH_2) {
3377+
// Mark
3378+
}
3379+
// zzz
33743380
server.rdb_last_load_keys_loaded++;
33753381
if (!added) {
33763382
if (rdbflags & RDBFLAGS_ALLOW_DUP) {

0 commit comments

Comments
 (0)