Skip to content

Commit 0d751a2

Browse files
committed
1. Move clusterCommandFlushslot to cluster.c
2. add command scene to delKeysInSlot. In command sence, delKeysInSlot will not propagate del command, and notify del event.
1 parent 1a158ea commit 0d751a2

File tree

3 files changed

+35
-27
lines changed

3 files changed

+35
-27
lines changed

src/cluster.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,3 +1495,22 @@ void resetClusterStats(void) {
14951495

14961496
clusterSlotStatResetAll();
14971497
}
1498+
1499+
1500+
void clusterCommandFlushslot(client *c) {
1501+
int slot;
1502+
int lazy = server.lazyfree_lazy_user_flush;
1503+
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return;
1504+
if (c->argc == 4) {
1505+
if (!strcasecmp(c->argv[3]->ptr, "async")) {
1506+
lazy = 1;
1507+
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) {
1508+
lazy = 0;
1509+
} else {
1510+
addReplyErrorObject(c, shared.syntaxerr);
1511+
return;
1512+
}
1513+
}
1514+
delKeysInSlot(slot, lazy, true);
1515+
addReply(c, shared.ok);
1516+
}

src/cluster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,5 @@ int isNodeAvailable(clusterNode *node);
133133
long long getNodeReplicationOffset(clusterNode *node);
134134
sds aggregateClientOutputBuffer(client *c);
135135
void resetClusterStats(void);
136+
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool is_cmd);
136137
#endif /* __CLUSTER_H */

src/cluster_legacy.c

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ void moduleCallClusterReceivers(const char *sender_id,
9898
const char *clusterGetMessageTypeString(int type);
9999
void removeChannelsInSlot(unsigned int slot);
100100
unsigned int countChannelsInSlot(unsigned int hashslot);
101-
unsigned int delKeysInSlot(unsigned int hashslot, int lazy);
102101
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
103102
list *clusterLookupNodeListByShardId(const char *shard_id);
104103
void clusterRemoveNodeFromShard(clusterNode *node);
@@ -2784,7 +2783,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
27842783
for (int j = 0; j < dirty_slots_count; j++) {
27852784
serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j],
27862785
myself->name, myself->human_nodename, myself->shard_id);
2787-
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del);
2786+
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del, false);
27882787
}
27892788
}
27902789
}
@@ -5843,7 +5842,7 @@ int verifyClusterConfigWithData(void) {
58435842
server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name,
58445843
server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id);
58455844
}
5846-
delKeysInSlot(j, server.lazyfree_lazy_server_del);
5845+
delKeysInSlot(j, server.lazyfree_lazy_server_del, false);
58475846
}
58485847
}
58495848
if (update_config) clusterSaveConfigOrDie(1);
@@ -6434,7 +6433,7 @@ void removeChannelsInSlot(unsigned int slot) {
64346433

64356434
/* Remove all the keys in the specified hash slot.
64366435
* The number of removed items is returned. */
6437-
unsigned int delKeysInSlot(unsigned int hashslot, int lazy) {
6436+
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool is_cmd) {
64386437
if (!countKeysInSlot(hashslot)) return 0;
64396438

64406439
/* We may lose a slot during the pause. We need to track this
@@ -6456,13 +6455,19 @@ unsigned int delKeysInSlot(unsigned int hashslot, int lazy) {
64566455
} else {
64576456
dbSyncDelete(&server.db[0], key);
64586457
}
6459-
dbDelete(&server.db[0], key);
6460-
propagateDeletion(&server.db[0], key, lazy);
6458+
// if is command, skip del propagate
6459+
if (!is_cmd) propagateDeletion(&server.db[0], key, lazy);
64616460
signalModifiedKey(NULL, &server.db[0], key);
6462-
/* The keys are not actually logically deleted from the database, just moved to another node.
6463-
* The modules needs to know that these keys are no longer available locally, so just send the
6464-
* keyspace notification to the modules, but not to clients. */
6465-
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
6461+
if (is_cmd) {
6462+
/* In cluster flushslot scene, the keys are actually deleted. */
6463+
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
6464+
} else {
6465+
/* The keys are not actually logically deleted from the database, just moved to another node.
6466+
* The modules needs to know that these keys are no longer available locally, so just send the
6467+
* keyspace notification to the modules, but not to clients. */
6468+
/* In cluster flushslot scene, the is actually deleted, fire del event. */
6469+
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
6470+
}
64666471
exitExecutionUnit();
64676472
postExecutionUnitOperations();
64686473
decrRefCount(key);
@@ -7426,20 +7431,3 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {
74267431
return C_OK;
74277432
}
74287433

7429-
void clusterCommandFlushslot(client *c) {
7430-
int slot;
7431-
int lazy = server.lazyfree_lazy_user_flush;
7432-
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return;
7433-
if (c->argc == 4) {
7434-
if (!strcasecmp(c->argv[3]->ptr, "async")) {
7435-
lazy = 1;
7436-
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) {
7437-
lazy = 0;
7438-
} else {
7439-
addReplyErrorObject(c, shared.syntaxerr);
7440-
return;
7441-
}
7442-
}
7443-
delKeysInSlot(slot, lazy);
7444-
addReply(c, shared.ok);
7445-
}

0 commit comments

Comments
 (0)