diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6a1a9460fb..b21c9f3be8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,7 +70,7 @@ jobs: build-debian-old: runs-on: ubuntu-latest - container: debian:buster + container: debian:bullseye steps: - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - name: make diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml deleted file mode 100644 index efc63a1f6f..0000000000 --- a/.github/workflows/clang-format.yml +++ /dev/null @@ -1,52 +0,0 @@ -name: Clang Format Check - -on: - pull_request: - paths: - - 'src/**' - -concurrency: - group: clang-${{ github.head_ref || github.ref }} - cancel-in-progress: true - -jobs: - clang-format-check: - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - - name: Set up Clang - run: | - sudo apt-get update -y - sudo apt-get upgrade -y - sudo apt-get install software-properties-common -y - wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | gpg --dearmor | sudo tee /usr/share/keyrings/llvm-toolchain.gpg > /dev/null - echo "deb [signed-by=/usr/share/keyrings/llvm-toolchain.gpg] http://apt.llvm.org/$(lsb_release -cs)/ llvm-toolchain-$(lsb_release -cs)-18 main" | sudo tee /etc/apt/sources.list.d/llvm.list - sudo apt-get update -y - sudo apt-get install clang-format-18 -y - - name: Run clang-format - id: clang-format - run: | - # Run clang-format and capture the diff - cd src - shopt -s globstar - clang-format-18 -i **/*.c **/*.h - # Capture the diff output - DIFF=$(git diff) - if [ ! -z "$DIFF" ]; then - # Encode the diff in Base64 to ensure it's handled as a single line - ENCODED_DIFF=$(echo "$DIFF" | base64 -w 0) - echo "diff=$ENCODED_DIFF" >> $GITHUB_OUTPUT - fi - shell: bash - - - name: Check for formatting changes - if: ${{ steps.clang-format.outputs.diff }} - run: | - echo "ERROR: Code is not formatted correctly. Here is the diff:" - # Decode the Base64 diff to display it - echo "${{ steps.clang-format.outputs.diff }}" | base64 --decode - exit 1 - shell: bash diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index edaa91a389..8af559ecd8 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -1150,7 +1150,7 @@ jobs: if: | (github.event_name == 'workflow_dispatch' || (github.event_name == 'schedule' && github.repository == 'valkey-io/valkey') || - (github.event_name == 'pull_request' && github.event.pull_request.base.ref != 'unstable')) && + (github.event_name == 'pull_request' && (contains(github.event.pull_request.labels.*.name, 'run-extra-tests') || github.event.pull_request.base.ref != 'unstable'))) && !contains(github.event.inputs.skipjobs, 'reply-schema') steps: - name: prep diff --git a/00-RELEASENOTES b/00-RELEASENOTES index 17e7d82f1f..7b1937b2ee 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -9,6 +9,51 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Valkey 8.0.5 - Released Thu 22 Aug 2025 +================================================================================ + +Upgrade urgency SECURITY: This release includes security fixes we recommend you +apply as soon as possible. + +Bug fixes +========= +* Fix clients remaining blocked when reprocessing commands after certain + blocking operations (#2109) +* Fix a memory corruption issue in the sharded pub/sub unsubscribe logic (#2137) +* Fix potential memory leak by ensuring module context is freed when `aux_save2` + callback writes no data (#2132) +* Fix `CLIENT UNBLOCK` triggering unexpected errors when used on paused clients + (#2117) +* Fix missing NULL check on `SSL_new()` when creating outgoing TLS connections + (#2140) +* Fix incorrect casting of ping extension lengths to prevent silent packet drops + (#2144) +* Fix replica failover stall due to outdated config epoch (#2178) +* Fix incorrect port/tls-port info in `CLUSTER SLOTS`/`CLUSTER NODES` after + dynamic config change (#2186) +* Ensure empty error tables in Lua scripts don't crash Valkey (#2229) +* Fix client tracking memory overhead calculation (#2360) +* Handle divergent shard-id from nodes.conf and reconcile to the primary node's + shard-id (#2174) +* Fix pre-size hashtables per slot when reading RDB files (#2466) + +Behavior changes +================ +* Trigger election immediately during a forced manual failover (`CLUSTER + FAILOVER FORCE`) to avoid delay (#1067) +* Reset ongoing election state when initiating a new manual failover (#1274) + +Logging and Tooling Improvements +================================ +* Add support to drop all cluster packets (#1252) +* Improve log clarity in failover auth denial message (#1341) + +Security fixes +============== +* CVE-2025-27151: Check length of AOF file name in valkey-check-aof and reject + paths longer than `PATH_MAX` (#2146) + ================================================================================ Valkey 8.0.4 - Released Mon 07 July 2025 ================================================================================ diff --git a/src/acl.c b/src/acl.c index 14087ea2f4..d7cb504393 100644 --- a/src/acl.c +++ b/src/acl.c @@ -297,11 +297,6 @@ int ACLListMatchSds(void *a, void *b) { return sdscmp(a, b) == 0; } -/* Method to free list elements from ACL users password/patterns lists. */ -void ACLListFreeSds(void *item) { - sdsfree(item); -} - /* Method to duplicate list elements from ACL users password/patterns lists. */ void *ACLListDupSds(void *item) { return sdsdup(item); @@ -374,7 +369,7 @@ aclSelector *ACLCreateSelector(int flags) { listSetFreeMethod(selector->patterns, ACLListFreeKeyPattern); listSetDupMethod(selector->patterns, ACLListDupKeyPattern); listSetMatchMethod(selector->channels, ACLListMatchSds); - listSetFreeMethod(selector->channels, ACLListFreeSds); + listSetFreeMethod(selector->channels, sdsfreeVoid); listSetDupMethod(selector->channels, ACLListDupSds); memset(selector->allowed_commands, 0, sizeof(selector->allowed_commands)); @@ -445,7 +440,7 @@ user *ACLCreateUser(const char *name, size_t namelen) { u->passwords = listCreate(); u->acl_string = NULL; listSetMatchMethod(u->passwords, ACLListMatchSds); - listSetFreeMethod(u->passwords, ACLListFreeSds); + listSetFreeMethod(u->passwords, sdsfreeVoid); listSetDupMethod(u->passwords, ACLListDupSds); u->selectors = listCreate(); @@ -489,6 +484,11 @@ void ACLFreeUser(user *u) { zfree(u); } +/* Used for generic free functions. */ +static void ACLFreeUserVoid(void *u) { + ACLFreeUser(u); +} + /* When a user is deleted we need to cycle the active * connections in order to kill all the pending ones that * are authenticated with such user. */ @@ -2449,12 +2449,12 @@ sds ACLLoadFromFile(const char *filename) { c->user = new_user; } - if (user_channels) raxFreeWithCallback(user_channels, (void (*)(void *))listRelease); - raxFreeWithCallback(old_users, (void (*)(void *))ACLFreeUser); + if (user_channels) raxFreeWithCallback(user_channels, listReleaseVoid); + raxFreeWithCallback(old_users, ACLFreeUserVoid); sdsfree(errors); return NULL; } else { - raxFreeWithCallback(Users, (void (*)(void *))ACLFreeUser); + raxFreeWithCallback(Users, ACLFreeUserVoid); Users = old_users; errors = sdscat(errors, "WARNING: ACL errors detected, no change to the previously active ACL rules was performed"); diff --git a/src/adlist.c b/src/adlist.c index 11b152592b..0dc77cc038 100644 --- a/src/adlist.c +++ b/src/adlist.c @@ -77,6 +77,12 @@ void listRelease(list *list) { zfree(list); } +/* Just like listRelease, but takes the list as a (void *). + * Useful as generic free callback. */ +void listReleaseVoid(void *l) { + listRelease((list *)l); +} + /* Add a new node to the list, to head, containing the specified 'value' * pointer as value. * diff --git a/src/adlist.h b/src/adlist.h index bfc4280434..c642c1c791 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -72,6 +72,7 @@ typedef struct list { /* Prototypes */ list *listCreate(void); void listRelease(list *list); +void listReleaseVoid(void *list); void listEmpty(list *list); list *listAddNodeHead(list *list, void *value); list *listAddNodeTail(list *list, void *value); diff --git a/src/bio.c b/src/bio.c index e55c729f74..ab07581ba9 100644 --- a/src/bio.c +++ b/src/bio.c @@ -142,8 +142,9 @@ void bioInit(void) { * responsible for. */ for (j = 0; j < BIO_WORKER_NUM; j++) { void *arg = (void *)(unsigned long)j; - if (pthread_create(&thread, &attr, bioProcessBackgroundJobs, arg) != 0) { - serverLog(LL_WARNING, "Fatal: Can't initialize Background Jobs. Error message: %s", strerror(errno)); + int err = pthread_create(&thread, &attr, bioProcessBackgroundJobs, arg); + if (err) { + serverLog(LL_WARNING, "Fatal: Can't initialize Background Jobs. Error message: %s", strerror(err)); exit(1); } bio_threads[j] = thread; @@ -222,8 +223,9 @@ void *bioProcessBackgroundJobs(void *arg) { * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); - if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) - serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); + int err = pthread_sigmask(SIG_BLOCK, &sigset, NULL); + if (err) + serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(err)); while (1) { listNode *ln; diff --git a/src/blocked.c b/src/blocked.c index 3496b21d40..be034e4204 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -227,6 +227,20 @@ void unblockClient(client *c, int queue_for_reprocessing) { if (queue_for_reprocessing) queueClientForReprocessing(c); } +/* Check if the specified client can be safely timed out using + * unblockClientOnTimeout(). */ +int blockedClientMayTimeout(client *c) { + if (c->bstate.btype == BLOCKED_MODULE) { + return moduleBlockedClientMayTimeout(c); + } + + if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_WAIT) { + return 1; + } + return 0; +} + /* This function gets called when a blocked client timed out in order to * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ @@ -374,7 +388,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo list *l; int j; - if (!c->flag.reprocessing_command) { + if (!c->flag.reexecuting_command) { /* If the client is re-processing the command, we do not set the timeout * because we need to retain the client's original timeout. */ c->bstate.timeout = timeout; @@ -648,6 +662,7 @@ static void unblockClientOnKey(client *c, robj *key) { * we need to re process the command again */ if (c->flag.pending_command) { c->flag.pending_command = 0; + c->flag.reexecuting_command = 1; /* We want the command processing and the unblock handler (see RM_Call 'K' option) * to run atomically, this is why we must enter the execution unit here before * running the command, and exit the execution unit after calling the unblock handler (if exists). @@ -666,6 +681,8 @@ static void unblockClientOnKey(client *c, robj *key) { } exitExecutionUnit(); afterCommand(c); + /* Clear the reexecuting_command flag after the proc is executed. */ + c->flag.reexecuting_command = 0; server.current_client = old_client; } } diff --git a/src/call_reply.c b/src/call_reply.c index 00d196081e..dc981b8be8 100644 --- a/src/call_reply.c +++ b/src/call_reply.c @@ -559,7 +559,7 @@ CallReply *callReplyCreateError(sds reply, void *private_data) { sdsfree(reply); } list *deferred_error_list = listCreate(); - listSetFreeMethod(deferred_error_list, (void (*)(void *))sdsfree); + listSetFreeMethod(deferred_error_list, sdsfreeVoid); listAddNodeTail(deferred_error_list, sdsnew(err_buff)); return callReplyCreate(err_buff, deferred_error_list, private_data); } diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 087bf3758d..5789c21c57 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -293,10 +293,16 @@ int auxShardIdSetter(clusterNode *n, void *value, size_t length) { } memcpy(n->shard_id, value, CLUSTER_NAMELEN); /* if n already has replicas, make sure they all agree - * on the shard id */ + * on the shard id. If not, update them. */ for (int i = 0; i < n->num_replicas; i++) { if (memcmp(n->replicas[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { - return C_ERR; + serverLog(LL_NOTICE, + "Node %.40s has a different shard id (%.40s) than its primary's shard id %.40s (%.40s). " + "Updating replica's shard id to match primary's shard id.", + n->replicas[i]->name, n->replicas[i]->shard_id, n->name, n->shard_id); + clusterRemoveNodeFromShard(n->replicas[i]); + memcpy(n->replicas[i]->shard_id, n->shard_id, CLUSTER_NAMELEN); + clusterAddNodeToShard(n->shard_id, n->replicas[i]); } } clusterAddNodeToShard(value, n); @@ -696,10 +702,16 @@ int clusterLoadConfig(char *filename) { clusterAddNodeToShard(primary->shard_id, n); } else if (clusterGetNodesInMyShard(primary) != NULL && memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { - /* If the primary has been added to a shard, make sure this - * node has the same persisted shard id as the primary. */ - sdsfreesplitres(argv, argc); - goto fmterr; + /* If the primary has been added to a shard and this replica has + * a different shard id stored in nodes.conf, update it to match + * the primary instead of aborting the startup. */ + serverLog(LL_NOTICE, + "Node %.40s has a different shard id (%.40s) than its primary %.40s (%.40s). " + "Updating replica's shard id to match primary's shard id.", + n->name, n->shard_id, primary->name, primary->shard_id); + clusterRemoveNodeFromShard(n); + memcpy(n->shard_id, primary->shard_id, CLUSTER_NAMELEN); + clusterAddNodeToShard(primary->shard_id, n); } n->replicaof = primary; clusterNodeAddReplica(primary, n); @@ -970,6 +982,7 @@ void clusterUpdateMyselfFlags(void) { void clusterUpdateMyselfAnnouncedPorts(void) { if (!myself) return; deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } /* We want to take myself->ip in sync with the cluster-announce-ip option. @@ -1000,6 +1013,7 @@ void clusterUpdateMyselfIp(void) { } else { myself->ip[0] = '\0'; /* Force autodetection. */ } + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } } @@ -2481,6 +2495,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * need to delete all the keys in the slots we lost ownership. */ uint16_t dirty_slots[CLUSTER_SLOTS]; int dirty_slots_count = 0; + int delete_dirty_slots = 0; /* We should detect if sender is new primary of our shard. * We will know it if all our slots were migrated to sender, and sender @@ -2707,6 +2722,12 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc serverLog(LL_NOTICE, "My last slot was migrated to node %.40s (%s) in shard %.40s. I am now an empty primary.", sender->name, sender->human_nodename, sender->shard_id); + /* We may still have dirty slots when we became a empty primary due to + * a bad migration. + * + * In order to maintain a consistent state between keys and slots + * we need to remove all the keys from the slots we lost. */ + delete_dirty_slots = 1; } } else if (dirty_slots_count) { /* If we are here, we received an update message which removed @@ -2716,6 +2737,10 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * * In order to maintain a consistent state between keys and slots * we need to remove all the keys from the slots we lost. */ + delete_dirty_slots = 1; + } + + if (delete_dirty_slots) { for (int j = 0; j < dirty_slots_count; j++) { serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j], myself->name, myself->human_nodename, myself->shard_id); @@ -3015,7 +3040,7 @@ int clusterIsValidPacket(clusterLink *link) { return 0; } - if (type == server.cluster_drop_packet_filter) { + if (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2) { serverLog(LL_WARNING, "Dropping packet that matches debug drop filter"); return 0; } @@ -3034,7 +3059,7 @@ int clusterIsValidPacket(clusterLink *link) { if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { clusterMsgPingExt *ext = getInitialPingExt(hdr, count); while (extensions--) { - uint16_t extlen = getPingExtLength(ext); + uint32_t extlen = getPingExtLength(ext); if (extlen % 8 != 0) { serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)", clusterGetMessageTypeString(type), (int)extlen); @@ -3104,7 +3129,8 @@ int clusterProcessPacket(clusterLink *link) { if (!clusterIsValidPacket(link)) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint16_t type = ntohs(hdr->type); - if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) { + if (server.debug_cluster_close_link_on_packet_drop && + (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) { freeClusterLink(link); serverLog(LL_WARNING, "Closing link for matching packet type %hu", type); return 0; @@ -3168,6 +3194,25 @@ int clusterProcessPacket(clusterLink *link) { if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) { sender->configEpoch = sender_claimed_config_epoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); + + if (server.cluster->failover_auth_time && server.cluster->failover_auth_sent && + sender->configEpoch >= server.cluster->failover_auth_epoch) { + /* Another node has claimed an epoch greater than or equal to ours. + * If we have an ongoing election, reset it because we cannot win + * with an epoch smaller than or equal to the incoming claim. This + * allows us to start a new election as soon as possible. */ + server.cluster->failover_auth_time = 0; + serverLog(LL_WARNING, + "Failover election in progress for epoch %llu, but received a claim from " + "node %.40s (%s) with an equal or higher epoch %llu. Resetting the election " + "since we cannot win an election in the past.", + (unsigned long long)server.cluster->failover_auth_epoch, + sender->name, sender->human_nodename, + (unsigned long long)sender->configEpoch); + /* Maybe we could start a new election, set a flag here to make sure + * we check as soon as possible, instead of waiting for a cron. */ + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); + } } /* Update the replication offset info for this node. */ sender->repl_offset = ntohu64(hdr->offset); @@ -3484,9 +3529,10 @@ int clusterProcessPacket(clusterLink *link) { if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue; if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) { serverLog(LL_VERBOSE, - "Node %.40s has old slots configuration, sending " - "an UPDATE message about %.40s", - sender->name, server.cluster->slots[j]->name); + "Node %.40s (%s) has old slots configuration, sending " + "an UPDATE message about %.40s (%s)", + sender->name, sender->human_nodename, server.cluster->slots[j]->name, + server.cluster->slots[j]->human_nodename); clusterSendUpdate(sender->link, server.cluster->slots[j]); /* TODO: instead of exiting the loop send every other @@ -4392,9 +4438,19 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * by the replica requesting our vote. Refuse to vote for this replica. */ serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): " - "slot %d epoch (%llu) > reqEpoch (%llu)", + "slot %d epoch (%llu) > reqConfigEpoch (%llu)", node->name, node->human_nodename, j, (unsigned long long)server.cluster->slots[j]->configEpoch, (unsigned long long)requestConfigEpoch); + + /* Send an UPDATE message to the replica. After receiving the UPDATE message, + * the replica will update the slots config so that it can initiate a failover + * again later. Otherwise the replica will never get votes if the primary is down. */ + serverLog(LL_VERBOSE, + "Node %.40s (%s) has old slots configuration, sending " + "an UPDATE message about %.40s (%s)", + node->name, node->human_nodename, server.cluster->slots[j]->name, + server.cluster->slots[j]->human_nodename); + clusterSendUpdate(node->link, server.cluster->slots[j]); return; } @@ -4667,8 +4723,8 @@ void clusterHandleReplicaFailover(void) { if (server.cluster->failover_auth_sent == 0) { server.cluster->currentEpoch++; server.cluster->failover_auth_epoch = server.cluster->currentEpoch; - serverLog(LL_NOTICE, "Starting a failover election for epoch %llu.", - (unsigned long long)server.cluster->currentEpoch); + serverLog(LL_NOTICE, "Starting a failover election for epoch %llu, node config epoch is %llu", + (unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself)); clusterRequestFailoverAuth(); server.cluster->failover_auth_sent = 1; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); @@ -4832,6 +4888,27 @@ void clusterHandleReplicaMigration(int max_replicas) { * data loss due to the asynchronous primary-replica replication. * -------------------------------------------------------------------------- */ +void manualFailoverCanStart(void) { + serverAssert(server.cluster->mf_can_start == 0); + + if (server.cluster->failover_auth_time) { + /* There is another manual failover requested by the user. + * If we have an ongoing election, reset it because the user may initiate + * manual failover again when the previous manual failover timed out. + * Otherwise, if the previous election timed out (see auth_timeout) and + * before the next retry (see auth_retry_time), the new manual failover + * will pause the primary and replica can not do anything to advance the + * manual failover, and then the manual failover eventually times out. */ + server.cluster->failover_auth_time = 0; + serverLog(LL_WARNING, + "Failover election in progress for epoch %llu, but received a new manual failover. " + "Resetting the election.", + (unsigned long long)server.cluster->failover_auth_epoch); + } + + server.cluster->mf_can_start = 1; +} + /* Reset the manual failover state. This works for both primaries and replicas * as all the state about manual failover is cleared. * @@ -4872,7 +4949,7 @@ void clusterHandleManualFailover(void) { if (server.cluster->mf_primary_offset == replicationGetReplicaOffset()) { /* Our replication offset matches the primary replication offset * announced after clients were paused. We can start the failover. */ - server.cluster->mf_can_start = 1; + manualFailoverCanStart(); serverLog(LL_NOTICE, "All primary replication stream processed, " "manual failover can start."); clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); @@ -6063,7 +6140,7 @@ void removeChannelsInSlot(unsigned int slot) { /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ unsigned int delKeysInSlot(unsigned int hashslot) { - if (!kvstoreDictSize(server.db->keys, hashslot)) return 0; + if (!countKeysInSlot(hashslot)) return 0; /* We may lose a slot during the pause. We need to track this * state so that we don't assert in propagateNow(). */ @@ -6761,7 +6838,10 @@ int clusterCommandSpecial(client *c) { * primary to agree about the offset. We just failover taking over * it without coordination. */ serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client); - server.cluster->mf_can_start = 1; + manualFailoverCanStart(); + /* We can start a manual failover as soon as possible, setting a flag + * here so that we don't need to waiting for the cron to kick in. */ + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER); } else { serverLog(LL_NOTICE, "Manual failover user request accepted (user request from '%s').", client); clusterSendMFStart(myself->replicaof); diff --git a/src/config.c b/src/config.c index 2d4e703d95..8757c43f87 100644 --- a/src/config.c +++ b/src/config.c @@ -2445,6 +2445,8 @@ static int updatePort(const char **err) { listener->bindaddr_count = server.bindaddr_count; listener->port = server.port; listener->ct = connectionByType(CONN_TYPE_SOCKET); + clusterUpdateMyselfAnnouncedPorts(); + clearCachedClusterSlotsResponse(); if (changeListener(listener) == C_ERR) { *err = "Unable to listen on this port. Check server logs."; return 0; @@ -2691,6 +2693,8 @@ static int applyTLSPort(const char **err) { listener->bindaddr_count = server.bindaddr_count; listener->port = server.tls_port; listener->ct = connectionByType(CONN_TYPE_TLS); + clusterUpdateMyselfAnnouncedPorts(); + clearCachedClusterSlotsResponse(); if (changeListener(listener) == C_ERR) { *err = "Unable to listen on this port. Check server logs."; return 0; diff --git a/src/db.c b/src/db.c index 68eb140557..12a5abd08c 100644 --- a/src/db.c +++ b/src/db.c @@ -1088,7 +1088,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { * The exception to the above is ZSET, where we do allocate temporary * strings even when scanning a dict. */ if (o && (!ht || o->type == OBJ_ZSET)) { - listSetFreeMethod(keys, (void (*)(void *))sdsfree); + listSetFreeMethod(keys, sdsfreeVoid); } /* For main dictionary scan or data structure using hashtable. */ diff --git a/src/debug.c b/src/debug.c index a01ccf9730..5c8733641a 100644 --- a/src/debug.c +++ b/src/debug.c @@ -431,7 +431,7 @@ void debugCommand(client *c) { " Some fields of the default behavior may be time consuming to fetch,", " and `fast` can be passed to avoid fetching them.", "DROP-CLUSTER-PACKET-FILTER ", - " Drop all packets that match the filtered type. Set to -1 allow all packets.", + " Drop all packets that match the filtered type. Set to -1 allow all packets or -2 to drop all packets.", "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>", " This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.", " When set to 1, the cluster link is closed after dropping a packet based on the filter.", diff --git a/src/eval.c b/src/eval.c index 73d5e2fedc..5c37d63f83 100644 --- a/src/eval.c +++ b/src/eval.c @@ -205,7 +205,7 @@ void scriptingInit(int setup) { * and we need to free them respectively. */ lctx.lua_scripts = dictCreate(&shaScriptObjectDictType); lctx.lua_scripts_lru_list = listCreate(); - listSetFreeMethod(lctx.lua_scripts_lru_list, (void (*)(void *))sdsfree); + listSetFreeMethod(lctx.lua_scripts_lru_list, sdsfreeVoid); lctx.lua_scripts_mem = 0; luaRegisterServerAPI(lua); @@ -777,7 +777,7 @@ void ldbInit(void) { ldb.conn = NULL; ldb.active = 0; ldb.logs = listCreate(); - listSetFreeMethod(ldb.logs, (void (*)(void *))sdsfree); + listSetFreeMethod(ldb.logs, sdsfreeVoid); ldb.children = listCreate(); ldb.src = NULL; ldb.lines = 0; diff --git a/src/evict.c b/src/evict.c index 4b9f70eaa5..0a83af5f66 100644 --- a/src/evict.c +++ b/src/evict.c @@ -150,7 +150,7 @@ int evictionPoolPopulate(serverDb *db, kvstore *samplekvs, struct evictionPoolEn for (j = 0; j < count; j++) { unsigned long long idle; sds key; - robj *o; + robj *o = NULL; dictEntry *de; de = samples[j]; diff --git a/src/functions.c b/src/functions.c index 68fe404a6c..b72534a343 100644 --- a/src/functions.c +++ b/src/functions.c @@ -159,6 +159,11 @@ static void engineLibraryDispose(dict *d, void *obj) { engineLibraryFree(obj); } +/* Wrapper to free a library when used as a list free callback */ +static void engineLibraryListFree(void *obj) { + engineLibraryFree(obj); +} + /* Clear all the functions from the given library ctx */ void functionsLibCtxClear(functionsLibCtx *lib_ctx) { dictEmpty(lib_ctx->functions, NULL); @@ -338,7 +343,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l } else { if (!old_libraries_list) { old_libraries_list = listCreate(); - listSetFreeMethod(old_libraries_list, (void (*)(void *))engineLibraryFree); + listSetFreeMethod(old_libraries_list, engineLibraryListFree); } libraryUnlink(functions_lib_ctx_dst, old_li); listAddNodeTail(old_libraries_list, old_li); diff --git a/src/io_threads.c b/src/io_threads.c index 5b2230f635..c925b0f67c 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -265,8 +265,9 @@ static void createIOThread(int id) { pthread_mutex_init(&io_threads_mutex[id], NULL); IOJobQueue_init(&io_jobs[id], IO_JOB_QUEUE_SIZE); pthread_mutex_lock(&io_threads_mutex[id]); /* Thread will be stopped. */ - if (pthread_create(&tid, NULL, IOThreadMain, (void *)(long)id) != 0) { - serverLog(LL_WARNING, "Fatal: Can't initialize IO thread, pthread_create failed with: %s", strerror(errno)); + int err = pthread_create(&tid, NULL, IOThreadMain, (void *)(long)id); + if (err) { + serverLog(LL_WARNING, "Fatal: Can't initialize IO thread, pthread_create failed with: %s", strerror(err)); exit(1); } io_threads[id] = tid; diff --git a/src/kvstore.c b/src/kvstore.c index d476b87baf..0012b5053b 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -425,10 +425,11 @@ int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandS if (newsize == 0) return 1; for (int i = 0; i < kvs->num_dicts; i++) { if (skip_cb && skip_cb(i)) continue; - /* If the dictionary doesn't exist, create it */ - dict *d = createDictIfNeeded(kvs, i); - int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize); - if (try_expand && result == DICT_ERR) return 0; + if (try_expand) { + if (kvstoreDictTryExpand(kvs, i, newsize) == DICT_ERR) return 0; + } else { + kvstoreDictExpand(kvs, i, newsize); + } } return 1; @@ -673,6 +674,12 @@ unsigned long kvstoreDictSize(kvstore *kvs, int didx) { return dictSize(d); } +unsigned long kvstoreDictBuckets(kvstore *kvs, int didx) { + dict *d = kvstoreGetDict(kvs, didx); + if (!d) return 0; + return dictBuckets(d); +} + kvstoreDictIterator *kvstoreGetDictIterator(kvstore *kvs, int didx) { kvstoreDictIterator *kvs_di = zmalloc(sizeof(*kvs_di)); kvs_di->kvs = kvs; @@ -729,11 +736,17 @@ unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, uns } int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) return DICT_ERR; + if (size == 0) return DICT_ERR; + dict *d = createDictIfNeeded(kvs, didx); return dictExpand(d, size); } +int kvstoreDictTryExpand(kvstore *kvs, int didx, unsigned long size) { + if (size == 0) return DICT_ERR; + dict *d = createDictIfNeeded(kvs, didx); + return dictTryExpand(d, size); +} + unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, diff --git a/src/kvstore.h b/src/kvstore.h index 81a0d9a96e..7bdfb20aaf 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -56,6 +56,7 @@ unsigned long kvstoreDictRehashingCount(kvstore *kvs); /* Specific dict access by dict-index */ unsigned long kvstoreDictSize(kvstore *kvs, int didx); +unsigned long kvstoreDictBuckets(kvstore *kvs, int didx); kvstoreDictIterator *kvstoreGetDictIterator(kvstore *kvs, int didx); kvstoreDictIterator *kvstoreGetDictSafeIterator(kvstore *kvs, int didx); void kvstoreReleaseDictIterator(kvstoreDictIterator *kvs_id); @@ -64,6 +65,7 @@ dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx); dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx); unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count); int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size); +int kvstoreDictTryExpand(kvstore *kvs, int didx, unsigned long size); unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, diff --git a/src/listpack.c b/src/listpack.c index 4c51b285d2..1891310b0e 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -250,6 +250,12 @@ void lpFree(unsigned char *lp) { lp_free(lp); } +/* Same as lpFree, but useful for when you are passing the listpack + * into a generic free function that expects (void *) */ +void lpFreeVoid(void *lp) { + lp_free((unsigned char *)lp); +} + /* Shrink the memory to fit. */ unsigned char *lpShrinkToFit(unsigned char *lp) { size_t size = lpGetTotalBytes(lp); diff --git a/src/listpack.h b/src/listpack.h index aa7636143f..b143797261 100644 --- a/src/listpack.h +++ b/src/listpack.h @@ -56,6 +56,7 @@ typedef struct { unsigned char *lpNew(size_t capacity); void lpFree(unsigned char *lp); +void lpFreeVoid(void *lp); unsigned char *lpShrinkToFit(unsigned char *lp); unsigned char * lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen, unsigned char *p, int where, unsigned char **newp); diff --git a/src/module.c b/src/module.c index 93d11b52d4..bdd1b8a868 100644 --- a/src/module.c +++ b/src/module.c @@ -10380,7 +10380,7 @@ ValkeyModuleServerInfoData *VM_GetServerInfo(ValkeyModuleCtx *ctx, const char *s * context instead of passing NULL. */ void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data) { if (ctx != NULL) autoMemoryFreed(ctx, VALKEYMODULE_AM_INFO, data); - raxFreeWithCallback(data->rax, (void (*)(void *))sdsfree); + raxFreeWithCallback(data->rax, sdsfreeVoid); zfree(data); } diff --git a/src/networking.c b/src/networking.c index ab2df89a59..2d748c812b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -101,7 +101,8 @@ void linkClient(client *c) { static void clientSetDefaultAuth(client *c) { /* If the default user does not require authentication, the user is * directly authenticated. */ - clientSetUser(c, DefaultUser, (DefaultUser->flags & USER_FLAG_NOPASS) && !(DefaultUser->flags & USER_FLAG_DISABLED)); + clientSetUser(c, DefaultUser, + (DefaultUser->flags & USER_FLAG_NOPASS) && !(DefaultUser->flags & USER_FLAG_DISABLED)); } /* Attach the user u to this client. @@ -111,8 +112,7 @@ static void clientSetDefaultAuth(client *c) { void clientSetUser(client *c, user *u, int authenticated) { c->user = u; c->flag.authenticated = authenticated; - if (authenticated) - c->flag.ever_authenticated = authenticated; + if (authenticated) c->flag.ever_authenticated = authenticated; } static int clientEverAuthenticated(client *c) { @@ -565,7 +565,7 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) { if (c->flag.module) { if (!c->deferred_reply_errors) { c->deferred_reply_errors = listCreate(); - listSetFreeMethod(c->deferred_reply_errors, (void (*)(void *))sdsfree); + listSetFreeMethod(c->deferred_reply_errors, sdsfreeVoid); } listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len)); return; @@ -3775,11 +3775,12 @@ NULL } if (getLongLongFromObjectOrReply(c, c->argv[2], &id, NULL) != C_OK) return; struct client *target = lookupClientByID(id); - /* Note that we never try to unblock a client blocked on a module command, which + /* Note that we never try to unblock a client blocked on a module command, + * or a client blocked by CLIENT PAUSE or some other blocking type which * doesn't have a timeout callback (even in the case of UNBLOCK ERROR). * The reason is that we assume that if a command doesn't expect to be timedout, * it also doesn't expect to be unblocked by CLIENT UNBLOCK */ - if (target && target->flag.blocked && moduleBlockedClientMayTimeout(target)) { + if (target && target->flag.blocked && blockedClientMayTimeout(target)) { if (unblock_error) unblockClientOnError(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); else @@ -4334,7 +4335,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire * rax */ if (c->client_tracking_prefixes) - mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode *)); + mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) + sizeof(raxNode *)); return mem; } @@ -4394,8 +4395,7 @@ int checkClientOutputBufferLimits(client *c) { /* For unauthenticated clients which were also never authenticated before the output buffer is limited to prevent * them from abusing it by not reading the replies */ - if (used_mem > REPLY_BUFFER_SIZE_UNAUTHENTICATED_CLIENT && authRequired(c) && !clientEverAuthenticated(c)) - return 1; + if (used_mem > REPLY_BUFFER_SIZE_UNAUTHENTICATED_CLIENT && authRequired(c) && !clientEverAuthenticated(c)) return 1; class = getClientType(c); /* For the purpose of output buffer limiting, primaries are handled diff --git a/src/pubsub.c b/src/pubsub.c index 5b037b5721..ad3429f8b4 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -307,7 +307,9 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty retval = 1; /* Remove the client from the channel -> clients list hash table */ if (server.cluster_enabled && type.shard) { - slot = getKeySlot(channel->ptr); + /* Using keyHashSlot directly because we can't rely on the current_client's slot via getKeySlot() here, + * as it might differ from the channel's slot. */ + slot = keyHashSlot(channel->ptr, (int)sdslen(channel->ptr)); } de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); serverAssertWithInfo(c, NULL, de != NULL); diff --git a/src/rdb.c b/src/rdb.c index cc131cfa61..e5d0b7fa67 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1267,6 +1267,10 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { * to allow loading this RDB if the module is not present. */ sdsfree(io.pre_flush_buffer); io.pre_flush_buffer = NULL; + if (io.ctx) { + moduleFreeContext(io.ctx); + zfree(io.ctx); + } return 0; } } else { @@ -3153,8 +3157,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (server.cluster_enabled) { /* In cluster mode we resize individual slot specific dictionaries based on the number of keys that * slot holds. */ - kvstoreDictExpand(db->keys, slot_id, slot_size); - kvstoreDictExpand(db->expires, slot_id, expires_slot_size); + if (slot_size) kvstoreDictExpand(db->keys, slot_id, slot_size); + if (expires_slot_size) kvstoreDictExpand(db->expires, slot_id, expires_slot_size); should_expand_db = 0; } } else { diff --git a/src/replication.c b/src/replication.c index 4f87aa7df5..cca1849dc7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -283,7 +283,7 @@ void removeReplicaFromPsyncWait(client *replica_main_client) { void resetReplicationBuffer(void) { server.repl_buffer_mem = 0; server.repl_buffer_blocks = listCreate(); - listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void *))zfree); + listSetFreeMethod(server.repl_buffer_blocks, zfree); } int canFeedReplicaReplBuffer(client *replica) { diff --git a/src/script_lua.c b/src/script_lua.c index 5fbdf743f9..e6df679196 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -1616,6 +1616,11 @@ void luaExtractErrorInformation(lua_State *lua, errorInfo *err_info) { err_info->ignore_err_stats_update = lua_toboolean(lua, -1); } lua_pop(lua, 1); + + if (err_info->msg == NULL) { + /* Ensure we never return a NULL msg. */ + err_info->msg = sdsnew("ERR unknown error"); + } } void luaCallFunction(scriptRunCtx *run_ctx, diff --git a/src/sds.c b/src/sds.c index e14f4bd0bd..32b13b2138 100644 --- a/src/sds.c +++ b/src/sds.c @@ -216,6 +216,13 @@ void sdsfree(sds s) { s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s)); } +/* This variant of sdsfree() gets its argument as void, and is useful + * as free method in data structures that expect a 'void free_object(void*)' + * prototype for the free method. */ +void sdsfreeVoid(void *s) { + sdsfree(s); +} + /* Set the sds string length to the length as obtained with strlen(), so * considering as content only up to the first null term character. * diff --git a/src/sds.h b/src/sds.h index e9c4a95f9a..e1b8531955 100644 --- a/src/sds.h +++ b/src/sds.h @@ -183,6 +183,7 @@ sds sdsempty(void); sds sdsdup(const sds s); size_t sdscopytobuffer(unsigned char *buf, size_t buf_len, sds s, uint8_t *hdr_size); void sdsfree(sds s); +void sdsfreeVoid(void *s); sds sdsgrowzero(sds s, size_t len); sds sdscatlen(sds s, const void *t, size_t len); sds sdscat(sds s, const char *t); diff --git a/src/server.c b/src/server.c index eb9cf04e82..d697bdfba2 100644 --- a/src/server.c +++ b/src/server.c @@ -3498,7 +3498,7 @@ void call(client *c, int flags) { * and a client which is reprocessing command again (after being unblocked). * Blocked clients can be blocked in different places and not always it means the call() function has been * called. For example this is required for avoiding double logging to monitors.*/ - int reprocessing_command = flags & CMD_CALL_REPROCESSING; + int reprocessing_command = c->flag.reexecuting_command ? 1 : 0; /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ @@ -3527,19 +3527,11 @@ void call(client *c, int flags) { * re-processing and unblock the client.*/ c->flag.executing_command = 1; - /* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual - * processing of the command proc, the client is aware that it is being - * re-processed. */ - if (reprocessing_command) c->flag.reprocessing_command = 1; - monotime monotonic_start = 0; if (monotonicGetType() == MONOTONIC_CLOCK_HW) monotonic_start = getMonotonicUs(); c->cmd->proc(c); - /* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */ - if (reprocessing_command) c->flag.reprocessing_command = 0; - exitExecutionUnit(); /* In case client is blocked after trying to execute the command, @@ -4162,7 +4154,6 @@ int processCommand(client *c) { addReply(c, shared.queued); } else { int flags = CMD_CALL_FULL; - if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING; call(c, flags); if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys(); } diff --git a/src/server.h b/src/server.h index f2fce0d328..e0b1ffdf19 100644 --- a/src/server.h +++ b/src/server.h @@ -565,8 +565,7 @@ typedef enum { #define CMD_CALL_NONE 0 #define CMD_CALL_PROPAGATE_AOF (1 << 0) #define CMD_CALL_PROPAGATE_REPL (1 << 1) -#define CMD_CALL_REPROCESSING (1 << 2) -#define CMD_CALL_FROM_MODULE (1 << 3) /* From RM_Call */ +#define CMD_CALL_FROM_MODULE (1 << 2) /* From RM_Call */ #define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF | CMD_CALL_PROPAGATE_REPL) #define CMD_CALL_FULL (CMD_CALL_PROPAGATE) @@ -1222,10 +1221,10 @@ typedef struct ClientFlags { from the Module. */ uint64_t module_prevent_aof_prop : 1; /* Module client do not want to propagate to AOF */ uint64_t module_prevent_repl_prop : 1; /* Module client do not want to propagate to replica */ - uint64_t reprocessing_command : 1; /* The client is re-processing the command. */ + uint64_t reexecuting_command : 1; /* The client is re-executing the command. */ uint64_t replication_done : 1; /* Indicate that replication has been done on the client */ uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */ - uint64_t ever_authenticated : 1; /* Indicate a client was ever successfully authenticated during it's lifetime */ + uint64_t ever_authenticated : 1; /* Indicate a client was ever successfully authenticated during it's lifetime */ uint64_t protected_rdb_channel : 1; /* Dual channel replication sync: Protects the RDB client from premature \ * release during full sync. This flag is used to ensure that the RDB client, which \ @@ -1773,11 +1772,11 @@ struct valkeyServer { int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */ int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ - int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ - int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ - int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ - int enable_debug_assert; /* Enable debug asserts */ - int debug_client_enforce_reply_list; /* Force client to always use the reply list */ + int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ + int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ + int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ + int enable_debug_assert; /* Enable debug asserts */ + int debug_client_enforce_reply_list; /* Force client to always use the reply list */ /* RDB / AOF loading information */ volatile sig_atomic_t loading; /* We are loading data from disk if true */ volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */ @@ -3673,6 +3672,7 @@ void unblockClient(client *c, int queue_for_reprocessing); void unblockClientOnTimeout(client *c); void unblockClientOnError(client *c, const char *err_str); void queueClientForReprocessing(client *c); +int blockedClientMayTimeout(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); diff --git a/src/t_stream.c b/src/t_stream.c index 801a5e4323..63fa0a361c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -54,6 +54,7 @@ #define STREAM_LISTPACK_MAX_SIZE (1 << 30) void streamFreeCG(streamCG *cg); +void streamFreeCGVoid(void *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, @@ -86,8 +87,8 @@ stream *streamNew(void) { /* Free a stream, including the listpacks stored inside the radix tree. */ void freeStream(stream *s) { - raxFreeWithCallback(s->rax, (void (*)(void *))lpFree); - if (s->cgroups) raxFreeWithCallback(s->cgroups, (void (*)(void *))streamFreeCG); + raxFreeWithCallback(s->rax, lpFreeVoid); + if (s->cgroups) raxFreeWithCallback(s->cgroups, streamFreeCGVoid); zfree(s); } @@ -2455,6 +2456,11 @@ void streamFreeConsumer(streamConsumer *sc) { zfree(sc); } +/* Used for generic free functions. */ +static void streamFreeConsumerVoid(void *sc) { + streamFreeConsumer((streamConsumer *)sc); +} + /* Create a new consumer group in the context of the stream 's', having the * specified name, last server ID and reads counter. If a consumer group with * the same name already exists NULL is returned, otherwise the pointer to the @@ -2474,11 +2480,16 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo /* Free a consumer group and all its associated data. */ void streamFreeCG(streamCG *cg) { - raxFreeWithCallback(cg->pel, (void (*)(void *))streamFreeNACK); - raxFreeWithCallback(cg->consumers, (void (*)(void *))streamFreeConsumer); + raxFreeWithCallback(cg->pel, zfree); + raxFreeWithCallback(cg->consumers, streamFreeConsumerVoid); zfree(cg); } +/* Used for generic free functions. */ +void streamFreeCGVoid(void *cg) { + streamFreeCG((streamCG *)cg); +} + /* Lookup the consumer group in the specified stream and returns its * pointer, otherwise if there is no such group, NULL is returned. */ streamCG *streamLookupCG(stream *s, sds groupname) { diff --git a/src/tls.c b/src/tls.c index a1f101d9c5..f5ceeadfaa 100644 --- a/src/tls.c +++ b/src/tls.c @@ -456,6 +456,14 @@ typedef struct tls_connection { size_t last_failed_write_data_len; } tls_connection; +/* Fetch the latest OpenSSL error and store it in the connection */ +static void updateTLSError(tls_connection *conn) { + conn->c.last_errno = 0; + if (conn->ssl_error) zfree(conn->ssl_error); + conn->ssl_error = zmalloc(512); + ERR_error_string_n(ERR_get_error(), conn->ssl_error, 512); +} + static connection *createTLSConnection(int client_side) { SSL_CTX *ctx = valkey_tls_ctx; if (client_side && valkey_tls_client_ctx) ctx = valkey_tls_client_ctx; @@ -464,6 +472,10 @@ static connection *createTLSConnection(int client_side) { conn->c.fd = -1; conn->c.iovcnt = IOV_MAX; conn->ssl = SSL_new(ctx); + if (!conn->ssl) { + updateTLSError(conn); + conn->c.state = CONN_STATE_ERROR; + } return (connection *)conn; } @@ -471,14 +483,6 @@ static connection *connCreateTLS(void) { return createTLSConnection(1); } -/* Fetch the latest OpenSSL error and store it in the connection */ -static void updateTLSError(tls_connection *conn) { - conn->c.last_errno = 0; - if (conn->ssl_error) zfree(conn->ssl_error); - conn->ssl_error = zmalloc(512); - ERR_error_string_n(ERR_get_error(), conn->ssl_error, 512); -} - /* Create a new TLS connection that is already associated with * an accepted underlying file descriptor. * @@ -492,14 +496,9 @@ static connection *connCreateAcceptedTLS(int fd, void *priv) { int require_auth = *(int *)priv; tls_connection *conn = (tls_connection *)createTLSConnection(0); conn->c.fd = fd; + if (conn->c.state == CONN_STATE_ERROR) return (connection *)conn; conn->c.state = CONN_STATE_ACCEPTING; - if (!conn->ssl) { - updateTLSError(conn); - conn->c.state = CONN_STATE_ERROR; - return (connection *)conn; - } - switch (require_auth) { case TLS_CLIENT_AUTH_NO: SSL_set_verify(conn->ssl, SSL_VERIFY_NONE, NULL); break; case TLS_CLIENT_AUTH_OPTIONAL: SSL_set_verify(conn->ssl, SSL_VERIFY_PEER, NULL); break; diff --git a/src/unit/test_files.h b/src/unit/test_files.h index 7a63ba5f2e..4e617276d5 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -33,6 +33,7 @@ int test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict(int argc, char **argv, in int test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict(int argc, char **argv, int flags); int test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict(int argc, char **argv, int flags); int test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict(int argc, char **argv, int flags); +int test_kvstoreDictExpand(int argc, char **argv, int flags); int test_listpackCreateIntList(int argc, char **argv, int flags); int test_listpackCreateList(int argc, char **argv, int flags); int test_listpackLpPrepend(int argc, char **argv, int flags); @@ -144,7 +145,7 @@ unitTest __test_crc64combine_c[] = {{"test_crc64combine", test_crc64combine}, {N unitTest __test_dict_c[] = {{"test_dictCreate", test_dictCreate}, {"test_dictAdd16Keys", test_dictAdd16Keys}, {"test_dictDisableResize", test_dictDisableResize}, {"test_dictAddOneKeyTriggerResize", test_dictAddOneKeyTriggerResize}, {"test_dictDeleteKeys", test_dictDeleteKeys}, {"test_dictDeleteOneKeyTriggerResize", test_dictDeleteOneKeyTriggerResize}, {"test_dictEmptyDirAdd128Keys", test_dictEmptyDirAdd128Keys}, {"test_dictDisableResizeReduceTo3", test_dictDisableResizeReduceTo3}, {"test_dictDeleteOneKeyTriggerResizeAgain", test_dictDeleteOneKeyTriggerResizeAgain}, {"test_dictBenchmark", test_dictBenchmark}, {NULL, NULL}}; unitTest __test_endianconv_c[] = {{"test_endianconv", test_endianconv}, {NULL, NULL}}; unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEncodings}, {"test_intsetBasicAdding", test_intsetBasicAdding}, {"test_intsetLargeNumberRandomAdd", test_intsetLargeNumberRandomAdd}, {"test_intsetUpgradeFromint16Toint32", test_intsetUpgradeFromint16Toint32}, {"test_intsetUpgradeFromint16Toint64", test_intsetUpgradeFromint16Toint64}, {"test_intsetUpgradeFromint32Toint64", test_intsetUpgradeFromint32Toint64}, {"test_intsetStressLookups", test_intsetStressLookups}, {"test_intsetStressAddDelete", test_intsetStressAddDelete}, {NULL, NULL}}; -unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict}, {NULL, NULL}}; +unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictExpand", test_kvstoreDictExpand}, {NULL, NULL}}; unitTest __test_listpack_c[] = {{"test_listpackCreateIntList", test_listpackCreateIntList}, {"test_listpackCreateList", test_listpackCreateList}, {"test_listpackLpPrepend", test_listpackLpPrepend}, {"test_listpackLpPrependInteger", test_listpackLpPrependInteger}, {"test_listpackGetELementAtIndex", test_listpackGetELementAtIndex}, {"test_listpackPop", test_listpackPop}, {"test_listpackGetELementAtIndex2", test_listpackGetELementAtIndex2}, {"test_listpackIterate0toEnd", test_listpackIterate0toEnd}, {"test_listpackIterate1toEnd", test_listpackIterate1toEnd}, {"test_listpackIterate2toEnd", test_listpackIterate2toEnd}, {"test_listpackIterateBackToFront", test_listpackIterateBackToFront}, {"test_listpackIterateBackToFrontWithDelete", test_listpackIterateBackToFrontWithDelete}, {"test_listpackDeleteWhenNumIsMinusOne", test_listpackDeleteWhenNumIsMinusOne}, {"test_listpackDeleteWithNegativeIndex", test_listpackDeleteWithNegativeIndex}, {"test_listpackDeleteInclusiveRange0_0", test_listpackDeleteInclusiveRange0_0}, {"test_listpackDeleteInclusiveRange0_1", test_listpackDeleteInclusiveRange0_1}, {"test_listpackDeleteInclusiveRange1_2", test_listpackDeleteInclusiveRange1_2}, {"test_listpackDeleteWitStartIndexOutOfRange", test_listpackDeleteWitStartIndexOutOfRange}, {"test_listpackDeleteWitNumOverflow", test_listpackDeleteWitNumOverflow}, {"test_listpackBatchDelete", test_listpackBatchDelete}, {"test_listpackDeleteFooWhileIterating", test_listpackDeleteFooWhileIterating}, {"test_listpackReplaceWithSameSize", test_listpackReplaceWithSameSize}, {"test_listpackReplaceWithDifferentSize", test_listpackReplaceWithDifferentSize}, {"test_listpackRegressionGt255Bytes", test_listpackRegressionGt255Bytes}, {"test_listpackCreateLongListAndCheckIndices", test_listpackCreateLongListAndCheckIndices}, {"test_listpackCompareStrsWithLpEntries", test_listpackCompareStrsWithLpEntries}, {"test_listpackLpMergeEmptyLps", test_listpackLpMergeEmptyLps}, {"test_listpackLpMergeLp1Larger", test_listpackLpMergeLp1Larger}, {"test_listpackLpMergeLp2Larger", test_listpackLpMergeLp2Larger}, {"test_listpackLpNextRandom", test_listpackLpNextRandom}, {"test_listpackLpNextRandomCC", test_listpackLpNextRandomCC}, {"test_listpackRandomPairWithOneElement", test_listpackRandomPairWithOneElement}, {"test_listpackRandomPairWithManyElements", test_listpackRandomPairWithManyElements}, {"test_listpackRandomPairsWithOneElement", test_listpackRandomPairsWithOneElement}, {"test_listpackRandomPairsWithManyElements", test_listpackRandomPairsWithManyElements}, {"test_listpackRandomPairsUniqueWithOneElement", test_listpackRandomPairsUniqueWithOneElement}, {"test_listpackRandomPairsUniqueWithManyElements", test_listpackRandomPairsUniqueWithManyElements}, {"test_listpackPushVariousEncodings", test_listpackPushVariousEncodings}, {"test_listpackLpFind", test_listpackLpFind}, {"test_listpackLpValidateIntegrity", test_listpackLpValidateIntegrity}, {"test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN", test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN}, {"test_listpackStressWithRandom", test_listpackStressWithRandom}, {"test_listpackSTressWithVariableSize", test_listpackSTressWithVariableSize}, {"test_listpackBenchmarkInit", test_listpackBenchmarkInit}, {"test_listpackBenchmarkLpAppend", test_listpackBenchmarkLpAppend}, {"test_listpackBenchmarkLpFindString", test_listpackBenchmarkLpFindString}, {"test_listpackBenchmarkLpFindNumber", test_listpackBenchmarkLpFindNumber}, {"test_listpackBenchmarkLpSeek", test_listpackBenchmarkLpSeek}, {"test_listpackBenchmarkLpValidateIntegrity", test_listpackBenchmarkLpValidateIntegrity}, {"test_listpackBenchmarkLpCompareWithString", test_listpackBenchmarkLpCompareWithString}, {"test_listpackBenchmarkLpCompareWithNumber", test_listpackBenchmarkLpCompareWithNumber}, {"test_listpackBenchmarkFree", test_listpackBenchmarkFree}, {NULL, NULL}}; unitTest __test_rax_c[] = {{"test_raxRecompressHugeKey", test_raxRecompressHugeKey}, {NULL, NULL}}; unitTest __test_sds_c[] = {{"test_sds", test_sds}, {"test_typesAndAllocSize", test_typesAndAllocSize}, {"test_sdsHeaderSizes", test_sdsHeaderSizes}, {NULL, NULL}}; diff --git a/src/unit/test_kvstore.c b/src/unit/test_kvstore.c index b3eff7d132..5832bf2c7f 100644 --- a/src/unit/test_kvstore.c +++ b/src/unit/test_kvstore.c @@ -203,3 +203,20 @@ int test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict(int argc, char **argv, kvstoreRelease(kvs2); return 0; } + +int test_kvstoreDictExpand(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + kvstore *kvs = kvstoreCreate(&KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS); + + TEST_ASSERT(kvstoreGetDict(kvs, 0) == NULL); + TEST_ASSERT(kvstoreDictExpand(kvs, 0, 10000) == DICT_OK); + TEST_ASSERT(kvstoreGetDict(kvs, 0) != NULL); + TEST_ASSERT(kvstoreBuckets(kvs) > 0); + TEST_ASSERT(kvstoreBuckets(kvs) == kvstoreDictBuckets(kvs, 0)); + + kvstoreRelease(kvs); + return 0; +} diff --git a/src/unit/test_listpack.c b/src/unit/test_listpack.c index 4838fc8952..0c71da18db 100644 --- a/src/unit/test_listpack.c +++ b/src/unit/test_listpack.c @@ -1184,7 +1184,7 @@ int test_listpackStressWithRandom(int argc, char **argv, int flags) { for (i = 0; i < iteration; i++) { lp = lpNew(0); ref = listCreate(); - listSetFreeMethod(ref, (void (*)(void *))sdsfree); + listSetFreeMethod(ref, sdsfreeVoid); len = rand() % 256; /* Create lists */ diff --git a/src/unit/test_ziplist.c b/src/unit/test_ziplist.c index d2f7ebe69c..58687d81fc 100644 --- a/src/unit/test_ziplist.c +++ b/src/unit/test_ziplist.c @@ -645,7 +645,7 @@ int test_ziplistStressWithRandomPayloadsOfDifferentEncoding(int argc, char **arg for (i = 0; i < iteration; i++) { zl = ziplistNew(); ref = listCreate(); - listSetFreeMethod(ref, (void (*)(void *))sdsfree); + listSetFreeMethod(ref, sdsfreeVoid); len = rand() % 256; /* Create lists */ diff --git a/src/valkey-check-aof.c b/src/valkey-check-aof.c index bc71d366d5..c43c30b75b 100644 --- a/src/valkey-check-aof.c +++ b/src/valkey-check-aof.c @@ -556,6 +556,12 @@ int redis_check_aof_main(int argc, char **argv) { goto invalid_args; } + /* Check if filepath is longer than PATH_MAX */ + if (strnlen(filepath, PATH_MAX + 1) > PATH_MAX) { + printf("Error: filepath is too long (exceeds PATH_MAX)\n"); + goto invalid_args; + } + /* In the glibc implementation dirname may modify their argument. */ memcpy(temp_filepath, filepath, strlen(filepath) + 1); dirpath = dirname(temp_filepath); diff --git a/src/version.h b/src/version.h index fd2a474ab1..e0180c6ed0 100644 --- a/src/version.h +++ b/src/version.h @@ -4,8 +4,8 @@ * similar. */ #define SERVER_NAME "valkey" #define SERVER_TITLE "Valkey" -#define VALKEY_VERSION "8.0.4" -#define VALKEY_VERSION_NUM 0x00080004 +#define VALKEY_VERSION "8.0.5" +#define VALKEY_VERSION_NUM 0x00080005 /* Redis OSS compatibility version, should never * exceed 7.2.x. */ diff --git a/tests/assets/divergent-shard-1.conf b/tests/assets/divergent-shard-1.conf new file mode 100644 index 0000000000..6f347d0470 --- /dev/null +++ b/tests/assets/divergent-shard-1.conf @@ -0,0 +1,3 @@ +43ee1cacd6948ee96bb367eb8795e62e8d153f05 127.0.0.1:0@6379,,tls-port=0,shard-id=f91532eb722943e035f34292cf586f3f750d65bd myself,master - 0 1749488968682 13 connected 0-16383 +8d89f4d4e7c57a2819277732f86213241c3ec0d3 127.0.0.1:0@6380,,tls-port=0,shard-id=a91532eb722943e035f34292cf586f3f750d65bd slave 43ee1cacd6948ee96bb367eb8795e62e8d153f05 0 1749488968682 13 connected +vars currentEpoch 13 lastVoteEpoch 9 \ No newline at end of file diff --git a/tests/assets/divergent-shard-2.conf b/tests/assets/divergent-shard-2.conf new file mode 100644 index 0000000000..3f7d75c947 --- /dev/null +++ b/tests/assets/divergent-shard-2.conf @@ -0,0 +1,3 @@ +8d89f4d4e7c57a2819277732f86213241c3ec0d3 127.0.0.1:0@6380,,tls-port=0,shard-id=a91532eb722943e035f34292cf586f3f750d65bd slave 43ee1cacd6948ee96bb367eb8795e62e8d153f05 0 1749488968682 13 connected +43ee1cacd6948ee96bb367eb8795e62e8d153f05 127.0.0.1:0@6379,,tls-port=0,shard-id=f91532eb722943e035f34292cf586f3f750d65bd myself,master - 0 1749488968682 13 connected 0-16383 +vars currentEpoch 13 lastVoteEpoch 9 \ No newline at end of file diff --git a/tests/assets/divergent-shard-3.conf b/tests/assets/divergent-shard-3.conf new file mode 100644 index 0000000000..363837f436 --- /dev/null +++ b/tests/assets/divergent-shard-3.conf @@ -0,0 +1,3 @@ +43ee1cacd6948ee96bb367eb8795e62e8d153f05 127.0.0.1:0@6379,,tls-port=0,shard-id=f91532eb722943e035f34292cf586f3f750d65bd master - 0 1749488968682 13 connected 0-16383 +8d89f4d4e7c57a2819277732f86213241c3ec0d3 127.0.0.1:0@6380,,tls-port=0,shard-id=a91532eb722943e035f34292cf586f3f750d65bd myself,slave 43ee1cacd6948ee96bb367eb8795e62e8d153f05 0 1749488968682 13 connected +vars currentEpoch 13 lastVoteEpoch 9 \ No newline at end of file diff --git a/tests/assets/divergent-shard-4.conf b/tests/assets/divergent-shard-4.conf new file mode 100644 index 0000000000..5fcd97bfad --- /dev/null +++ b/tests/assets/divergent-shard-4.conf @@ -0,0 +1,3 @@ +8d89f4d4e7c57a2819277732f86213241c3ec0d3 127.0.0.1:0@6380,,tls-port=0,shard-id=a91532eb722943e035f34292cf586f3f750d65bd myself,slave 43ee1cacd6948ee96bb367eb8795e62e8d153f05 0 1749488968682 13 connected +43ee1cacd6948ee96bb367eb8795e62e8d153f05 127.0.0.1:0@6379,,tls-port=0,shard-id=f91532eb722943e035f34292cf586f3f750d65bd master - 0 1749488968682 13 connected 0-16383 +vars currentEpoch 13 lastVoteEpoch 9 \ No newline at end of file diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 33c7c12d4b..a7f1fe21eb 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -689,9 +689,10 @@ tags {"aof cluster external:skip"} { append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n" } - start_server_aof [list dir $server_path cluster-enabled yes] { + start_server_aof [list dir $server_path cluster-enabled yes cluster-port [find_available_port $::baseport $::portcount]] { assert_equal [r ping] {PONG} } + clean_aof_persistence $aof_dirpath } } diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index 6344215a25..2d51677ab9 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -6,7 +6,7 @@ if {$::singledb} { set ::dbnum 9 } -start_server {tags {"cli"}} { +start_server {tags {"cli logreqres:skip"}} { proc open_cli {{opts ""} {infile ""}} { if { $opts == "" } { set opts "-n $::dbnum" diff --git a/tests/support/cluster.tcl b/tests/support/cluster.tcl index f9d4792d7b..b4289948e3 100644 --- a/tests/support/cluster.tcl +++ b/tests/support/cluster.tcl @@ -230,7 +230,8 @@ proc ::valkey_cluster::__dispatch__ {id method args} { if {[catch {$link $method {*}$args} e]} { if {$link eq {} || \ [string range $e 0 4] eq {MOVED} || \ - [string range $e 0 2] eq {I/O} \ + [string range $e 0 2] eq {I/O} || \ + [string range $e 0 10] eq {CLUSTERDOWN} \ } { # MOVED redirection. ::valkey_cluster::__method__refresh_nodes_map $id diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index 3f31fd5a3b..871f3e6404 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -209,6 +209,7 @@ proc cluster_allocate_replicas {masters replicas} { # Setup method to be executed to configure the cluster before the # tests run. proc cluster_setup {masters replicas node_count slot_allocator replica_allocator code} { + # Have all nodes meet if {$::tls} { set tls_cluster [lindex [R 0 CONFIG GET tls-cluster] 1] diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index feee94ef80..58781dda0f 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -248,6 +248,30 @@ proc CI {index field} { getInfoProperty [R $index cluster info] $field } +# Provide easy access to CLIENT INFO properties from CLIENT INFO string. +proc get_field_in_client_info {info field} { + set info [string trim $info] + foreach item [split $info " "] { + set kv [split $item "="] + set k [lindex $kv 0] + if {[string match $field $k]} { + return [lindex $kv 1] + } + } + return "" +} + +# Provide easy access to CLIENT INFO properties from CLIENT LIST string. +proc get_field_in_client_list {id client_list filed} { + set list [split $client_list "\r\n"] + foreach info $list { + if {[string match "id=$id *" $info] } { + return [get_field_in_client_info $info $filed] + } + } + return "" +} + # Test wrapped into run_solo are sent back from the client to the # test server, so that the test server will send them again to # clients once the clients are idle. @@ -264,6 +288,7 @@ proc cleanup {} { if {!$::quiet} {puts -nonewline "Cleanup: may take some time... "} flush stdout catch {exec rm -rf {*}[glob tests/tmp/valkey.conf.*]} + catch {exec rm -rf {*}[glob tests/tmp/nodes.conf.*]} catch {exec rm -rf {*}[glob tests/tmp/server*.*]} catch {exec rm -rf {*}[glob tests/tmp/*.acl.*]} if {!$::quiet} {puts "OK"} diff --git a/tests/unit/cluster/announced-endpoints.tcl b/tests/unit/cluster/announced-endpoints.tcl index b44a2a0e95..47ca8d2e41 100644 --- a/tests/unit/cluster/announced-endpoints.tcl +++ b/tests/unit/cluster/announced-endpoints.tcl @@ -53,4 +53,43 @@ start_cluster 2 2 {tags {external:skip cluster}} { R 0 config set cluster-announce-bus-port 0 assert_match "*@$base_bus_port *" [R 0 CLUSTER NODES] } + + test "Test change port and tls-port on runtime" { + if {$::tls} { + set baseport [lindex [R 0 config get tls-port] 1] + } else { + set baseport [lindex [R 0 config get port] 1] + } + set count [expr [llength $::servers] + 1] + set used_port [find_available_port $baseport $count] + + # We execute CLUSTER SLOTS command to trigger the `debugServerAssertWithInfo` in `clusterCommandSlots` function, ensuring + # that the cached response is invalidated upon updating any of port or tls-port. + R 0 CLUSTER SLOTS + R 1 CLUSTER SLOTS + + # Set port or tls-port to ensure changes are consistent across the cluster. + if {$::tls} { + R 0 config set tls-port $used_port + } else { + R 0 config set port $used_port + } + # Make sure changes in myself node's view are consistent. + assert_match "*:$used_port@*" [R 0 CLUSTER NODES] + assert_match "*$used_port*" [R 0 CLUSTER SLOTS] + # Make sure changes in other node's view are consistent. + wait_for_condition 50 100 { + [string match "*:$used_port@*" [R 1 CLUSTER NODES]] && + [string match "*$used_port*" [R 1 CLUSTER SLOTS]] + } else { + fail "Node port was not propagated via gossip" + } + + # Restore the original configuration item value. + if {$::tls} { + R 0 config set tls-port $baseport + } else { + R 0 config set port $baseport + } + } } diff --git a/tests/unit/cluster/diskless-load-swapdb.tcl b/tests/unit/cluster/diskless-load-swapdb.tcl index 68c2135493..e237fd81de 100644 --- a/tests/unit/cluster/diskless-load-swapdb.tcl +++ b/tests/unit/cluster/diskless-load-swapdb.tcl @@ -78,6 +78,11 @@ test "Main db not affected when fail to diskless load" { fail "Fail to stop the full sync" } + # Since we paused the primary node earlier, the replica may enter + # cluster down due to primary node pfail. Here set allow read to + # prevent subsequent read errors. + $replica config set cluster-allow-reads-when-down yes + # Replica keys and keys to slots map still both are right assert_equal {1} [$replica get $slot0_key] assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1] diff --git a/tests/unit/cluster/divergent-cluster-shardid-conf.tcl b/tests/unit/cluster/divergent-cluster-shardid-conf.tcl new file mode 100644 index 0000000000..6885eac214 --- /dev/null +++ b/tests/unit/cluster/divergent-cluster-shardid-conf.tcl @@ -0,0 +1,17 @@ +tags {external:skip cluster singledb} { + set old_singledb $::singledb + set ::singledb 1 + # Start a cluster with a divergent shard ID configuration + test "divergent cluster shardid conflict" { + for {set i 1} {$i <= 4} {incr i} { + if {$::verbose} { puts "Testing for tests/assets/divergent-shard-$i.conf"; flush stdout;} + exec cp -f tests/assets/divergent-shard-$i.conf tests/tmp/nodes.conf.divergent + start_server {overrides {"cluster-enabled" "yes" "cluster-config-file" "../nodes.conf.divergent"}} { + set shardid [r CLUSTER MYSHARDID] + set count [exec grep -c $shardid tests/tmp/nodes.conf.divergent]; + assert_equal $count 2 "Expect shard ID to be present twice in the configuration file" + } + } + } + set ::singledb $old_singledb +} diff --git a/tests/unit/cluster/failover2.tcl b/tests/unit/cluster/failover2.tcl index 7bc6a05e95..3411e7feff 100644 --- a/tests/unit/cluster/failover2.tcl +++ b/tests/unit/cluster/failover2.tcl @@ -64,3 +64,174 @@ start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval } } ;# start_cluster + +start_cluster 7 3 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000}} { + test "Primaries will not time out then they are elected in the same epoch" { + # Since we have the delay time, so these node may not initiate the + # election at the same time (same epoch). But if they do, we make + # sure there is no failover timeout. + + # Killing there primary nodes. + pause_process [srv 0 pid] + pause_process [srv -1 pid] + pause_process [srv -2 pid] + + # Wait for the failover + wait_for_condition 1000 50 { + [s -7 role] == "master" && + [s -8 role] == "master" && + [s -9 role] == "master" + } else { + fail "No failover detected" + } + + # Make sure there is no false epoch 0. + verify_no_log_message -7 "*Failover election in progress for epoch 0*" 0 + verify_no_log_message -8 "*Failover election in progress for epoch 0*" 0 + verify_no_log_message -9 "*Failover election in progress for epoch 0*" 0 + + # Make sure there is no failover timeout. + verify_no_log_message -7 "*Failover attempt expired*" 0 + verify_no_log_message -8 "*Failover attempt expired*" 0 + verify_no_log_message -9 "*Failover attempt expired*" 0 + + # Resuming these primary nodes, speed up the shutdown. + resume_process [srv 0 pid] + resume_process [srv -1 pid] + resume_process [srv -2 pid] + } +} ;# start_cluster + +run_solo {cluster} { + start_cluster 32 15 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 15000}} { + test "Multiple primary nodes are down, rank them based on the failed primary" { + # Killing these primary nodes. + for {set j 0} {$j < 15} {incr j} { + pause_process [srv -$j pid] + } + + # Make sure that a node starts failover. + wait_for_condition 1000 100 { + [s -40 role] == "master" + } else { + fail "No failover detected" + } + + # Wait for the cluster state to become ok. + for {set j 0} {$j < [llength $::servers]} {incr j} { + if {[process_is_paused [srv -$j pid]]} continue + wait_for_condition 1000 100 { + [CI $j cluster_state] eq "ok" + } else { + fail "Cluster node $j cluster_state:[CI $j cluster_state]" + } + } + + # Resuming these primary nodes, speed up the shutdown. + for {set j 0} {$j < 15} {incr j} { + resume_process [srv -$j pid] + } + } + } ;# start_cluster +} ;# run_solo + +# Needs to run in the body of +# start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} +proc test_replica_config_epoch_failover {type} { + test "Replica can update the config epoch when trigger the failover - $type" { + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + + if {$type == "automatic"} { + R 3 CONFIG SET cluster-replica-no-failover no + } elseif {$type == "manual"} { + R 3 CONFIG SET cluster-replica-no-failover yes + } + R 3 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_ALL + R 3 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1 + + set R0_nodeid [R 0 cluster myid] + + # R 0 is the first node, so we expect its epoch to be the smallest, + # so bumpepoch must succeed and it's config epoch will be changed. + set res [R 0 cluster bumpepoch] + assert_match {BUMPED *} $res + set R0_config_epoch [lindex $res 1] + + # Wait for the config epoch to propagate across the cluster. + wait_for_condition 1000 10 { + $R0_config_epoch == [dict get [cluster_get_node_by_id 1 $R0_nodeid] config_epoch] && + $R0_config_epoch == [dict get [cluster_get_node_by_id 2 $R0_nodeid] config_epoch] + } else { + fail "Other primaries does not update config epoch" + } + # Make sure that replica do not update config epoch. + assert_not_equal $R0_config_epoch [dict get [cluster_get_node_by_id 3 $R0_nodeid] config_epoch] + + # Pause the R 0 and wait for the cluster to be down. + pause_process [srv 0 pid] + R 3 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + R 3 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 0 + wait_for_condition 1000 50 { + [CI 1 cluster_state] == "fail" && + [CI 2 cluster_state] == "fail" && + [CI 3 cluster_state] == "fail" + } else { + fail "Cluster does not fail" + } + + # Make sure both the automatic and the manual failover will fail in the first time. + if {$type == "automatic"} { + wait_for_log_messages -3 {"*Failover attempt expired*"} 0 1000 10 + } elseif {$type == "manual"} { + R 3 cluster failover force + wait_for_log_messages -3 {"*Manual failover timed out*"} 0 1000 10 + } + + # Make sure the primaries prints the relevant logs. + wait_for_log_messages -1 {"*Failover auth denied to* epoch * > reqConfigEpoch*"} 0 1000 10 + wait_for_log_messages -1 {"*has old slots configuration, sending an UPDATE message about*"} 0 1000 10 + wait_for_log_messages -2 {"*Failover auth denied to* epoch * > reqConfigEpoch*"} 0 1000 10 + wait_for_log_messages -2 {"*has old slots configuration, sending an UPDATE message about*"} 0 1000 10 + + # Make sure the replica has updated the config epoch. + wait_for_condition 1000 10 { + $R0_config_epoch == [dict get [cluster_get_node_by_id 1 $R0_nodeid] config_epoch] + } else { + fail "The replica does not update the config epoch" + } + + if {$type == "manual"} { + # The second manual failure will succeed because the config epoch + # has already propagated. + R 3 cluster failover force + } + + # Wait for the failover to success. + wait_for_condition 1000 50 { + [s -3 role] == "master" && + [CI 1 cluster_state] == "ok" && + [CI 2 cluster_state] == "ok" && + [CI 3 cluster_state] == "ok" + } else { + fail "Failover does not happen" + } + + # Restore the old primary, make sure it can covert + resume_process [srv 0 pid] + wait_for_condition 1000 50 { + [s 0 role] == "slave" && + [CI 0 cluster_state] == "ok" + } else { + fail "The old primary was not converted into replica" + } + } +} + +start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} { + test_replica_config_epoch_failover "automatic" +} + +start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} { + test_replica_config_epoch_failover "manual" +} diff --git a/tests/unit/cluster/manual-failover.tcl b/tests/unit/cluster/manual-failover.tcl index 2a9dff934b..a281cb58f2 100644 --- a/tests/unit/cluster/manual-failover.tcl +++ b/tests/unit/cluster/manual-failover.tcl @@ -183,3 +183,45 @@ test "Wait for instance #0 to return back alive" { } } ;# start_cluster + +start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 15000}} { + test "Manual failover will reset the on-going election" { + set CLUSTER_PACKET_TYPE_FAILOVER_AUTH_REQUEST 5 + set CLUSTER_PACKET_TYPE_NONE -1 + + # Let other primaries drop FAILOVER_AUTH_REQUEST so that the election won't + # get the enough votes and the election will time out. + R 1 debug drop-cluster-packet-filter $CLUSTER_PACKET_TYPE_FAILOVER_AUTH_REQUEST + R 2 debug drop-cluster-packet-filter $CLUSTER_PACKET_TYPE_FAILOVER_AUTH_REQUEST + + # Replica doing the manual failover. + R 3 cluster failover + + # Waiting for primary and replica to confirm manual failover timeout. + wait_for_log_messages 0 {"*Manual failover timed out*"} 0 1000 50 + wait_for_log_messages -3 {"*Manual failover timed out*"} 0 1000 50 + set loglines1 [count_log_lines 0] + set loglines2 [count_log_lines -3] + + # Undo packet drop, so that replica can win the next election. + R 1 debug drop-cluster-packet-filter $CLUSTER_PACKET_TYPE_NONE + R 2 debug drop-cluster-packet-filter $CLUSTER_PACKET_TYPE_NONE + + # Replica doing the manual failover again. + R 3 cluster failover + + # Make sure the election is reset. + wait_for_log_messages -3 {"*Failover election in progress*Resetting the election*"} $loglines2 1000 50 + + # Wait for failover. + wait_for_condition 1000 50 { + [s -3 role] == "master" + } else { + fail "No failover detected" + } + + # Make sure that the second manual failover does not time out. + verify_no_log_message 0 "*Manual failover timed out*" $loglines1 + verify_no_log_message -3 "*Manual failover timed out*" $loglines2 + } +} ;# start_cluster diff --git a/tests/unit/cluster/replica-migration.tcl b/tests/unit/cluster/replica-migration.tcl index 05d6528684..d04069ef16 100644 --- a/tests/unit/cluster/replica-migration.tcl +++ b/tests/unit/cluster/replica-migration.tcl @@ -400,3 +400,23 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} { test_cluster_setslot "setslot" } my_slot_allocation cluster_allocate_replicas ;# start_cluster + +start_cluster 3 0 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} { + test "Empty primary will check and delete the dirty slots" { + R 2 config set cluster-allow-replica-migration no + + # Write a key to slot 0. + R 2 incr key_977613 + + # Move slot 0 from primary 2 to primary 0. + R 0 cluster bumpepoch + R 0 cluster setslot 0 node [R 0 cluster myid] + + # Wait for R 2 to report that it is an empty primary (cluster-allow-replica-migration no) + wait_for_log_messages -2 {"*I am now an empty primary*"} 0 1000 50 + + # Make sure primary 0 will delete the dirty slots. + verify_log_message -2 "*Deleting keys in dirty slot 0*" 0 + assert_equal [R 2 dbsize] 0 + } +} my_slot_allocation cluster_allocate_replicas ;# start_cluster diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index b5b19ff481..4bbaab20c2 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -53,4 +53,29 @@ start_cluster 1 1 {tags {external:skip cluster}} { catch {[$replica EXEC]} err assert_match {EXECABORT*} $err } + + test "SSUBSCRIBE client killed during transaction" { + # Create two clients + set rd1 [valkey_deferring_client $primary_id] + + # Get client 1 ID + $rd1 client id + set rd1_id [$rd1 read] + # Client1 subscribes to a shard channel + $rd1 ssubscribe channel0 + + # Wait for the subscription to be acknowledged + assert_equal {ssubscribe channel0 1} [$rd1 read] + + # Client2 starts a transaction + assert_equal {OK} [$primary multi] + + # sets a key so that its slot will be set to the slot of that key. + assert_equal {QUEUED} [$primary set k v] + # Kill client1 inside client2's transaction + assert_equal {QUEUED} [$primary client kill id $rd1_id] + + # Execute the transaction + assert_equal {OK 1} [$primary exec] "Transaction execution should return OK and kill count" + } } \ No newline at end of file diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 286b02b7d0..ae992bc51b 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -19,28 +19,6 @@ start_server {tags {"introspection"}} { r client info } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} - proc get_field_in_client_info {info field} { - set info [string trim $info] - foreach item [split $info " "] { - set kv [split $item "="] - set k [lindex $kv 0] - if {[string match $field $k]} { - return [lindex $kv 1] - } - } - return "" - } - - proc get_field_in_client_list {id client_list filed} { - set list [split $client_list "\r\n"] - foreach info $list { - if {[string match "id=$id *" $info] } { - return [get_field_in_client_info $info $filed] - } - } - return "" - } - proc get_client_tot_in_out_cmds {id} { set info_list [r client list] set in [get_field_in_client_list $id $info_list "tot-net-in"] diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index d5a6a6efe2..4dc04f6e69 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -14,7 +14,7 @@ proc test_memory_efficiency {range} { for {set j 0} {$j < 10000} {incr j} { $rd read ; # Discard replies } - + $rd close set current_mem [s used_memory] set used [expr {$current_mem-$base_mem}] set efficiency [expr {double($written)/$used}] @@ -37,6 +37,15 @@ start_server {tags {"memefficiency external:skip"}} { } run_solo {defrag} { + # Make a deferring client with CLIENT REPLY OFF sync with the server by + # temporarily setting CLIENT REPLY ON and waiting for the reply, then + # switching back to CLIENT REPLY OFF. + proc client_reply_off_wait_for_server {rd} { + $rd client reply on + assert_equal OK [$rd read] + $rd client reply off + } + proc test_active_defrag {type} { if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { test "Active defrag main dictionary: $type" { @@ -194,14 +203,12 @@ run_solo {defrag} { # Populate memory with interleaving script-key pattern of same size set dummy_script "--[string repeat x 400]\nreturn " set rd [valkey_deferring_client] + $rd client reply off for {set j 0} {$j < $n} {incr j} { set val "$dummy_script[format "%06d" $j]" $rd script load $val $rd set k$j $val - } - for {set j 0} {$j < $n} {incr j} { - $rd read ; # Discard script load replies - $rd read ; # Discard set replies + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} } after 120 ;# serverCron only updates the info once in 100ms if {$::verbose} { @@ -213,8 +220,10 @@ run_solo {defrag} { assert_lessthan [s allocator_frag_ratio] 1.05 # Delete all the keys to create fragmentation - for {set j 0} {$j < $n} {incr j} { $rd del k$j } - for {set j 0} {$j < $n} {incr j} { $rd read } ; # Discard del replies + for {set j 0} {$j < $n} {incr j} { + $rd del k$j + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} + } $rd close after 120 ;# serverCron only updates the info once in 100ms if {$::verbose} { @@ -287,15 +296,14 @@ run_solo {defrag} { # create big keys with 10k items set rd [valkey_deferring_client] + $rd client reply off for {set j 0} {$j < 10000} {incr j} { $rd hset bighash $j [concat "asdfasdfasdf" $j] $rd lpush biglist [concat "asdfasdfasdf" $j] $rd zadd bigzset $j [concat "asdfasdfasdf" $j] $rd sadd bigset [concat "asdfasdfasdf" $j] $rd xadd bigstream * item 1 value a - } - for {set j 0} {$j < 50000} {incr j} { - $rd read ; # Discard replies + if {$j % 100 == 99} {client_reply_off_wait_for_server $rd} } # create some small items (effective in cluster-enabled) @@ -311,9 +319,7 @@ run_solo {defrag} { # scale the hash to 1m fields in order to have a measurable the latency for {set j 10000} {$j < 1000000} {incr j} { $rd hset bighash $j [concat "asdfasdfasdf" $j] - } - for {set j 10000} {$j < 1000000} {incr j} { - $rd read ; # Discard replies + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} } # creating that big hash, increased used_memory, so the relative frag goes down set expected_frag 1.3 @@ -322,19 +328,16 @@ run_solo {defrag} { # add a mass of string keys for {set j 0} {$j < 500000} {incr j} { $rd setrange $j 150 a - } - for {set j 0} {$j < 500000} {incr j} { - $rd read ; # Discard replies + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} } assert_equal [r dbsize] 500015 # create some fragmentation for {set j 0} {$j < 500000} {incr j 2} { $rd del $j + if {$j % 1000 == 998} {client_reply_off_wait_for_server $rd} } - for {set j 0} {$j < 500000} {incr j 2} { - $rd read ; # Discard replies - } + $rd close assert_equal [r dbsize] 250015 # start defrag @@ -440,8 +443,11 @@ run_solo {defrag} { assert_lessthan [s allocator_frag_ratio] 1.05 # Delete all the keys to create fragmentation - for {set j 0} {$j < $n} {incr j} { $rd del k$j } - for {set j 0} {$j < $n} {incr j} { $rd read } ; # Discard del replies + $rd client reply off + for {set j 0} {$j < $n} {incr j} { + $rd del k$j + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} + } $rd close after 120 ;# serverCron only updates the info once in 100ms if {$::verbose} { @@ -519,6 +525,7 @@ run_solo {defrag} { # create big keys with 10k items set rd [valkey_deferring_client] + $rd client reply off set expected_frag 1.7 # add a mass of list nodes to two lists (allocations are interlaced) @@ -527,10 +534,7 @@ run_solo {defrag} { for {set j 0} {$j < $elements} {incr j} { $rd lpush biglist1 $val $rd lpush biglist2 $val - } - for {set j 0} {$j < $elements} {incr j} { - $rd read ; # Discard replies - $rd read ; # Discard replies + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} } # create some fragmentation @@ -638,12 +642,11 @@ run_solo {defrag} { # add a mass of keys with 600 bytes values, fill the bin of 640 bytes which has 32 regs per slab. set rd [valkey_deferring_client] + $rd client reply off set keys 640000 for {set j 0} {$j < $keys} {incr j} { $rd setrange $j 600 x - } - for {set j 0} {$j < $keys} {incr j} { - $rd read ; # Discard replies + if {$j % 1000 == 999} {client_reply_off_wait_for_server $rd} } # create some fragmentation of 50% @@ -652,9 +655,7 @@ run_solo {defrag} { $rd del $j incr sent incr j 1 - } - for {set j 0} {$j < $sent} {incr j} { - $rd read ; # Discard replies + if {$j % 1000 == 998} {client_reply_off_wait_for_server $rd} } # create higher fragmentation in the first slab @@ -720,11 +721,11 @@ run_solo {defrag} { } } - start_cluster 1 0 {tags {"defrag external:skip cluster"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} { + start_cluster 1 0 {tags {"defrag external:skip cluster"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save "" lazyfree-lazy-user-del no}} { test_active_defrag "cluster" } - start_server {tags {"defrag external:skip standalone"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} { + start_server {tags {"defrag external:skip standalone"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save "" lazyfree-lazy-user-del no}} { test_active_defrag "standalone" } } ;# run_solo diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 6e6230fc19..cb7d45b98f 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -30,6 +30,10 @@ start_server {tags {"other"}} { } } + test {Coverage: ECHO} { + assert_equal bang [r ECHO bang] + } + test {SAVE - make sure there are all the types as values} { # Wait for a background saving in progress to terminate waitForBgsave r diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl index 121c603324..63ff0b4e7a 100644 --- a/tests/unit/pause.tcl +++ b/tests/unit/pause.tcl @@ -405,7 +405,36 @@ start_server {tags {"pause network"}} { r client unpause assert_equal [r randomkey] {} + } + + test "CLIENT UNBLOCK is not allow to unblock client blocked by CLIENT PAUSE" { + set rd1 [valkey_deferring_client] + set rd2 [valkey_deferring_client] + $rd1 client id + $rd2 client id + set client_id1 [$rd1 read] + set client_id2 [$rd2 read] + + r del mylist + r client pause 100000 write + $rd1 blpop mylist 0 + $rd2 blpop mylist 0 + wait_for_blocked_clients_count 2 50 100 + # This used to trigger a panic. + assert_equal 0 [r client unblock $client_id1 timeout] + # THis used to return a UNBLOCKED error. + assert_equal 0 [r client unblock $client_id2 error] + + # After the unpause, it must be able to unblock the client. + r client unpause + assert_equal 1 [r client unblock $client_id1 timeout] + assert_equal 1 [r client unblock $client_id2 error] + assert_equal {} [$rd1 read] + assert_error "UNBLOCKED*" {$rd2 read} + + $rd1 close + $rd2 close } # Make sure we unpause at the end diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index db37129103..3f9b2cf174 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -2457,4 +2457,27 @@ start_server {tags {"scripting"}} { assert { [r memory usage foo] <= $expected_memory}; } } + + test {EVAL - Scripts support NULL byte} { + assert_equal [r eval "return \"\x00\";" 0] "\x00" + # Using a null byte never seemed to work with functions, so + # we don't have a test for that case. + } + + test {EVAL - explicit error() call handling} { + # error("simple string error") + assert_error {ERR user_script:1: simple string error script: *} { + r eval "error('simple string error')" 0 + } + + # error({"err": "ERR table error"}) + assert_error {ERR table error script: *} { + r eval "error({err='ERR table error'})" 0 + } + + # error({}) + assert_error {ERR unknown error script: *} { + r eval "error({})" 0 + } + } } diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 313293dcb7..73bfd4b1e9 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -36,6 +36,34 @@ start_server {tags {"tracking network logreqres:skip"}} { } } + test {Client tracking prefixes memory overhead} { + r CLIENT TRACKING off + set tot_mem_before [get_field_in_client_info [r client info] "tot-mem"] + + # We add multiple $i to prefix to avoid prefix conflicts, so in this + # args we will have about 20000 rax nodes. + set args {} + for {set i 0} {$i < 10240} {incr i} { + lappend args PREFIX + lappend args PREFIX-$i-$i-$i-$i-$i-$i-$i + } + r CLIENT TRACKING on BCAST {*}$args + + set arch_bits [s arch_bits] + set tot_mem_after [get_field_in_client_info [r client info] "tot-mem"] + set diff [expr $tot_mem_after - $tot_mem_before] + + # In 64 bits, before we would consume about 20000 * (4 * 8), that is 640000. + # And now we are 20000 * (4 + 8), that is 240000. + if {$arch_bits == 64} { + assert_lessthan $diff 300000 + } elseif {$arch_bits == 32} { + assert_lessthan $diff 200000 + } + + r CLIENT TRACKING off + } + test {Clients are able to enable tracking and redirect it} { r CLIENT TRACKING on REDIRECT $redir_id } {*OK} diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 83a93bffbf..0aa1a1f9f1 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -2430,4 +2430,27 @@ foreach {pop} {BLPOP BLMPOP_RIGHT} { close_replication_stream $repl } {} {needs:repl} + test "Blocking timeout following PAUSE should honor the timeout" { + # cleanup first + r del mylist + + # create a test client + set rd [valkey_deferring_client] + + # first PAUSE all writes for a very long time + r client pause 10000000000000 write + + # block a client on the list + $rd BLPOP mylist 1 + wait_for_blocked_clients_count 1 + + # now unpause the writes + r client unpause + + # client should time-out + wait_for_blocked_clients_count 0 + + $rd close + } + } ;# stop servers