Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
125 commits
Select commit Hold shift + click to select a range
c7ba3bb
Implementation of the slot async move, part I
enjoy-binbin Oct 14, 2024
219427b
Implementation of the slot async move, part II
enjoy-binbin Oct 14, 2024
c4993e2
Implementation of the slot async move, part III
enjoy-binbin Oct 14, 2024
a4f0dbe
Implementation of the slot async move, part IV
enjoy-binbin Oct 16, 2024
f78b62a
Implementation of the slot async move, part V
enjoy-binbin Oct 16, 2024
bc5c5bf
Implementation of the slot async move, part VI
enjoy-binbin Oct 17, 2024
2a14b69
Implementation of the slot async move, part VII
enjoy-binbin Oct 17, 2024
877f042
Support to inject errors in the slot async move
enjoy-binbin Oct 17, 2024
9ce37d1
Add crstest cases for cluster slot async move
enjoy-binbin Oct 17, 2024
655d102
Disable read slotsync data when rdb is loading
enjoy-binbin Oct 17, 2024
9f2343d
Add new RDBFLAGS_SLOT_SYNC to allow loading the same function from sl…
enjoy-binbin Oct 17, 2024
56e66c8
Fix replica data loss when master re-load a slot RDB
enjoy-binbin Oct 17, 2024
8def8b5
Fix SET with expire time crash in resharding
enjoy-binbin Oct 17, 2024
e899194
Fix double free when sync connection close after cluster slotfailover
enjoy-binbin Oct 17, 2024
0a43cc6
Disable expire during slot migration
enjoy-binbin Oct 17, 2024
4593bb9
Try to run all tests
enjoy-binbin Oct 17, 2024
b96ad52
Fix the dummy primary client on the target side
enjoy-binbin Oct 22, 2024
714cb30
Fix client post write issue
enjoy-binbin Oct 22, 2024
900601e
Fix target node wrongly reset the slot failover
enjoy-binbin Oct 22, 2024
5af2585
Fix slot sync test, make it can pass on Mac
enjoy-binbin Oct 22, 2024
dcf5cc7
Cleanup and format
enjoy-binbin Oct 23, 2024
e66525c
Minor cleanup around sync command
enjoy-binbin Oct 23, 2024
5ef6157
Target node as a replica send REPLCONF listening-port so we can see it
enjoy-binbin Oct 24, 2024
9b756bd
Set slotsync_slots default to NULL and add slot_sync_primary/replica …
enjoy-binbin Oct 24, 2024
7aef44d
Only traverse the slot dicts instead of traversing the entir kvstore
enjoy-binbin Oct 24, 2024
b27aae1
Fix disk-based replication reusing slot RDB file
enjoy-binbin Oct 25, 2024
49a51f1
Add assert to make sure we don't mess up the slot RDBs
enjoy-binbin Oct 25, 2024
cba2f94
Use bio to load the slot RDB
enjoy-binbin Oct 31, 2024
a8b2d85
Move client operations to the main thread
enjoy-binbin Nov 6, 2024
3352644
Don't set loading when loading slot RDB
enjoy-binbin Oct 31, 2024
b855ebd
Adding rdbLoadWithLoadingCtx function to take context arg
enjoy-binbin Nov 1, 2024
74f0ebf
Use temp db to store the slot RDB data
enjoy-binbin Nov 1, 2024
ee13d8b
Restore the slot DB when doing slot failover
enjoy-binbin Nov 4, 2024
3b2b5db
Restore the slot RDB when done loading it
enjoy-binbin Nov 18, 2024
0027d8d
Handle delKeysInSlot in a command execution unit
enjoy-binbin Nov 18, 2024
e9cb015
Enable all slotsync tests
enjoy-binbin Nov 19, 2024
112f8fa
Move the handler logic to replication
enjoy-binbin Nov 20, 2024
3572157
Fix slotsync replica keep adding replication buffer
enjoy-binbin Dec 25, 2024
3f6dd32
todo: there is a assert that need to fix
enjoy-binbin Jan 22, 2025
f95ed8d
Fix typo and format and build
enjoy-binbin Jan 23, 2025
6f6725d
Initial commit merging with https://github.com/valkey-io/valkey/pull/…
murphyjacob4 Jan 31, 2025
fd7f0f6
Code cleanup and bug fixes
murphyjacob4 Feb 1, 2025
8c8f498
Fix bugs related to replication and incremental streaming
murphyjacob4 Apr 4, 2025
4b2af5b
Support two-phase commit and fix some bugs
murphyjacob4 Apr 9, 2025
dd8d210
Begin converting tests to TCL
murphyjacob4 Apr 9, 2025
5845522
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Apr 14, 2025
6977fe5
Add more tests and recent migration logs
murphyjacob4 Apr 9, 2025
ecce391
Continue adding test cases and fixing edge cases
murphyjacob4 Apr 12, 2025
c93e2f9
Support FLUSHDB
murphyjacob4 Apr 12, 2025
b8f5881
Get test suite passing
murphyjacob4 Apr 12, 2025
33d9630
Refactor slot migration links
murphyjacob4 Apr 12, 2025
cb0616d
Add a few more test cases and address some bugs
murphyjacob4 Apr 13, 2025
e4309f2
Revert formatting changes to unchanged files
murphyjacob4 Apr 13, 2025
0de936c
Add a state machine diagram and format changes
murphyjacob4 Apr 13, 2025
9e9e54b
Cleanup test helpers
murphyjacob4 Apr 14, 2025
a329d79
Apply clang format
murphyjacob4 Apr 14, 2025
a6c0d94
Fix potential non-exporting data in snapshot
murphyjacob4 Apr 14, 2025
8f701e2
Remove patch in https://github.com/valkey-io/valkey/pull/1948
murphyjacob4 Apr 15, 2025
adcd2a7
Fix mac and 32bit build errors due to size_t formatting
murphyjacob4 Apr 16, 2025
4189c24
Improve flakiness of export client buffer enforcement on slow machines
murphyjacob4 Apr 16, 2025
fa638de
Remove unused custom listDup method
murphyjacob4 Apr 16, 2025
9b420da
Add configs and testing for CLUSTER MIGRATIONS
murphyjacob4 Apr 16, 2025
20725b8
Add test case for WRONGPASS
murphyjacob4 Apr 16, 2025
e58c391
Use consistent serverLog style
murphyjacob4 Apr 16, 2025
a2bfafb
Begin addressing first round of review feedback
murphyjacob4 May 2, 2025
b2fa7b4
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Jun 2, 2025
daaf5c7
Propagate deletion using CLUSTER FLUSHSLOT
murphyjacob4 Jun 2, 2025
9c4aff9
Update commands JSON and comments
murphyjacob4 Jun 3, 2025
b08c029
Fix import-cancel arguments
murphyjacob4 Jun 3, 2025
76cf082
Address some of the review feedback
murphyjacob4 Jun 7, 2025
cb77227
Migrate to CLUSTER MIGRATE from CLUSTER IMPORT
murphyjacob4 Jul 7, 2025
2b3cfb3
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Jul 7, 2025
f7edf02
Plumb slot throughout propagation code
murphyjacob4 Jul 7, 2025
23d51b5
Clang format fixes
murphyjacob4 Jul 7, 2025
15ae5be
Fix handling of SKIP_COMMAND_VALIDATION with export ongoing
murphyjacob4 Jul 7, 2025
626c212
Cleanup and test fixes
murphyjacob4 Jul 7, 2025
c8220a9
Add config for maximum buffer size for pause
murphyjacob4 Jul 7, 2025
d04b0d3
Fix typo
murphyjacob4 Jul 7, 2025
58bfca4
Fix test case causing flake
murphyjacob4 Jul 7, 2025
99ae259
Clang format
murphyjacob4 Jul 7, 2025
274aaa1
Address review feedback
murphyjacob4 Jul 8, 2025
9eb6db8
Additional hardening (eviction, KEYS, RANDOMKEY)
murphyjacob4 Jul 9, 2025
4620fa8
Fix crash in KEYS command when cluster disabled
murphyjacob4 Jul 9, 2025
98fc148
Add void parameter for MacOS build
murphyjacob4 Jul 9, 2025
b1d8a9a
Improve test stability for expiration and timeouts
murphyjacob4 Jul 9, 2025
e7a3679
Incremental test coverage and flake fixes
murphyjacob4 Jul 10, 2025
df9188b
Fix bug related to slot snapshot taking too long and add test
murphyjacob4 Jul 15, 2025
ef92f6d
Address more review feedback
murphyjacob4 Jul 29, 2025
6e82b71
Fix command help
murphyjacob4 Jul 29, 2025
a4e787a
Fix WAITAOF when there is no replicas
murphyjacob4 Jul 30, 2025
b2fdca5
Address even more review feedback
murphyjacob4 Jul 31, 2025
fe6a08e
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Jul 31, 2025
a30fc53
Fix bug related to sending ACKs while connecting
murphyjacob4 Jul 31, 2025
42e2a2f
Include importing slots in RDB snapshots
murphyjacob4 Aug 1, 2025
0089ba3
Fix HASHTABLE_ITER_INCLUDE_IMPORTING flag definition
murphyjacob4 Aug 1, 2025
d9859c3
Fix bug where cancel during reading establish response could crash
murphyjacob4 Aug 1, 2025
e610448
Fix sanitizer issue with refactored predicate logic
murphyjacob4 Aug 1, 2025
3644335
Fix clang format
murphyjacob4 Aug 1, 2025
e88fe03
Remove predicate from kvstore, use per slot AOF rewrite
murphyjacob4 Aug 2, 2025
daa3254
Fix old references to CLUSTER IMPORT
murphyjacob4 Aug 2, 2025
1e22259
Fix address sanitizer issue during send child info
murphyjacob4 Aug 2, 2025
df9608c
Update tests to match CLUSTER MIGRATE
murphyjacob4 Aug 2, 2025
2eb3099
Rename slot migration link to slot migration job
murphyjacob4 Aug 2, 2025
3f535bd
Address review feedback from August 1-3
murphyjacob4 Aug 4, 2025
7ae4a05
Clang format
murphyjacob4 Aug 4, 2025
a9a8855
Rename CLUSTER CANCELMIGRATION to CANCELMIGRATIONS and other review f…
murphyjacob4 Aug 4, 2025
3a3e432
Use intptr_t for better portability
murphyjacob4 Aug 5, 2025
65985fe
Command renaming and other feedback
murphyjacob4 Aug 7, 2025
516a5a2
Rename SLOTMIGRATIONS to GETSLOTMIGRATIONS
murphyjacob4 Aug 7, 2025
ddb9b68
Fix commands.def
murphyjacob4 Aug 7, 2025
8f4bb42
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Aug 7, 2025
6e4bab2
Atomic slot migration integration with hash field expiration
murphyjacob4 Aug 8, 2025
eef803f
Fix AOF rewrite behavior for hashes with expirations
murphyjacob4 Aug 8, 2025
df3b32a
Mark volatile field kvstore as importing for proper containment
murphyjacob4 Aug 8, 2025
9da6ad8
Address review feedback
murphyjacob4 Aug 9, 2025
1aa937d
Address another round of review feedback
murphyjacob4 Aug 9, 2025
f399e38
Fix CANCELMIGRATIONS help text
murphyjacob4 Aug 9, 2025
3210ef9
More review feedback
murphyjacob4 Aug 9, 2025
4607708
Review feedback
murphyjacob4 Aug 11, 2025
b400110
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Aug 11, 2025
0d51e20
Command renaming for congruency
murphyjacob4 Aug 11, 2025
8eaad96
Merge remote-tracking branch 'upstream/unstable' into slot_migration_…
murphyjacob4 Aug 11, 2025
dae644d
Trigger tests
madolson Aug 11, 2025
6e1b137
Fix some TLS related test failures
madolson Aug 11, 2025
2a155f9
Remove slot migration from request response tool
madolson Aug 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/intset.c
${CMAKE_SOURCE_DIR}/src/syncio.c
${CMAKE_SOURCE_DIR}/src/cluster.c
${CMAKE_SOURCE_DIR}/src/cluster_migrateslots.c
${CMAKE_SOURCE_DIR}/src/cluster_legacy.c
${CMAKE_SOURCE_DIR}/src/cluster_slot_stats.c
${CMAKE_SOURCE_DIR}/src/crc16.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o
Expand Down
153 changes: 100 additions & 53 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,103 @@ static int rewriteFunctions(rio *aof) {
return 0;
}

int rewriteSelectDbRio(rio *aof, int db_num) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) return C_ERR;
if (rioWriteBulkLongLong(aof, db_num) == 0) return C_ERR;
return C_OK;
}

int rewriteObjectRio(rio *aof, robj *o, int db_num) {
size_t aof_bytes_before_key = aof->processed_bytes;
sds keystr;
robj key;
long long expiretime;

keystr = objectGetKey(o);
initStaticStringObject(key, keystr);

expiretime = objectGetExpire(o);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[] = "*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof, cmd, sizeof(cmd) - 1) == 0) return C_ERR;
/* Key and value */
if (rioWriteBulkObject(aof, &key) == 0) return C_ERR;
if (rioWriteBulkObject(aof, o) == 0) return C_ERR;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof, &key, o) == 0) return C_ERR;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof, &key, o) == 0) return C_ERR;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof, &key, o) == 0) return C_ERR;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof, &key, o) == 0) return C_ERR;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof, &key, o) == 0) return C_ERR;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof, &key, o, db_num) == 0) return C_ERR;
} else {
serverPanic("Unknown object type");
}

/* In fork child process, we can try to release memory back to the
* OS and possibly avoid or decrease COW. We give the dismiss
* mechanism a hint about an estimated size of the object we stored. */
size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);

/* Save the expire time */
if (expiretime != -1) {
char cmd[] = "*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof, cmd, sizeof(cmd) - 1) == 0) return C_ERR;
if (rioWriteBulkObject(aof, &key) == 0) return C_ERR;
if (rioWriteBulkLongLong(aof, expiretime) == 0) return C_ERR;
}

/* Delay before next key if required (for testing) */
if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay);

return C_OK;
}

int rewriteSlotToAppendOnlyFileRio(rio *aof, int db_num, int hashslot, size_t *key_count) {
long long updated_time = 0;

if (rewriteFunctions(aof) == 0) return C_ERR;
if (dbHasNoKeys(db_num)) return C_OK;

serverDb *db = server.db[db_num];
if (kvstoreHashtableSize(db->keys, hashslot) == 0) return C_OK;

/* SELECT the DB */
if (rewriteSelectDbRio(aof, db_num) == C_ERR) return C_ERR;

kvstoreHashtableIterator *iter = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
void *next;
while (kvstoreHashtableIteratorNext(iter, &next)) {
robj *o = next;

/* Update info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if (key_count && ((*key_count)++ & 1023) == 0) {
long long now = mstime();
if (now - updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_count, "AOF rewrite");
updated_time = now;
}
}

if (rewriteObjectRio(aof, o, db_num) == C_ERR) return C_ERR;
}

kvstoreReleaseHashtableIterator(iter);
return C_OK;
}

int rewriteAppendOnlyFileRio(rio *aof) {
int j;
long key_count = 0;
Expand All @@ -2234,69 +2331,20 @@ int rewriteAppendOnlyFileRio(rio *aof) {
sdsfree(ts);
}

if (rewriteFunctions(aof) == 0) goto werr;
if (rewriteFunctions(aof) == C_ERR) goto werr;

for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
if (dbHasNoKeys(j)) continue;
serverDb *db = server.db[j];

/* SELECT the new DB */
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
if (rioWriteBulkLongLong(aof, j) == 0) goto werr;
if (rewriteSelectDbRio(aof, j) == C_ERR) goto werr;

kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
robj *o = next;
sds keystr;
robj key;
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;

keystr = objectGetKey(o);
initStaticStringObject(key, keystr);

expiretime = objectGetExpire(o);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[] = "*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof, cmd, sizeof(cmd) - 1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(aof, &key) == 0) goto werr;
if (rioWriteBulkObject(aof, o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof, &key, o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof, &key, o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof, &key, o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof, &key, o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof, &key, o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof, &key, o, j) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}

/* In fork child process, we can try to release memory back to the
* OS and possibly avoid or decrease COW. We give the dismiss
* mechanism a hint about an estimated size of the object we stored. */
size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);

/* Save the expire time */
if (expiretime != -1) {
char cmd[] = "*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof, cmd, sizeof(cmd) - 1) == 0) goto werr;
if (rioWriteBulkObject(aof, &key) == 0) goto werr;
if (rioWriteBulkLongLong(aof, expiretime) == 0) goto werr;
}

/* Update info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
Expand All @@ -2309,8 +2357,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
}
}

/* Delay before next key if required (for testing) */
if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay);
if (rewriteObjectRio(aof, o, j) == C_ERR) goto werr;
}
kvstoreIteratorRelease(kvs_it);
}
Expand Down
4 changes: 2 additions & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ void freeClientBlockingState(client *c) {
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
/* Replicated clients should never be blocked unless pause or module */
serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

initClientBlockingState(c);

Expand Down
1 change: 0 additions & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,6 @@ void resetClusterStats(void) {
clusterSlotStatResetAll();
}


void clusterCommandFlushslot(client *c) {
int slot;
int lazy = server.lazyfree_lazy_user_flush;
Expand Down
20 changes: 20 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#define CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */
#define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */

/* Fixed timeout value for cluster operations (milliseconds) */
#define CLUSTER_OPERATION_TIMEOUT 2000

typedef struct _clusterNode clusterNode;
struct clusterState;

Expand All @@ -38,6 +41,10 @@ struct clusterState;
#define CLUSTER_MODULE_FLAG_NO_FAILOVER (1 << 1)
#define CLUSTER_MODULE_FLAG_NO_REDIRECTION (1 << 2)

/* For clusterBroadcastPong */
#define CLUSTER_BROADCAST_ALL 0 /* All known instances. */
#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 /* All replicas in my primary-replicas ring. */

/* ---------------------- API exported outside cluster.c -------------------- */
/* functions requiring mechanism specific implementations */
void clusterInit(void);
Expand All @@ -62,6 +69,7 @@ void clusterUpdateMyselfAnnouncedPorts(void);
void clusterUpdateMyselfHumanNodename(void);

void clusterPropagatePublish(robj *channel, robj *message, int sharded);
void clusterBroadcastPong(int target);

unsigned long getClusterConnectionsCount(void);
int isClusterHealthy(void);
Expand Down Expand Up @@ -118,6 +126,9 @@ void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db);
unsigned int countKeysInSlot(unsigned int hashslot);
int getSlotOrReply(client *c, robj *o);
int getNodeDefaultReplicationPort(clusterNode *node);
bool isAnySlotInManualImportingState(void);
bool isAnySlotInManualMigratingState(void);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand All @@ -137,4 +148,13 @@ long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
void resetClusterStats(void);
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event);

unsigned int propagateSlotDeletionByKeys(unsigned int hashslot);
void clusterUpdateState(void);
void clusterSaveConfigOrDie(int do_fsync);
int clusterDelSlot(int slot);
int clusterAddSlot(clusterNode *n, int slot);
int clusterBumpConfigEpochWithoutConsensus(void);
void clusterDoBeforeSleep(int flags);

#endif /* __CLUSTER_H */
Loading
Loading