Skip to content

Commit 9d39b40

Browse files
committed
Revert " partial work. introduce set expirations"
This reverts commit 04f2006.
1 parent 04f2006 commit 9d39b40

File tree

10 files changed

+151
-64
lines changed

10 files changed

+151
-64
lines changed

cmake/Modules/SourceFiles.cmake

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ set(VALKEY_SERVER_SRCS
118118
${CMAKE_SOURCE_DIR}/src/server.c
119119
${CMAKE_SOURCE_DIR}/src/logreqres.c
120120
${CMAKE_SOURCE_DIR}/src/entry.c
121-
${CMAKE_SOURCE_DIR}/src/volatile_set.c
122-
${CMAKE_SOURCE_DIR}/src/volatile_hashtable.c)
121+
${CMAKE_SOURCE_DIR}/src/volatile_set.c)
123122

124123

125124
# valkey-cli

src/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ ENGINE_NAME=valkey
423423
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
424424
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
425425
ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_bgsave.o trace/trace_cluster.o trace/trace_server.o trace/trace_aof.o
426-
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o volatile_set.o volatile_hashtable.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
426+
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o volatile_set.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
427427
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
428428
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
429429
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o

src/aof.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1955,7 +1955,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
19551955
* The function returns 0 on error, 1 on success. */
19561956
int rewriteHashObject(rio *r, robj *key, robj *o) {
19571957
hashTypeIterator hi;
1958-
long long count = 0, volatile_items = objectNumVolatileElements(o), items = hashTypeLength(o) - volatile_items;
1958+
long long count = 0, volatile_items = hashTypeNumVolatileElements(o), items = hashTypeLength(o) - volatile_items;
19591959

19601960
hashTypeInitIterator(o, &hi);
19611961
while (hashTypeNext(&hi) != C_ERR) {
@@ -1983,7 +1983,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
19831983
hashTypeResetIterator(&hi);
19841984

19851985
/* Now serialize volatile items if exist */
1986-
if (objectHasVolatileElements(o)) {
1986+
if (hashTypeHasVolatileElements(o)) {
19871987
hashTypeInitVolatileIterator(o, &hi);
19881988
while (hashTypeNext(&hi) != C_ERR) {
19891989
long long expiry = entryGetExpiry(hi.next);

src/object.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ void freeZsetObject(robj *o) {
526526
void freeHashObject(robj *o) {
527527
switch (o->encoding) {
528528
case OBJ_ENCODING_HASHTABLE:
529-
objectFreeVolatileSet(o);
529+
hashTypeFreeVolatileSet(o);
530530
hashtableRelease((hashtable *)o->ptr);
531531
break;
532532
case OBJ_ENCODING_LISTPACK: lpFree(o->ptr); break;

src/rdb.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
718718
if (o->encoding == OBJ_ENCODING_LISTPACK)
719719
return rdbSaveType(rdb, RDB_TYPE_HASH_LISTPACK);
720720
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
721-
if (objectHasVolatileElements(o))
721+
if (hashTypeHasVolatileElements(o))
722722
return rdbSaveType(rdb, RDB_TYPE_HASH_2);
723723
else
724724
return rdbSaveType(rdb, RDB_TYPE_HASH);
@@ -967,7 +967,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
967967
}
968968
nwritten += n;
969969
/* check if need to add expired time for the hash elements */
970-
int add_expiry = objectHasVolatileElements(o);
970+
int add_expiry = hashTypeHasVolatileElements(o);
971971
hashtableIterator iter;
972972
hashtableInitIterator(&iter, ht, HASHTABLE_ITER_AVOID_ACCESS);
973973
void *next;
@@ -2211,7 +2211,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
22112211
}
22122212

22132213
if (rdbtype == RDB_TYPE_HASH_2 && itemexpiry > 0) {
2214-
objectTrackEntry(o, entry);
2214+
hashTypeTrackEntry(o, entry);
22152215
}
22162216
}
22172217

src/server.c

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -544,20 +544,6 @@ const void *hashtableSubcommandGetKey(const void *element) {
544544
return command->declared_name;
545545
}
546546

547-
const void *hashHashtableTypeGetKey(const void *entry) {
548-
return (const void *)entryGetField(entry);
549-
}
550-
551-
void hashHashtableTypeDestructor(void *entry) {
552-
entryFree(entry);
553-
}
554-
555-
size_t hashHashtableTypeMetadataSize(void) {
556-
return sizeof(void *);
557-
}
558-
559-
extern hashtableElementAccessState hashHashtableTypeAccess(hashtable *ht, void *entry);
560-
561547
/* Generic hash table type where keys are Objects, Values
562548
* dummy pointers. */
563549
dictType objectKeyPointerValueDictType = {
@@ -591,17 +577,7 @@ hashtableType objectHashtableType = {
591577
hashtableType setHashtableType = {
592578
.hashFunction = dictSdsHash,
593579
.keyCompare = hashtableSdsKeyCompare,
594-
.entryDestructor = hashHashtableTypeDestructor,
595-
.getMetadataSize = hashHashtableTypeMetadataSize,
596-
};
597-
598-
hashtableType setWithVolatileItemsHashtableType = {
599-
.hashFunction = dictSdsHash,
600-
.keyCompare = hashtableSdsKeyCompare,
601-
.entryDestructor = hashHashtableTypeDestructor,
602-
.getMetadataSize = hashHashtableTypeMetadataSize,
603-
.accessElement = hashHashtableTypeAccess,
604-
};
580+
.entryDestructor = dictSdsDestructor};
605581

606582
const void *zsetHashtableGetKey(const void *element) {
607583
const zskiplistNode *node = element;
@@ -689,6 +665,19 @@ hashtableType subcommandSetType = {.entryGetKey = hashtableSubcommandGetKey,
689665
.instant_rehashing = 1};
690666

691667
/* Hash type hash table (note that small hashes are represented with listpacks) */
668+
const void *hashHashtableTypeGetKey(const void *entry) {
669+
return (const void *)entryGetField(entry);
670+
}
671+
672+
void hashHashtableTypeDestructor(void *entry) {
673+
entryFree(entry);
674+
}
675+
676+
size_t hashHashtableTypeMetadataSize(void) {
677+
return sizeof(void *);
678+
}
679+
680+
extern hashtableElementAccessState hashHashtableTypeAccess(hashtable *ht, void *entry);
692681

693682
hashtableType hashHashtableType = {
694683
.hashFunction = dictSdsHash,

src/server.h

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2647,7 +2647,6 @@ extern dictType objectKeyPointerValueDictType;
26472647
extern hashtableType objectHashtableType;
26482648
extern dictType objectKeyHeapPointerValueDictType;
26492649
extern hashtableType setHashtableType;
2650-
extern hashtableType setWithVolatileItemsHashtableType;
26512650
extern dictType BenchmarkDictType;
26522651
extern hashtableType zsetHashtableType;
26532652
extern hashtableType kvstoreKeysHashtableType;
@@ -3340,7 +3339,7 @@ unsigned long long dbScan(serverDb *db, unsigned long long cursor, hashtableScan
33403339
/* Set data type */
33413340
robj *setTypeCreate(sds value, size_t size_hint);
33423341
int setTypeAdd(robj *subject, sds value);
3343-
int setTypeAddAux(robj *set, char *str, size_t len, int64_t llval, int str_is_sds, long long expiry);
3342+
int setTypeAddAux(robj *set, char *str, size_t len, int64_t llval, int str_is_sds);
33443343
int setTypeRemove(robj *subject, sds value);
33453344
int setTypeRemoveAux(robj *set, char *str, size_t len, int64_t llval, int str_is_sds);
33463345
int setTypeIsMember(robj *subject, sds value);
@@ -3361,16 +3360,10 @@ robj *setTypeDup(robj *o);
33613360
#define HASH_SET_KEEP_EXPIRY (1 << 2)
33623361
#define HASH_SET_COPY 0
33633362

3364-
/* Objects with volatile items API */
3365-
volatile_set *objectGetVolatileSet(robj *o);
3366-
volatile_set *objectGetOrcreateVolatileSet(robj *o);
3367-
void objectFreeVolatileSet(robj *o);
3368-
void objectTrackEntry(robj *o, void *entry);
3369-
void objectUntrackEntry(robj *o, void *entry);
3370-
void objectTrackUpdateEntry(robj *o, void *old_entry, void *new_entry, long long old_expiry, long long new_expiry);
3371-
int objectHasVolatileElements(robj *o);
3372-
size_t objectNumVolatileElements(robj *o);
3373-
void objectIgnoreTTL(robj *o, int ignore);
3363+
3364+
void hashTypeFreeVolatileSet(robj *o);
3365+
void hashTypeTrackEntry(robj *o, void *entry);
3366+
void hashTypeUntrackEntry(robj *o, void *entry);
33743367

33753368
void hashTypeConvert(robj *o, int enc);
33763369
void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
@@ -3392,6 +3385,8 @@ robj *hashTypeLookupWriteOrCreate(client *c, robj *key);
33923385
robj *hashTypeGetValueObject(robj *o, sds field);
33933386
int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags);
33943387
robj *hashTypeDup(robj *o);
3388+
int hashTypeHasVolatileElements(robj *o);
3389+
size_t hashTypeNumVolatileElements(robj *o);
33953390

33963391
/* Pub / Sub */
33973392
int pubsubUnsubscribeAllChannels(client *c, int notify);

src/t_hash.c

Lines changed: 121 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,124 @@
4343
#include "entry.h"
4444

4545

46+
int hashTypeExpireEntry(entry *entry);
47+
48+
volatileEntryType hashvolatileEntryType = {
49+
.entryGetKey = (sds(*)(const void *entry))entryGetField,
50+
.getExpiry = (long long (*)(const void *entry))entryGetExpiry,
51+
.expire = hashTypeExpireEntry,
52+
};
53+
54+
/*-----------------------------------------------------------------------------
55+
* Hash type Expiry API
56+
*----------------------------------------------------------------------------*/
57+
58+
static volatile_set *hashTypeGetVolatileSet(robj *o) {
59+
serverAssert(o->encoding == OBJ_ENCODING_HASHTABLE);
60+
return *(volatile_set **)hashtableMetadata(o->ptr);
61+
}
62+
63+
void hashTypeFreeVolatileSet(robj *o) {
64+
volatile_set *set = hashTypeGetVolatileSet(o);
65+
if (set)
66+
freeVolatileSet(set);
67+
}
68+
69+
int hashTypeHasVolatileElements(robj *o) {
70+
return ((o->encoding == OBJ_ENCODING_HASHTABLE) && (hashTypeGetVolatileSet(o) != NULL));
71+
}
72+
73+
size_t hashTypeNumVolatileElements(robj *o) {
74+
if (hashTypeHasVolatileElements(o)) {
75+
return volatileSetNumEntries(hashTypeGetVolatileSet(o));
76+
}
77+
return 0;
78+
}
79+
80+
void hashTypeIgnoreTTL(robj *o, int ignore) {
81+
if (o->encoding == OBJ_ENCODING_HASHTABLE) {
82+
/* prevent placing access function if not needed */
83+
if (!ignore && !hashTypeHasVolatileElements(o)) {
84+
ignore = 0;
85+
}
86+
hashtableSetType(o->ptr, ignore ? &hashHashtableType : &hashWithVolatileItemsHashtableType);
87+
}
88+
}
89+
90+
static volatile_set *
91+
hashTypeGetOrcreateVolatileSet(robj *o) {
92+
serverAssert(o->encoding == OBJ_ENCODING_HASHTABLE);
93+
volatile_set **volatile_set_ref = hashtableMetadata(o->ptr);
94+
if (*volatile_set_ref == NULL) {
95+
*volatile_set_ref = createVolatileSet(&hashvolatileEntryType);
96+
/* serves mainly for optimization. Use type which supports access function only when needed. */
97+
hashTypeIgnoreTTL(o, 0);
98+
}
99+
return *volatile_set_ref;
100+
}
101+
102+
static void hashTypeDeleteVolatileSet(robj *o) {
103+
volatile_set **volatile_set_ref = hashtableMetadata(o->ptr);
104+
freeVolatileSet(*volatile_set_ref);
105+
*volatile_set_ref = NULL;
106+
/* serves mainly for optimization. by changing the hashtable type we can avoid extra function call in hashtable access */
107+
hashTypeIgnoreTTL(o, 1);
108+
}
109+
110+
void hashTypeTrackEntry(robj *o, void *entry) {
111+
volatile_set *set = hashTypeGetOrcreateVolatileSet(o);
112+
serverAssert(volatileSetAddEntry(set, entry, entryGetExpiry(entry)));
113+
}
114+
115+
void hashTypeUntrackEntry(robj *o, void *entry) {
116+
if (!entryHasExpiry(entry)) return;
117+
volatile_set *set = hashTypeGetVolatileSet(o);
118+
debugServerAssert(set);
119+
serverAssert(volatileSetRemoveEntry(set, entry, entryGetExpiry(entry)));
120+
if (volatileSetNumEntries(set) == 0) {
121+
hashTypeDeleteVolatileSet(o);
122+
}
123+
}
124+
125+
static void hashTypeTrackUpdateEntry(robj *o, void *old_entry, void *new_entry, long long old_expiry, long long new_expiry) {
126+
int old_tracked = (old_entry && old_expiry != EXPIRY_NONE);
127+
int new_tracked = (new_entry && new_expiry != EXPIRY_NONE);
128+
/* If entry was not tracked before and not going to be tracked now, we can simply return */
129+
if (!old_tracked && !new_tracked)
130+
return;
131+
132+
volatile_set *set = hashTypeGetOrcreateVolatileSet(o);
133+
debugServerAssert(set);
134+
135+
if (old_tracked && !new_tracked)
136+
serverAssert(volatileSetRemoveEntry(set, old_entry, old_expiry));
137+
else if (new_tracked && !old_tracked)
138+
serverAssert(volatileSetAddEntry(set, new_entry, new_expiry));
139+
else {
140+
volatile_set *set = hashTypeGetVolatileSet(o);
141+
debugServerAssert(set);
142+
serverAssert(volatileSetUpdateEntry(set, old_entry, new_entry, old_expiry, new_expiry) == 1);
143+
}
144+
if (volatileSetNumEntries(set) == 0) {
145+
hashTypeDeleteVolatileSet(o);
146+
}
147+
}
148+
46149
int hashTypeExpireEntry(void *entry) {
47150
// TBD
48151
UNUSED(entry);
49152
return 1;
50153
}
51154

155+
hashtableElementAccessState hashHashtableTypeAccess(hashtable *ht, void *entry) {
156+
UNUSED(ht);
157+
158+
if (!canExpireWithFlags(0, NULL)) return ELEMENT_VALID;
159+
160+
if (!entryIsExpired(entry)) return ELEMENT_VALID;
161+
162+
return ELEMENT_INVALID;
163+
}
52164

53165
/*-----------------------------------------------------------------------------
54166
* Hash type API
@@ -281,7 +393,7 @@ int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags) {
281393

282394
/* We have to ignore the TTL when setting an element. this is mainly in order to be able to update an existing expired
283395
* entry and not have it remain in the hashtable with the same field/value. */
284-
objectIgnoreTTL(o, 1);
396+
hashTypeIgnoreTTL(o, 1);
285397
hashtablePosition position;
286398
void *existing;
287399
if (hashtableFindPositionForInsert(ht, field, &position, &existing)) {
@@ -290,7 +402,7 @@ int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags) {
290402
hashtableInsertAtPosition(ht, entry, &position);
291403
/* In case an expiry is set on the new entry, we need to track it */
292404
if (expiry != EXPIRY_NONE) {
293-
objectTrackEntry(o, entry);
405+
hashTypeTrackEntry(o, entry);
294406
}
295407
} else {
296408
/* exists: replace value */
@@ -307,11 +419,11 @@ int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags) {
307419
int replaced = hashtableReplaceReallocatedEntry(ht, existing, new_entry);
308420
serverAssert(replaced);
309421
}
310-
objectTrackUpdateEntry(o, existing, new_entry, entry_expiry, expiry);
422+
hashTypeTrackUpdateEntry(o, existing, new_entry, entry_expiry, expiry);
311423

312424
update = is_expired ? 0 : 1;
313425
}
314-
objectIgnoreTTL(o, 0);
426+
hashTypeIgnoreTTL(o, 0);
315427
} else {
316428
serverPanic("Unknown hash encoding");
317429
}
@@ -383,7 +495,7 @@ int hashTypeSetExpire(robj *o, sds field, long long expiry, int flag) {
383495
}
384496
}
385497
*entry_ref = entrySetExpiry(current_entry, expiry);
386-
objectTrackUpdateEntry(o, current_entry, *entry_ref, current_expire, expiry);
498+
hashTypeTrackUpdateEntry(o, current_entry, *entry_ref, current_expire, expiry);
387499
return 1;
388500
}
389501
return -2; // we did not find anything to do. return -2
@@ -408,7 +520,7 @@ int hashTypePersist(robj *o, sds field) {
408520
entry *current_entry = *entry_ref;
409521
long long current_expire = entryGetExpiry(current_entry);
410522
if (current_expire != EXPIRY_NONE) {
411-
objectUntrackEntry(o, current_entry);
523+
hashTypeUntrackEntry(o, current_entry);
412524
*entry_ref = entryUpdate(current_entry, NULL, EXPIRY_NONE);
413525
return 1;
414526
}
@@ -441,7 +553,7 @@ int hashTypeDelete(robj *o, sds field) {
441553
void *entry = NULL;
442554
deleted = hashtablePop(ht, field, &entry);
443555
if (deleted) {
444-
objectUntrackEntry(o, entry);
556+
hashTypeUntrackEntry(o, entry);
445557
entryFree(entry);
446558
}
447559
} else {
@@ -486,7 +598,7 @@ void hashTypeInitVolatileIterator(robj *subject, hashTypeIterator *hi) {
486598
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
487599
return;
488600
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
489-
volatileSetStart(objectGetVolatileSet(subject), &hi->viter);
601+
volatileSetStart(hashTypeGetVolatileSet(subject), &hi->viter);
490602
} else {
491603
serverPanic("Unknown hash encoding");
492604
}
@@ -701,7 +813,7 @@ robj *hashTypeDup(robj *o) {
701813
entry *entry = entryCreate(field, sdsdup(value), expiry);
702814
hashtableAdd(ht, entry);
703815
if (expiry != EXPIRY_NONE)
704-
objectTrackEntry(hobj, entry);
816+
hashTypeTrackEntry(hobj, entry);
705817
}
706818
hashTypeResetIterator(&hi);
707819
} else {

0 commit comments

Comments
 (0)