Skip to content

Commit 81f15f9

Browse files
committed
volatile set iterator -> iterate over entires, not buckets
Signed-off-by: xbasel <[email protected]>
1 parent f7f7dcc commit 81f15f9

File tree

4 files changed

+238
-13
lines changed

4 files changed

+238
-13
lines changed

src/unit/test_files.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@ int test_update_entry_within_same_bucket(int argc, char **argv, int flags);
206206
int test_update_entry_across_buckets(int argc, char **argv, int flags);
207207
int test_bucket_order_by_expiry(int argc, char **argv, int flags);
208208
int test_bucket_lookup_strictly_below(int argc, char **argv, int flags);
209+
int test_iterator_basic(int argc, char **argv, int flags);
210+
int test_iterator_listpack(int argc, char **argv, int flags);
211+
int test_iterator_mixed_buckets(int argc, char **argv, int flags);
209212
int test_ziplistCreateIntList(int argc, char **argv, int flags);
210213
int test_ziplistPop(int argc, char **argv, int flags);
211214
int test_ziplistGetElementAtIndex3(int argc, char **argv, int flags);
@@ -266,8 +269,7 @@ unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}};
266269
unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}};
267270
unitTest __test_valkey_strtod_c[] = {{"test_valkey_strtod", test_valkey_strtod}, {NULL, NULL}};
268271
unitTest __test_vector_c[] = {{"test_vector", test_vector}, {NULL, NULL}};
269-
unitTest __test_volatile_set_c[] = {{"test_basic", test_basic}, {"test_update_entry", test_update_entry}, {"test_hashtable", test_hashtable}, {"test_promotion", test_promotion}, {"test_demotion_ht_to_single", test_demotion_ht_to_single}, {"test_update_same_bucket_noop", test_update_same_bucket_noop}, {"test_update_entry_within_same_bucket", test_update_entry_within_same_bucket}, {"test_update_entry_across_buckets", test_update_entry_across_buckets}, {NULL, NULL}};
270-
unitTest __test_volatile_set_c[] = {{"test_basic", test_basic}, {"test_update_entry", test_update_entry}, {"test_hashtable", test_hashtable}, {"test_promotion", test_promotion}, {"test_demotion_ht_to_single", test_demotion_ht_to_single}, {"test_update_same_bucket_noop", test_update_same_bucket_noop}, {"test_update_entry_within_same_bucket", test_update_entry_within_same_bucket}, {"test_update_entry_across_buckets", test_update_entry_across_buckets}, {"test_bucket_order_by_expiry", test_bucket_order_by_expiry}, {"test_bucket_lookup_strictly_below", test_bucket_lookup_strictly_below}, {NULL, NULL}};
272+
unitTest __test_volatile_set_c[] = {{"test_basic", test_basic}, {"test_update_entry", test_update_entry}, {"test_hashtable", test_hashtable}, {"test_promotion", test_promotion}, {"test_demotion_ht_to_single", test_demotion_ht_to_single}, {"test_update_same_bucket_noop", test_update_same_bucket_noop}, {"test_update_entry_within_same_bucket", test_update_entry_within_same_bucket}, {"test_update_entry_across_buckets", test_update_entry_across_buckets}, {"test_bucket_order_by_expiry", test_bucket_order_by_expiry}, {"test_bucket_lookup_strictly_below", test_bucket_lookup_strictly_below}, {"test_iterator_basic", test_iterator_basic}, {"test_iterator_listpack", test_iterator_listpack}, {"test_iterator_mixed_buckets", test_iterator_mixed_buckets}, {NULL, NULL}};
271273
unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}};
272274
unitTest __test_zipmap_c[] = {{"test_zipmapIterateWithLargeKey", test_zipmapIterateWithLargeKey}, {"test_zipmapIterateThroughElements", test_zipmapIterateThroughElements}, {NULL, NULL}};
273275
unitTest __test_zmalloc_c[] = {{"test_zmallocInitialUsedMemory", test_zmallocInitialUsedMemory}, {"test_zmallocAllocReallocCallocAndFree", test_zmallocAllocReallocCallocAndFree}, {"test_zmallocAllocZeroByteAndFree", test_zmallocAllocZeroByteAndFree}, {NULL, NULL}};

src/unit/test_volatile_set.c

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,4 +515,175 @@ int test_bucket_lookup_strictly_below(int argc, char **argv, int flags) {
515515
raxStop(&ri);
516516
freeVolatileSet(vset);
517517
return 0;
518-
}
518+
}
519+
520+
521+
int test_iterator_basic(int argc, char **argv, int flags) {
522+
UNUSED(argc); UNUSED(argv); UNUSED(flags);
523+
524+
volatile_set *vset = createVolatileSet(NULL);
525+
long long expiry = 10000000;
526+
527+
entry *entries[130];
528+
for (int i = 0; i < 130; i++) {
529+
char f[32], v[32];
530+
snprintf(f, sizeof(f), "f%d", i);
531+
snprintf(v, sizeof(v), "v%d", i);
532+
entries[i] = entryCreate(createSds(f), createSds(v), expiry);
533+
TEST_ASSERT(volatileSetAddEntry(vset, entries[i], expiry) == 1);
534+
}
535+
536+
int seen[130] = {0};
537+
volatileSetIterator it;
538+
volatileSetStart(vset, &it);
539+
void *ptr;
540+
541+
int count = 0;
542+
while (volatileSetNext(&it, &ptr)) {
543+
for (int i = 0; i < 130; i++) {
544+
if (entries[i] == ptr) {
545+
TEST_ASSERT(seen[i] == 0); // no duplicates
546+
seen[i] = 1;
547+
count++;
548+
break;
549+
}
550+
}
551+
}
552+
volatileSetReset(&it);
553+
554+
TEST_ASSERT(count == 130);
555+
for (int i = 0; i < 130; i++)
556+
TEST_ASSERT(seen[i] == 1);
557+
558+
freeVolatileSet(vset);
559+
return 0;
560+
}
561+
562+
int test_iterator_listpack(int argc, char **argv, int flags) {
563+
UNUSED(argc); UNUSED(argv); UNUSED(flags);
564+
565+
volatile_set *vset = createVolatileSet(NULL);
566+
long long expiry = 12345678;
567+
568+
entry *entries[50];
569+
for (int i = 0; i < 50; i++) {
570+
char f[32], v[32];
571+
snprintf(f, sizeof(f), "f%d", i);
572+
snprintf(v, sizeof(v), "v%d", i);
573+
entries[i] = entryCreate(createSds(f), createSds(v), expiry);
574+
TEST_ASSERT(volatileSetAddEntry(vset, entries[i], expiry) == 1);
575+
}
576+
577+
// Check the bucket type is LISTPACK
578+
unsigned char key[VSET_BUCKET_KEY_LEN] = {0};
579+
size_t key_len = encodeExpiryBucketKey(key, expiry);
580+
void *raw = NULL;
581+
TEST_ASSERT(raxFind(vset->expiry_buckets, key, key_len, &raw) == 1);
582+
vsetBucket *bucket = raw;
583+
TEST_ASSERT(bucket->type == VSET_BUCKET_LISTPACK);
584+
TEST_ASSERT(lpLength(bucket->data.listpack) == 50);
585+
586+
// Iterate
587+
int seen[50] = {0};
588+
volatileSetIterator it;
589+
volatileSetStart(vset, &it);
590+
void *ptr;
591+
int count = 0;
592+
593+
while (volatileSetNext(&it, &ptr)) {
594+
for (int i = 0; i < 50; i++) {
595+
if (entries[i] == ptr) {
596+
TEST_ASSERT(seen[i] == 0);
597+
seen[i] = 1;
598+
count++;
599+
break;
600+
}
601+
}
602+
}
603+
604+
volatileSetReset(&it);
605+
606+
TEST_ASSERT(count == 50);
607+
for (int i = 0; i < 50; i++)
608+
TEST_ASSERT(seen[i] == 1);
609+
610+
freeVolatileSet(vset);
611+
return 0;
612+
}
613+
614+
int test_iterator_mixed_buckets(int argc, char **argv, int flags) {
615+
UNUSED(argc); UNUSED(argv); UNUSED(flags);
616+
617+
volatile_set *vset = createVolatileSet(NULL);
618+
long long expiry1 = 10000000; // SINGLE
619+
long long expiry2 = expiry1 + 60000; // LISTPACK
620+
long long expiry3 = expiry2 + 60000; // HT
621+
622+
entry *e_single = entryCreate(createSds("s"), createSds("v"), expiry1);
623+
TEST_ASSERT(volatileSetAddEntry(vset, e_single, expiry1) == 1);
624+
625+
entry *e_listpack[50];
626+
for (int i = 0; i < 50; i++) {
627+
char f[32], v[32];
628+
snprintf(f, sizeof(f), "lp_f%d", i);
629+
snprintf(v, sizeof(v), "lp_v%d", i);
630+
e_listpack[i] = entryCreate(createSds(f), createSds(v), expiry2);
631+
TEST_ASSERT(volatileSetAddEntry(vset, e_listpack[i], expiry2) == 1);
632+
}
633+
634+
entry *e_ht[130];
635+
for (int i = 0; i < 130; i++) {
636+
char f[32], v[32];
637+
snprintf(f, sizeof(f), "ht_f%d", i);
638+
snprintf(v, sizeof(v), "ht_v%d", i);
639+
e_ht[i] = entryCreate(createSds(f), createSds(v), expiry3);
640+
TEST_ASSERT(volatileSetAddEntry(vset, e_ht[i], expiry3) == 1);
641+
}
642+
643+
int found_single = 0;
644+
int seen_listpack[50] = {0};
645+
int seen_ht[130] = {0};
646+
647+
volatileSetIterator it;
648+
volatileSetStart(vset, &it);
649+
void *ptr;
650+
int count = 0;
651+
652+
int stop = 0;
653+
while (stop);
654+
655+
while (volatileSetNext(&it, &ptr)) {
656+
if (ptr == e_single) {
657+
TEST_ASSERT(found_single == 0);
658+
found_single = 1;
659+
goto seen;
660+
}
661+
for (int i = 0; i < 50; i++) {
662+
if (ptr == e_listpack[i]) {
663+
TEST_ASSERT(seen_listpack[i] == 0);
664+
seen_listpack[i] = 1;
665+
goto seen;
666+
}
667+
}
668+
for (int i = 0; i < 130; i++) {
669+
if (ptr == e_ht[i]) {
670+
TEST_ASSERT(seen_ht[i] == 0);
671+
seen_ht[i] = 1;
672+
goto seen;
673+
}
674+
}
675+
TEST_ASSERT(!"Unknown entry found");
676+
677+
seen:
678+
count++;
679+
}
680+
volatileSetReset(&it);
681+
682+
TEST_ASSERT(count == 1 + 50 + 130);
683+
TEST_ASSERT(found_single == 1);
684+
for (int i = 0; i < 50; i++) TEST_ASSERT(seen_listpack[i] == 1);
685+
for (int i = 0; i < 130; i++) TEST_ASSERT(seen_ht[i] == 1);
686+
687+
freeVolatileSet(vset);
688+
return 0;
689+
}

src/volatile_set.c

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,17 +218,66 @@ size_t volatileSetNumEntries(volatile_set *set) {
218218
void volatileSetStart(volatile_set *set, volatileSetIterator *it) {
219219
raxStart(&it->bucket, set->expiry_buckets);
220220
raxSeek(&it->bucket, ">=", NULL, 0);
221+
it->inner_it = NULL;
222+
it->state = 0;
221223
}
222224

223225
int volatileSetNext(volatileSetIterator *it, void **entryptr) {
224-
if (raxNext(&it->bucket)) {
225-
// assert(it->bucket.key_len != EXPIRY_HASH_SIZE);
226-
// memcpy(it->bucket.key + 8, entryptr, sizeof(*entryptr));
227-
*entryptr = it->bucket.data;
228-
return 1;
226+
while (1) {
227+
switch (it->state) {
228+
case 0: // Init or move to next bucket
229+
if (!raxNext(&it->bucket)) return 0;
230+
it->current_bucket = it->bucket.data;
231+
if (!it->current_bucket) continue;
232+
233+
if (it->current_bucket->type == VSET_BUCKET_SINGLE) {
234+
*entryptr = it->current_bucket->data.single;
235+
it->state = 1;
236+
return 1;
237+
} else if (it->current_bucket->type == VSET_BUCKET_LISTPACK) {
238+
it->inner_it = NULL;
239+
it->state = 2;
240+
continue;
241+
} else if (it->current_bucket->type == VSET_BUCKET_HT) {
242+
it->inner_it = hashtableCreateIterator(it->current_bucket->data.hashtable, HASHTABLE_ITER_SAFE);
243+
it->state = 3;
244+
continue;
245+
}
246+
break;
247+
248+
case 1: // single already returned, move to next bucket
249+
it->state = 0;
250+
continue;
251+
252+
case 2: { // listpack
253+
it->inner_it = it->inner_it
254+
? lpNext(it->current_bucket->data.listpack, it->inner_it)
255+
: lpFirst(it->current_bucket->data.listpack);
256+
257+
if (!it->inner_it) {
258+
it->state = 0;
259+
continue;
260+
}
261+
unsigned int slen;
262+
long long val;
263+
lpGetValue(it->inner_it, &slen, &val);
264+
*entryptr = (void *)(uintptr_t)val;
265+
return 1;
266+
}
267+
268+
case 3: { // hashtable
269+
if (!hashtableNext(it->inner_it, entryptr)) {
270+
hashtableReleaseIterator(it->inner_it);
271+
it->inner_it = NULL;
272+
it->state=0;
273+
continue;
274+
}
275+
return 1;
276+
}
277+
}
229278
}
230-
return 0;
231279
}
280+
232281
void volatileSetReset(volatileSetIterator *it) {
233282
raxStop(&it->bucket);
234283
}

src/volatile_set.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ typedef struct {
2424
rax *expiry_buckets;
2525
} volatile_set;
2626

27-
typedef struct volatileSetIterator {
28-
raxIterator bucket;
29-
} volatileSetIterator;
30-
3127
#define VSET_BUCKET_SINGLE 0
3228
#define VSET_BUCKET_LISTPACK 1
3329
#define VSET_BUCKET_HT 2
@@ -41,6 +37,13 @@ typedef struct {
4137
} data;
4238
} vsetBucket;
4339

40+
typedef struct volatileSetIterator {
41+
raxIterator bucket;
42+
vsetBucket *current_bucket;
43+
int state; // 0 = uninitialized, 1 = single done, 2 = iterating listpack, 3 = iterating hashtable
44+
void *inner_it;
45+
} volatileSetIterator;
46+
4447
int volatileSetRemoveEntry(volatile_set *set, void *entry, long long expiry);
4548
int volatileSetAddEntry(volatile_set *set, void *entry, long long expiry);
4649
int volatileSetExpireEntry(volatile_set *set, void *entry);

0 commit comments

Comments
 (0)