diff --git a/src/cluster.c b/src/cluster.c index d2afe73ed2..acf5d4ce7c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1540,3 +1540,22 @@ void resetClusterStats(void) { clusterSlotStatResetAll(); } + + +void clusterCommandFlushslot(client *c) { + int slot; + int lazy = server.lazyfree_lazy_user_flush; + if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return; + if (c->argc == 4) { + if (!strcasecmp(c->argv[3]->ptr, "async")) { + lazy = 1; + } else if (!strcasecmp(c->argv[3]->ptr, "sync")) { + lazy = 0; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + } + delKeysInSlot(slot, lazy, false, true); + addReply(c, shared.ok); +} diff --git a/src/cluster.h b/src/cluster.h index 9c839410d6..516d2a7b92 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -1,6 +1,7 @@ #ifndef __CLUSTER_H #define __CLUSTER_H +#include /*----------------------------------------------------------------------------- * Cluster exported API. *----------------------------------------------------------------------------*/ @@ -133,4 +134,5 @@ int isNodeAvailable(clusterNode *node); long long getNodeReplicationOffset(clusterNode *node); sds aggregateClientOutputBuffer(client *c); void resetClusterStats(void); +unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event); #endif /* __CLUSTER_H */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 7c4e9e04db..75e1c9f62c 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -98,7 +98,6 @@ void moduleCallClusterReceivers(const char *sender_id, const char *clusterGetMessageTypeString(int type); void removeChannelsInSlot(unsigned int slot); unsigned int countChannelsInSlot(unsigned int hashslot); -unsigned int delKeysInSlot(unsigned int hashslot); void clusterAddNodeToShard(const char *shard_id, clusterNode *node); list *clusterLookupNodeListByShardId(const char *shard_id); void clusterRemoveNodeFromShard(clusterNode *node); @@ -127,6 +126,7 @@ int verifyClusterNodeId(const char *name, int length); sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now); +void clusterCommandFlushslot(client *c); /* Only primaries that own slots have voting rights. * Returns 1 if the node has voting rights, otherwise returns 0. */ @@ -2861,7 +2861,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc for (int j = 0; j < dirty_slots_count; j++) { serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j], myself->name, myself->human_nodename, myself->shard_id); - delKeysInSlot(dirty_slots[j]); + delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del, true, false); } } } @@ -5915,7 +5915,7 @@ int verifyClusterConfigWithData(void) { server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id); } - delKeysInSlot(j); + delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false); } } if (update_config) clusterSaveConfigOrDie(1); @@ -6534,13 +6534,14 @@ void removeChannelsInSlot(unsigned int slot) { /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ -unsigned int delKeysInSlot(unsigned int hashslot) { +unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event) { if (!countKeysInSlot(hashslot)) return 0; /* We may lose a slot during the pause. We need to track this * state so that we don't assert in propagateNow(). */ server.server_del_keys_in_slot = 1; unsigned int j = 0; + int before_execution_nesting = server.execution_nesting; for (int i = 0; i < server.dbnum; i++) { kvstoreHashtableIterator *kvs_di = NULL; @@ -6552,13 +6553,23 @@ unsigned int delKeysInSlot(unsigned int hashslot) { enterExecutionUnit(1, 0); sds sdskey = objectGetKey(valkey); robj *key = createStringObject(sdskey, sdslen(sdskey)); - dbDelete(&db, key); - propagateDeletion(&db, key, server.lazyfree_lazy_server_del); + if (lazy) { + dbAsyncDelete(&db, key); + } else { + dbSyncDelete(&db, key); + } + // if is command, skip del propagate + if (propagate_del) propagateDeletion(&db, key, lazy); signalModifiedKey(NULL, &db, key); - /* The keys are not actually logically deleted from the database, just moved to another node. - * The modules needs to know that these keys are no longer available locally, so just send the - * keyspace notification to the modules, but not to clients. */ - moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id); + if (send_del_event) { + /* In the `cluster flushslot` scenario, the keys are actually deleted so notify everyone. */ + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id); + } else { + /* The keys are not actually logically deleted from the database, just moved to another node. + * The modules needs to know that these keys are no longer available locally, so just send the + * keyspace notification to the modules, but not to clients. */ + moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id); + } exitExecutionUnit(); postExecutionUnitOperations(); decrRefCount(key); @@ -6568,7 +6579,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { kvstoreReleaseHashtableIterator(kvs_di); } server.server_del_keys_in_slot = 0; - serverAssert(server.execution_nesting == 0); + serverAssert(server.execution_nesting == before_execution_nesting); return j; } @@ -7370,6 +7381,9 @@ int clusterCommandSpecial(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) { /* CLUSTER LINKS */ addReplyClusterLinksDescription(c); + } else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) { + /* CLUSTER FLUSHSLOT [ASYNC|SYNC] */ + clusterCommandFlushslot(c); } else { return 0; } @@ -7562,6 +7576,5 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) { server.cluster->migrating_slots_to[slot] = node; } } - return C_OK; } diff --git a/src/commands.def b/src/commands.def index 2187f1b9ba..7cb1f5b001 100644 --- a/src/commands.def +++ b/src/commands.def @@ -518,6 +518,35 @@ struct COMMAND_ARG CLUSTER_FAILOVER_Args[] = { {MAKE_ARG("options",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FAILOVER_options_Subargs}, }; +/********** CLUSTER FLUSHSLOT ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLUSTER FLUSHSLOT history */ +#define CLUSTER_FLUSHSLOT_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLUSTER FLUSHSLOT tips */ +#define CLUSTER_FLUSHSLOT_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLUSTER FLUSHSLOT key specs */ +#define CLUSTER_FLUSHSLOT_Keyspecs NULL +#endif + +/* CLUSTER FLUSHSLOT flush_type argument table */ +struct COMMAND_ARG CLUSTER_FLUSHSLOT_flush_type_Subargs[] = { +{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + +/* CLUSTER FLUSHSLOT argument table */ +struct COMMAND_ARG CLUSTER_FLUSHSLOT_Args[] = { +{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FLUSHSLOT_flush_type_Subargs}, +}; + /********** CLUSTER FLUSHSLOTS ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1026,6 +1055,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("delslots","Sets hash slots as unbound for a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTS_History,0,CLUSTER_DELSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTS_Args}, {MAKE_CMD("delslotsrange","Sets hash slot ranges as unbound for a node.","O(N) where N is the total number of the slots between the start slot and end slot arguments.","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTSRANGE_History,0,CLUSTER_DELSLOTSRANGE_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTSRANGE_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTSRANGE_Args}, {MAKE_CMD("failover","Forces a replica to perform a manual failover of its primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args}, +{MAKE_CMD("flushslot","Remove all keys from the target slot.","O(N) where N is the number of keys in the target slot","9.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOT_History,0,CLUSTER_FLUSHSLOT_Tips,0,clusterCommand,-3,CMD_WRITE|CMD_NO_ASYNC_LOADING|CMD_ADMIN,ACL_CATEGORY_KEYSPACE,CLUSTER_FLUSHSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_FLUSHSLOT_Args}, {MAKE_CMD("flushslots","Deletes all slots information from a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOTS_History,0,CLUSTER_FLUSHSLOTS_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FLUSHSLOTS_Keyspecs,0,NULL,0)}, {MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args}, {MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args}, diff --git a/src/commands/cluster-flushslot.json b/src/commands/cluster-flushslot.json new file mode 100644 index 0000000000..181f176ee4 --- /dev/null +++ b/src/commands/cluster-flushslot.json @@ -0,0 +1,45 @@ +{ + "FLUSHSLOT": { + "summary": "Remove all keys from the target slot.", + "complexity": "O(N) where N is the number of keys in the target slot", + "group": "cluster", + "since": "9.0.0", + "arity": -3, + "container": "CLUSTER", + "function": "clusterCommand", + "command_flags": [ + "WRITE", + "NO_ASYNC_LOADING", + "ADMIN" + ], + "acl_categories": [ + "KEYSPACE" + ], + "reply_schema": { + "const": "OK" + }, + "arguments": [ + { + "name": "slot", + "type": "integer" + }, + { + "name": "flush-type", + "type": "oneof", + "optional": true, + "arguments": [ + { + "name": "async", + "type": "pure-token", + "token": "ASYNC" + }, + { + "name": "sync", + "type": "pure-token", + "token": "SYNC" + } + ] + } + ] + } +} diff --git a/tests/unit/cluster/cluster-flush-slot.tcl b/tests/unit/cluster/cluster-flush-slot.tcl new file mode 100644 index 0000000000..90848dc929 --- /dev/null +++ b/tests/unit/cluster/cluster-flush-slot.tcl @@ -0,0 +1,64 @@ +start_cluster 2 2 {tags {external:skip cluster}} { + test "SYNC Flush slot command" { + set key_slot [R 0 CLUSTER KEYSLOT FC] + set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] + + # set key + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{FC}-$i" "value" + } + set after_keys_num [expr {$slot_keys_num + 1000}] + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num + + # flush slot key + R 0 CLUSTER FLUSHSLOT $key_slot SYNC + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 + } + + test "ASYNC Flush slot command" { + set key_slot [R 0 CLUSTER KEYSLOT FC] + set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] + + # set key + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{FC}-$i" "value" + } + set after_keys_num [expr {$slot_keys_num + 1000}] + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num + + # flush slot key + R 0 CLUSTER FLUSHSLOT $key_slot ASYNC + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 + } +} + +start_cluster 2 2 {tags {external:skip cluster}} { + test "Flush slot command propagated to replica" { + set key_slot [R 0 CLUSTER KEYSLOT FC] + set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] + + # set key + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{FC}-$i" "value" + } + set after_keys_num [expr {$slot_keys_num + 1000}] + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num + + # flush slot key + R 0 CLUSTER FLUSHSLOT $key_slot SYNC + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 + + # assert flush slot propagated to replica + for {set l 0} {$l < 4} {incr l} { + puts "R $l info:" + puts [R $l info replication] + puts [R $l info commandSTATS] + } + + set info [R 2 info commandSTATS] + # not del cmd + assert_no_match "*cmdstat_del*" $info + # has flushslot cmd + assert_match "*cmdstat_cluster|flushslot:calls=1*" $info + } +} \ No newline at end of file