Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c2a3757
add cluster flushslot command.
wuranxx Dec 3, 2024
15c056b
Add TCL tests and optimize code structure.
wuranxx Dec 5, 2024
61af84d
Fix compile warning.
wuranxx Dec 5, 2024
384599f
Fix delKeysInSlot does not consider command invoke.
wuranxx Jan 22, 2025
ff3afe0
1. Move clusterCommandFlushslot to cluster.c
wuranxx Apr 12, 2025
a72fa59
Fix bool issue
hwware Apr 14, 2025
50fb27d
Adjust format for cluster_legacy.c
hwware Apr 14, 2025
04ac837
Adjust code format for cluster_legacy.c
hwware Apr 14, 2025
19814d5
Remove newline
hwware Apr 14, 2025
f6f258e
Update src/cluster_legacy.c
wuranxx Apr 22, 2025
aad606d
Update src/cluster_legacy.c
wuranxx Apr 22, 2025
4fb08a9
Update src/commands/cluster-flushslot.json
wuranxx Apr 22, 2025
5fc6fe6
Update src/commands/cluster-flushslot.json
wuranxx Apr 22, 2025
050eacd
Update src/commands/cluster-flushslot.json
wuranxx Apr 22, 2025
3829d38
Update src/commands/cluster-flushslot.json
wuranxx Apr 22, 2025
ee303ae
add flushslot command propagate test.
wuranxx Apr 24, 2025
ddfa7ff
change tcl test use assert_match and assert_no_match.
wuranxx Apr 28, 2025
245f1d2
Merge branch 'unstable' into cluster-flush-slot
wuranxx May 12, 2025
4742800
Merge branch 'unstable' into cluster-flush-slot
wuranxx May 20, 2025
e37a8c2
add `propagate_del` and `send_del_event` parameters to `delKeysInSlot`.
wuranxx May 20, 2025
e7c9be7
fix code format.
wuranxx May 22, 2025
62593e4
Merge branch 'valkey-io:unstable' into cluster-flush-slot
wuranxx May 26, 2025
9a2839e
Remove `since` from `cluster-flushslot` options definition.
wuranxx May 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1540,3 +1540,22 @@ void resetClusterStats(void) {

clusterSlotStatResetAll();
}


void clusterCommandFlushslot(client *c) {
int slot;
int lazy = server.lazyfree_lazy_user_flush;
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return;
if (c->argc == 4) {
if (!strcasecmp(c->argv[3]->ptr, "async")) {
lazy = 1;
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) {
lazy = 0;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
delKeysInSlot(slot, lazy, false, true);
addReply(c, shared.ok);
}
2 changes: 2 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef __CLUSTER_H
#define __CLUSTER_H

#include <stdbool.h>
/*-----------------------------------------------------------------------------
* Cluster exported API.
*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -133,4 +134,5 @@ int isNodeAvailable(clusterNode *node);
long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
void resetClusterStats(void);
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event);
#endif /* __CLUSTER_H */
37 changes: 25 additions & 12 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ void moduleCallClusterReceivers(const char *sender_id,
const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
Expand Down Expand Up @@ -127,6 +126,7 @@ int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
void clusterCommandFlushslot(client *c);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
Expand Down Expand Up @@ -2861,7 +2861,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
for (int j = 0; j < dirty_slots_count; j++) {
serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j],
myself->name, myself->human_nodename, myself->shard_id);
delKeysInSlot(dirty_slots[j]);
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del, true, false);
}
}
}
Expand Down Expand Up @@ -5915,7 +5915,7 @@ int verifyClusterConfigWithData(void) {
server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name,
server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id);
}
delKeysInSlot(j);
delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false);
}
}
if (update_config) clusterSaveConfigOrDie(1);
Expand Down Expand Up @@ -6534,13 +6534,14 @@ void removeChannelsInSlot(unsigned int slot) {

/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event) {
if (!countKeysInSlot(hashslot)) return 0;

/* We may lose a slot during the pause. We need to track this
* state so that we don't assert in propagateNow(). */
server.server_del_keys_in_slot = 1;
unsigned int j = 0;
int before_execution_nesting = server.execution_nesting;

for (int i = 0; i < server.dbnum; i++) {
kvstoreHashtableIterator *kvs_di = NULL;
Expand All @@ -6552,13 +6553,23 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&db, key);
propagateDeletion(&db, key, server.lazyfree_lazy_server_del);
if (lazy) {
dbAsyncDelete(&db, key);
} else {
dbSyncDelete(&db, key);
}
// if is command, skip del propagate
if (propagate_del) propagateDeletion(&db, key, lazy);
signalModifiedKey(NULL, &db, key);
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
if (send_del_event) {
/* In the `cluster flushslot` scenario, the keys are actually deleted so notify everyone. */
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
} else {
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
}
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(key);
Expand All @@ -6568,7 +6579,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
kvstoreReleaseHashtableIterator(kvs_di);
}
server.server_del_keys_in_slot = 0;
serverAssert(server.execution_nesting == 0);
serverAssert(server.execution_nesting == before_execution_nesting);
return j;
}

Expand Down Expand Up @@ -7370,6 +7381,9 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */
clusterCommandFlushslot(c);
} else {
return 0;
}
Expand Down Expand Up @@ -7562,6 +7576,5 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {
server.cluster->migrating_slots_to[slot] = node;
}
}

return C_OK;
}
30 changes: 30 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,35 @@ struct COMMAND_ARG CLUSTER_FAILOVER_Args[] = {
{MAKE_ARG("options",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FAILOVER_options_Subargs},
};

/********** CLUSTER FLUSHSLOT ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER FLUSHSLOT history */
#define CLUSTER_FLUSHSLOT_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER FLUSHSLOT tips */
#define CLUSTER_FLUSHSLOT_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER FLUSHSLOT key specs */
#define CLUSTER_FLUSHSLOT_Keyspecs NULL
#endif

/* CLUSTER FLUSHSLOT flush_type argument table */
struct COMMAND_ARG CLUSTER_FLUSHSLOT_flush_type_Subargs[] = {
{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER FLUSHSLOT argument table */
struct COMMAND_ARG CLUSTER_FLUSHSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FLUSHSLOT_flush_type_Subargs},
};

/********** CLUSTER FLUSHSLOTS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -1026,6 +1055,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("delslots","Sets hash slots as unbound for a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTS_History,0,CLUSTER_DELSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTS_Args},
{MAKE_CMD("delslotsrange","Sets hash slot ranges as unbound for a node.","O(N) where N is the total number of the slots between the start slot and end slot arguments.","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTSRANGE_History,0,CLUSTER_DELSLOTSRANGE_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTSRANGE_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTSRANGE_Args},
{MAKE_CMD("failover","Forces a replica to perform a manual failover of its primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args},
{MAKE_CMD("flushslot","Remove all keys from the target slot.","O(N) where N is the number of keys in the target slot","9.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOT_History,0,CLUSTER_FLUSHSLOT_Tips,0,clusterCommand,-3,CMD_WRITE|CMD_NO_ASYNC_LOADING|CMD_ADMIN,ACL_CATEGORY_KEYSPACE,CLUSTER_FLUSHSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_FLUSHSLOT_Args},
{MAKE_CMD("flushslots","Deletes all slots information from a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOTS_History,0,CLUSTER_FLUSHSLOTS_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FLUSHSLOTS_Keyspecs,0,NULL,0)},
{MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args},
{MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args},
Expand Down
45 changes: 45 additions & 0 deletions src/commands/cluster-flushslot.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"FLUSHSLOT": {
"summary": "Remove all keys from the target slot.",
"complexity": "O(N) where N is the number of keys in the target slot",
"group": "cluster",
"since": "9.0.0",
"arity": -3,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"WRITE",
"NO_ASYNC_LOADING",
"ADMIN"
],
"acl_categories": [
"KEYSPACE"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"name": "slot",
"type": "integer"
},
{
"name": "flush-type",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "async",
"type": "pure-token",
"token": "ASYNC"
},
{
"name": "sync",
"type": "pure-token",
"token": "SYNC"
}
]
}
]
}
}
64 changes: 64 additions & 0 deletions tests/unit/cluster/cluster-flush-slot.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
start_cluster 2 2 {tags {external:skip cluster}} {
test "SYNC Flush slot command" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]

# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num

# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot SYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0
}

test "ASYNC Flush slot command" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]

# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num

# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot ASYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0
}
}

start_cluster 2 2 {tags {external:skip cluster}} {
test "Flush slot command propagated to replica" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]

# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num

# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot SYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0

# assert flush slot propagated to replica
for {set l 0} {$l < 4} {incr l} {
puts "R $l info:"
puts [R $l info replication]
puts [R $l info commandSTATS]
}

set info [R 2 info commandSTATS]
# not del cmd
assert_no_match "*cmdstat_del*" $info
# has flushslot cmd
assert_match "*cmdstat_cluster|flushslot:calls=1*" $info
}
}
Loading