Skip to content

Commit dd071f9

Browse files
committed
Merge remote-tracking branch 'origin/unstable' into ttl-poc-new
2 parents 12a35e2 + 5699c8c commit dd071f9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1261
-315
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ jobs:
136136
sudo apt-get update
137137
sudo apt-get install libc6-dev-i386 libstdc++-11-dev-i386-cross gcc-multilib g++-multilib
138138
make -j4 SERVER_CFLAGS='-Werror' 32bit USE_FAST_FLOAT=yes
139+
- name: unit tests
140+
run: |
141+
./src/valkey-unit-tests
139142
140143
build-libc-malloc:
141144
runs-on: ubuntu-latest

.github/workflows/codecov.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818

1919
- name: Install lcov and run test
2020
run: |
21-
sudo apt-get install lcov
21+
sudo apt-get install lcov tclx
2222
make lcov
2323
2424
- name: Upload code coverage

MAINTAINERS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Committers listed in alphabetical order by their github ID.
2424
| ------------------- | ----------------------------------------------- | ----------- |
2525
| Harkrishn Patro | [hpatro](https://github.com/hpatro) | Amazon |
2626
| Ran Shidlansik | [ranshid](https://github.com/ranshid) | Amazon |
27+
| Ricardo Dias | [rjd15372](https://github.com/rjd15372) | Percona |
2728

2829
### Former Maintainers and Committers
2930

src/aof.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ int writeAofManifestFile(sds buf) {
530530
sds tmp_am_name = getTempAofManifestFileName();
531531
sds tmp_am_filepath = makePath(server.aof_dirname, tmp_am_name);
532532

533-
int fd = open(tmp_am_filepath, O_WRONLY | O_TRUNC | O_CREAT, 0644);
533+
int fd = open(tmp_am_filepath, O_WRONLY | O_TRUNC | O_CREAT, 0666);
534534
if (fd == -1) {
535535
serverLog(LL_WARNING, "Can't open the AOF manifest file %s: %s", tmp_am_name, strerror(errno));
536536

@@ -728,7 +728,7 @@ void aofOpenIfNeededOnServerStart(void) {
728728

729729
/* Here we should use 'O_APPEND' flag. */
730730
sds aof_filepath = makePath(server.aof_dirname, aof_name);
731-
server.aof_fd = open(aof_filepath, O_WRONLY | O_APPEND | O_CREAT, 0644);
731+
server.aof_fd = open(aof_filepath, O_WRONLY | O_APPEND | O_CREAT, 0666);
732732
sdsfree(aof_filepath);
733733
if (server.aof_fd == -1) {
734734
serverLog(LL_WARNING, "Can't open the append-only file %s: %s", aof_name, strerror(errno));
@@ -790,7 +790,7 @@ int openNewIncrAofForAppend(void) {
790790
new_aof_name = sdsdup(getNewIncrAofName(temp_am));
791791
}
792792
sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
793-
newfd = open(new_aof_filepath, O_WRONLY | O_TRUNC | O_CREAT, 0644);
793+
newfd = open(new_aof_filepath, O_WRONLY | O_TRUNC | O_CREAT, 0666);
794794
sdsfree(new_aof_filepath);
795795
if (newfd == -1) {
796796
serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_aof_name, strerror(errno));

src/blocked.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
403403

404404
initClientBlockingState(c);
405405

406-
if (!c->flag.reprocessing_command) {
406+
if (!c->flag.reexecuting_command) {
407407
/* If the client is re-processing the command, we do not set the timeout
408408
* because we need to retain the client's original timeout. */
409409
c->bstate->timeout = timeout;
@@ -680,6 +680,7 @@ static void unblockClientOnKey(client *c, robj *key) {
680680
* we need to re process the command again */
681681
if (c->flag.pending_command) {
682682
c->flag.pending_command = 0;
683+
c->flag.reexecuting_command = 1;
683684
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
684685
* to run atomically, this is why we must enter the execution unit here before
685686
* running the command, and exit the execution unit after calling the unblock handler (if exists).
@@ -698,6 +699,8 @@ static void unblockClientOnKey(client *c, robj *key) {
698699
}
699700
exitExecutionUnit();
700701
afterCommand(c);
702+
/* Clear the reexecuting_command flag after the proc is executed. */
703+
c->flag.reexecuting_command = 0;
701704
server.current_client = old_client;
702705
}
703706
}

src/cluster.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,3 +1540,22 @@ void resetClusterStats(void) {
15401540

15411541
clusterSlotStatResetAll();
15421542
}
1543+
1544+
1545+
void clusterCommandFlushslot(client *c) {
1546+
int slot;
1547+
int lazy = server.lazyfree_lazy_user_flush;
1548+
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return;
1549+
if (c->argc == 4) {
1550+
if (!strcasecmp(c->argv[3]->ptr, "async")) {
1551+
lazy = 1;
1552+
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) {
1553+
lazy = 0;
1554+
} else {
1555+
addReplyErrorObject(c, shared.syntaxerr);
1556+
return;
1557+
}
1558+
}
1559+
delKeysInSlot(slot, lazy, false, true);
1560+
addReply(c, shared.ok);
1561+
}

src/cluster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef __CLUSTER_H
22
#define __CLUSTER_H
33

4+
#include <stdbool.h>
45
/*-----------------------------------------------------------------------------
56
* Cluster exported API.
67
*----------------------------------------------------------------------------*/
@@ -133,4 +134,5 @@ int isNodeAvailable(clusterNode *node);
133134
long long getNodeReplicationOffset(clusterNode *node);
134135
sds aggregateClientOutputBuffer(client *c);
135136
void resetClusterStats(void);
137+
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event);
136138
#endif /* __CLUSTER_H */

src/cluster_legacy.c

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ void moduleCallClusterReceivers(const char *sender_id,
9898
const char *clusterGetMessageTypeString(int type);
9999
void removeChannelsInSlot(unsigned int slot);
100100
unsigned int countChannelsInSlot(unsigned int hashslot);
101-
unsigned int delKeysInSlot(unsigned int hashslot);
102101
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
103102
list *clusterLookupNodeListByShardId(const char *shard_id);
104103
void clusterRemoveNodeFromShard(clusterNode *node);
@@ -127,6 +126,7 @@ int verifyClusterNodeId(const char *name, int length);
127126
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
128127
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
129128
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
129+
void clusterCommandFlushslot(client *c);
130130

131131
/* Only primaries that own slots have voting rights.
132132
* Returns 1 if the node has voting rights, otherwise returns 0. */
@@ -327,7 +327,7 @@ int auxShardIdSetter(clusterNode *n, void *value, size_t length) {
327327
}
328328

329329
sds auxShardIdGetter(clusterNode *n, sds s) {
330-
return sdscatprintf(s, "%.40s", n->shard_id);
330+
return sdscatlen(s, n->shard_id, CLUSTER_NAMELEN);
331331
}
332332

333333
int auxShardIdPresent(clusterNode *n) {
@@ -344,7 +344,7 @@ int auxHumanNodenameSetter(clusterNode *n, void *value, size_t length) {
344344
}
345345

346346
sds auxHumanNodenameGetter(clusterNode *n, sds s) {
347-
return sdscatprintf(s, "%s", n->human_nodename);
347+
return sdscat(s, n->human_nodename);
348348
}
349349

350350
int auxHumanNodenamePresent(clusterNode *n) {
@@ -370,7 +370,7 @@ int auxAnnounceClientIpV4Setter(clusterNode *n, void *value, size_t length) {
370370
}
371371

372372
sds auxAnnounceClientIpV4Getter(clusterNode *n, sds s) {
373-
return sdscatprintf(s, "%s", n->announce_client_ipv4);
373+
return sdscat(s, n->announce_client_ipv4);
374374
}
375375

376376
int auxAnnounceClientIpV4Present(clusterNode *n) {
@@ -396,7 +396,7 @@ int auxAnnounceClientIpV6Setter(clusterNode *n, void *value, size_t length) {
396396
}
397397

398398
sds auxAnnounceClientIpV6Getter(clusterNode *n, sds s) {
399-
return sdscatprintf(s, "%s", n->announce_client_ipv6);
399+
return sdscat(s, n->announce_client_ipv6);
400400
}
401401

402402
int auxAnnounceClientIpV6Present(clusterNode *n) {
@@ -415,7 +415,7 @@ int auxTcpPortSetter(clusterNode *n, void *value, size_t length) {
415415
}
416416

417417
sds auxTcpPortGetter(clusterNode *n, sds s) {
418-
return sdscatprintf(s, "%d", n->tcp_port);
418+
return sdscatfmt(s, "%i", n->tcp_port);
419419
}
420420

421421
int auxTcpPortPresent(clusterNode *n) {
@@ -434,7 +434,7 @@ int auxTlsPortSetter(clusterNode *n, void *value, size_t length) {
434434
}
435435

436436
sds auxTlsPortGetter(clusterNode *n, sds s) {
437-
return sdscatprintf(s, "%d", n->tls_port);
437+
return sdscatfmt(s, "%i", n->tls_port);
438438
}
439439

440440
int auxTlsPortPresent(clusterNode *n) {
@@ -2144,11 +2144,12 @@ void clusterBlacklistAddNode(clusterNode *node) {
21442144
/* Return non-zero if the specified node ID exists in the blacklist.
21452145
* You don't need to pass an sds string here, any pointer to 40 bytes
21462146
* will work. */
2147-
int clusterBlacklistExists(char *nodeid) {
2148-
sds id = sdsnewlen(nodeid, CLUSTER_NAMELEN);
2147+
int clusterBlacklistExists(char *nodeid, size_t len) {
2148+
sds id = sdsnewlen(nodeid, len);
21492149
int retval;
21502150

21512151
clusterBlacklistCleanup();
2152+
21522153
retval = dictFind(server.cluster->nodes_black_list, id) != NULL;
21532154
sdsfree(id);
21542155
return retval;
@@ -2484,7 +2485,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
24842485
* Note that we require that the sender of this gossip message
24852486
* is a well known node in our cluster, otherwise we risk
24862487
* joining another cluster. */
2487-
if (sender && !(flags & CLUSTER_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) {
2488+
if (sender && !(flags & CLUSTER_NODE_NOADDR) && !clusterBlacklistExists(g->nodename, CLUSTER_NAMELEN)) {
24882489
clusterNode *node;
24892490
node = createClusterNode(g->nodename, flags);
24902491
memcpy(node->ip, g->ip, NET_IP_STR_LEN);
@@ -2861,7 +2862,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
28612862
for (int j = 0; j < dirty_slots_count; j++) {
28622863
serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j],
28632864
myself->name, myself->human_nodename, myself->shard_id);
2864-
delKeysInSlot(dirty_slots[j]);
2865+
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del, true, false);
28652866
}
28662867
}
28672868
}
@@ -3196,7 +3197,7 @@ int clusterIsValidPacket(clusterLink *link) {
31963197
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
31973198
clusterMsgPingExt *ext = getInitialPingExt(hdr, count);
31983199
while (extensions--) {
3199-
uint16_t extlen = getPingExtLength(ext);
3200+
uint32_t extlen = getPingExtLength(ext);
32003201
if (extlen % 8 != 0) {
32013202
serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
32023203
clusterGetMessageTypeString(type), (int)extlen);
@@ -3534,6 +3535,7 @@ int clusterProcessPacket(clusterLink *link) {
35343535
* the clients, and the replica will never initiate a failover since the
35353536
* node is not actually in FAIL state. */
35363537
if (!nodeFailed(noaddr_node)) {
3538+
noaddr_node->flags &= ~CLUSTER_NODE_PFAIL;
35373539
noaddr_node->flags |= CLUSTER_NODE_FAIL;
35383540
noaddr_node->fail_time = now;
35393541
clusterSendFail(noaddr_node->name);
@@ -5915,7 +5917,7 @@ int verifyClusterConfigWithData(void) {
59155917
server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name,
59165918
server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id);
59175919
}
5918-
delKeysInSlot(j);
5920+
delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false);
59195921
}
59205922
}
59215923
if (update_config) clusterSaveConfigOrDie(1);
@@ -6045,7 +6047,7 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
60456047
continue;
60466048
}
60476049
if (auxFieldHandlers[i].isPresent(node)) {
6048-
ci = sdscatprintf(ci, ",%s=", auxFieldHandlers[i].field);
6050+
ci = sdscatfmt(ci, ",%s=", auxFieldHandlers[i].field);
60496051
ci = auxFieldHandlers[i].getter(node, ci);
60506052
}
60516053
}
@@ -6097,9 +6099,13 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
60976099
if (node->flags & CLUSTER_NODE_MYSELF) {
60986100
for (j = 0; j < CLUSTER_SLOTS; j++) {
60996101
if (server.cluster->migrating_slots_to[j]) {
6100-
ci = sdscatprintf(ci, " [%d->-%.40s]", j, server.cluster->migrating_slots_to[j]->name);
6102+
ci = sdscatfmt(ci, " [%i->-", j);
6103+
ci = sdscatlen(ci, server.cluster->migrating_slots_to[j]->name, CLUSTER_NAMELEN);
6104+
ci = sdscat(ci, "]");
61016105
} else if (server.cluster->importing_slots_from[j]) {
6102-
ci = sdscatprintf(ci, " [%d-<-%.40s]", j, server.cluster->importing_slots_from[j]->name);
6106+
ci = sdscatfmt(ci, " [%i-<-", j);
6107+
ci = sdscatlen(ci, server.cluster->importing_slots_from[j]->name, CLUSTER_NAMELEN);
6108+
ci = sdscat(ci, "]");
61036109
}
61046110
}
61056111
}
@@ -6530,13 +6536,14 @@ void removeChannelsInSlot(unsigned int slot) {
65306536

65316537
/* Remove all the keys in the specified hash slot.
65326538
* The number of removed items is returned. */
6533-
unsigned int delKeysInSlot(unsigned int hashslot) {
6539+
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event) {
65346540
if (!countKeysInSlot(hashslot)) return 0;
65356541

65366542
/* We may lose a slot during the pause. We need to track this
65376543
* state so that we don't assert in propagateNow(). */
65386544
server.server_del_keys_in_slot = 1;
65396545
unsigned int j = 0;
6546+
int before_execution_nesting = server.execution_nesting;
65406547

65416548
for (int i = 0; i < server.dbnum; i++) {
65426549
kvstoreHashtableIterator *kvs_di = NULL;
@@ -6548,13 +6555,23 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
65486555
enterExecutionUnit(1, 0);
65496556
sds sdskey = objectGetKey(valkey);
65506557
robj *key = createStringObject(sdskey, sdslen(sdskey));
6551-
dbDelete(&db, key);
6552-
propagateDeletion(&db, key, server.lazyfree_lazy_server_del);
6558+
if (lazy) {
6559+
dbAsyncDelete(&db, key);
6560+
} else {
6561+
dbSyncDelete(&db, key);
6562+
}
6563+
// if is command, skip del propagate
6564+
if (propagate_del) propagateDeletion(&db, key, lazy);
65536565
signalModifiedKey(NULL, &db, key);
6554-
/* The keys are not actually logically deleted from the database, just moved to another node.
6555-
* The modules needs to know that these keys are no longer available locally, so just send the
6556-
* keyspace notification to the modules, but not to clients. */
6557-
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
6566+
if (send_del_event) {
6567+
/* In the `cluster flushslot` scenario, the keys are actually deleted so notify everyone. */
6568+
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
6569+
} else {
6570+
/* The keys are not actually logically deleted from the database, just moved to another node.
6571+
* The modules needs to know that these keys are no longer available locally, so just send the
6572+
* keyspace notification to the modules, but not to clients. */
6573+
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db.id);
6574+
}
65586575
exitExecutionUnit();
65596576
postExecutionUnitOperations();
65606577
decrRefCount(key);
@@ -6564,7 +6581,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
65646581
kvstoreReleaseHashtableIterator(kvs_di);
65656582
}
65666583
server.server_del_keys_in_slot = 0;
6567-
serverAssert(server.execution_nesting == 0);
6584+
serverAssert(server.execution_nesting == before_execution_nesting);
65686585
return j;
65696586
}
65706587

@@ -7127,7 +7144,7 @@ int clusterCommandSpecial(client *c) {
71277144
/* CLUSTER FORGET <NODE ID> */
71287145
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
71297146
if (!n) {
7130-
if (clusterBlacklistExists((char *)c->argv[2]->ptr))
7147+
if (clusterBlacklistExists((char *)c->argv[2]->ptr, sdslen(c->argv[2]->ptr)))
71317148
/* Already forgotten. The deletion may have been gossipped by
71327149
* another node, so we pretend it succeeded. */
71337150
addReply(c, shared.ok);
@@ -7366,6 +7383,9 @@ int clusterCommandSpecial(client *c) {
73667383
} else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) {
73677384
/* CLUSTER LINKS */
73687385
addReplyClusterLinksDescription(c);
7386+
} else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) {
7387+
/* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */
7388+
clusterCommandFlushslot(c);
73697389
} else {
73707390
return 0;
73717391
}
@@ -7558,6 +7578,5 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {
75587578
server.cluster->migrating_slots_to[slot] = node;
75597579
}
75607580
}
7561-
75627581
return C_OK;
75637582
}

0 commit comments

Comments
 (0)