From c677bb1111bc12288ced27d29e1f006989a10f91 Mon Sep 17 00:00:00 2001 From: Ram Prasad Voleti Date: Fri, 12 Apr 2024 23:59:04 +0000 Subject: [PATCH 1/3] Maintain deterministic order of CLUSTER SHARDS response Maintain deterministic order of CLUSTER SHARDS response. Currently we don't maintain the shards/masters in sorted fashion and hence we get the order of CLUSTER SHARDS response non-deterministic on different nodes. Maintain the sorted Masters list of pointers, similar to replicas, and get rid of > dict which is not suitable for sorting. Add TOPOLOGY argument to get the deterministic response which would remove the replication offset and node health status from cluster shards response. Sort the nodes based on the node Id. Use it in proc `cluster_config_consistent` for the test coverage and sanity purpose. Signed-off-by: Ram Prasad Voleti --- src/cluster.c | 18 ++- src/cluster.h | 2 +- src/cluster_legacy.c | 222 ++++++++++++++++--------------- src/cluster_legacy.h | 3 +- src/commands.def | 12 +- src/commands/cluster-shards.json | 17 ++- tests/cluster/cluster.tcl | 8 +- tests/support/cluster_util.tcl | 7 +- 8 files changed, 161 insertions(+), 128 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 45fde52842..6fe2e4f4c6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -849,10 +849,20 @@ void clusterCommand(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc == 2) { /* CLUSTER SLOTS */ clusterCommandSlots(c); - } else if (!strcasecmp(c->argv[1]->ptr, "shards") && c->argc == 2) { - /* CLUSTER SHARDS */ - clusterCommandShards(c); - } else if (!strcasecmp(c->argv[1]->ptr, "info") && c->argc == 2) { + } else if (!strcasecmp(c->argv[1]->ptr,"shards") && + (c->argc == 2 || c->argc == 3)) + { + /* CLUSTER SHARDS [TOPOLOGY] */ + int topology = 1; + if (c->argc == 3 && (strcasecmp(c->argv[2]->ptr,"topology"))) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } else if (c->argc == 2) { + topology = 0; + } + + clusterCommandShards(c, topology); + } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { /* CLUSTER INFO */ sds info = genClusterInfoString(); diff --git a/src/cluster.h b/src/cluster.h index a83b4ac282..f573ffc224 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -73,7 +73,7 @@ int clusterManualFailoverTimeLimit(void); void clusterCommandSlots(client *c); void clusterCommandMyId(client *c); void clusterCommandMyShardId(client *c); -void clusterCommandShards(client *c); +void clusterCommandShards(client *c, int topology); sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary); int clusterNodeCoversSlot(clusterNode *n, int slot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 61b5af8e29..7b75f0d1d8 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -64,6 +64,8 @@ void clusterUpdateState(void); int clusterNodeCoversSlot(clusterNode *n, int slot); list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica); +int clusterNodeAddMaster(clusterNode *master); +int clusterNodeRemoveMaster(clusterNode *master); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); @@ -95,9 +97,7 @@ void removeChannelsInSlot(unsigned int slot); unsigned int countKeysInSlot(unsigned int hashslot); unsigned int countChannelsInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot); -void clusterAddNodeToShard(const char *shard_id, clusterNode *node); list *clusterLookupNodeListByShardId(const char *shard_id); -void clusterRemoveNodeFromShard(clusterNode *node); int auxShardIdSetter(clusterNode *n, void *value, int length); sds auxShardIdGetter(clusterNode *n, sds s); int auxShardIdPresent(clusterNode *n); @@ -237,7 +237,9 @@ int auxShardIdSetter(clusterNode *n, void *value, int length) { return C_ERR; } } - clusterAddNodeToShard(value, n); + /* Initially, during the load, add every node as master until the respective + * role is assigned with the persisted shard ID. */ + clusterNodeAddMaster(n); return C_OK; } @@ -322,6 +324,19 @@ typedef struct { * Initialization * -------------------------------------------------------------------------- */ +/* We initially assign a temporary node name, role, and shardID to the nodes other than `myself`. Once we finish the handshake + * with other nodes or once we finish loading all the information from nodes.conf file, we will know the actual information + * of the respective other nodes. A node can be said persisted if we have the permanent information of the node. We intend + * to maintain masters list only after knowing the permanent information of the node. */ +int isMasterPersisted(clusterNode *node) { + for (int j = 0; j < server.cluster->nummasters; j++) { + if (node == server.cluster->masters[j]) { + return 1; + } + } + return 0; +} + /* Load the cluster config from 'filename'. * * If the file does not exist or is zero-length (this may happen because @@ -573,14 +588,18 @@ int clusterLoadConfig(char *filename) { * shard_id in this case */ if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { memcpy(n->shard_id, primary->shard_id, CLUSTER_NAMELEN); - clusterAddNodeToShard(primary->shard_id, n); - } else if (clusterGetNodesInMyShard(primary) != NULL && - memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { + } else if (isMasterPersisted(primary) && + memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) + { /* If the primary has been added to a shard, make sure this * node has the same persisted shard id as the primary. */ sdsfreesplitres(argv, argc); goto fmterr; } + /* Since the role of node is decided as replica, remove it from + * master list which was added initially during the load and continue + * maintain the persisted master list */ + clusterNodeRemoveMaster(n); n->replicaof = primary; clusterNodeAddReplica(primary, n); } else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { @@ -588,7 +607,7 @@ int clusterLoadConfig(char *filename) { * This happens if we are loading a nodes.conf generated by * an older version of the server. We should manually update the * shard membership in this case */ - clusterAddNodeToShard(n->shard_id, n); + clusterNodeAddMaster(n); } /* Set ping sent / pong received timestamps */ @@ -920,18 +939,14 @@ static void updateAnnouncedHumanNodename(clusterNode *node, char *new) { static void updateShardId(clusterNode *node, const char *shard_id) { if (shard_id && memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) { - clusterRemoveNodeFromShard(node); memcpy(node->shard_id, shard_id, CLUSTER_NAMELEN); - clusterAddNodeToShard(shard_id, node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } if (shard_id && myself != node && myself->replicaof == node) { if (memcmp(myself->shard_id, shard_id, CLUSTER_NAMELEN) != 0) { /* shard-id can diverge right after a rolling upgrade * from pre-7.2 releases */ - clusterRemoveNodeFromShard(myself); memcpy(myself->shard_id, shard_id, CLUSTER_NAMELEN); - clusterAddNodeToShard(shard_id, myself); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); } } @@ -966,7 +981,8 @@ void clusterInit(void) { server.cluster->size = 0; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType); - server.cluster->shards = dictCreate(&clusterSdsToListType); + server.cluster->masters = NULL; + server.cluster->nummasters = 0; server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; @@ -1000,7 +1016,7 @@ void clusterInit(void) { myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_PRIMARY); serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name); clusterAddNode(myself); - clusterAddNodeToShard(myself->shard_id, myself); + clusterNodeAddMaster(myself); saveconf = 1; } if (saveconf) clusterSaveConfigOrDie(1); @@ -1098,9 +1114,6 @@ void clusterReset(int hard) { /* Unassign all the slots. */ for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j); - /* Recreate shards dict */ - dictEmpty(server.cluster->shards, NULL); - /* Forget all the nodes, but myself. */ di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { @@ -1134,8 +1147,8 @@ void clusterReset(int hard) { serverLog(LL_NOTICE, "Node hard reset, now I'm %.40s", myself->name); } - /* Re-populate shards */ - clusterAddNodeToShard(myself->shard_id, myself); + /* Re-populate masters */ + clusterNodeAddMaster(myself); /* Make sure to persist the new config and update the state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); @@ -1475,6 +1488,22 @@ int clusterNodeRemoveReplica(clusterNode *primary, clusterNode *replica) { return C_ERR; } +int clusterNodeRemoveMaster(clusterNode *master) { + for (int j = 0; j < server.cluster->nummasters; j++) { + if (server.cluster->masters[j] == master) { + if ((j+1) < server.cluster->nummasters) { + int remaining_masters = (server.cluster->nummasters - j) - 1; + memmove(server.cluster->masters+j,server.cluster->masters+(j+1), + (sizeof(*server.cluster->masters) * remaining_masters)); + } + server.cluster->nummasters--; + master->flags &= ~(CLUSTER_NODE_PRIMARY|CLUSTER_NODE_MIGRATE_TO); + return C_OK; + } + } + return C_ERR; +} + int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { int j; @@ -1489,6 +1518,19 @@ int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { return C_OK; } +int clusterNodeAddMaster(clusterNode *master) { + /* If it's already a master, don't add it again. */ + for (int j = 0; j < server.cluster->nummasters; j++) + if (server.cluster->masters[j] == master) return C_ERR; + server.cluster->masters = zrealloc(server.cluster->masters, + sizeof(clusterNode*)*((server.cluster->nummasters) + 1)); + server.cluster->masters[server.cluster->nummasters] = master; + server.cluster->nummasters++; + qsort(server.cluster->masters, server.cluster->nummasters, sizeof(clusterNode *), clusterNodeNameComparator); + master->flags |= CLUSTER_NODE_PRIMARY; + return C_OK; +} + int clusterCountNonFailingReplicas(clusterNode *n) { int j, ok_replicas = 0; @@ -1508,6 +1550,7 @@ void freeClusterNode(clusterNode *n) { /* Remove this node from the list of replicas of its primary. */ if (nodeIsReplica(n) && n->replicaof) clusterNodeRemoveReplica(n->replicaof, n); + else clusterNodeRemoveMaster(n); /* Unlink from the set of nodes. */ nodename = sdsnewlen(n->name, CLUSTER_NAMELEN); @@ -1566,9 +1609,6 @@ void clusterDelNode(clusterNode *delnode) { } dictReleaseIterator(di); - /* 3) Remove the node from the owning shard */ - clusterRemoveNodeFromShard(delnode); - /* 4) Free the node, unlinking it from the cluster. */ freeClusterNode(delnode); } @@ -1584,16 +1624,18 @@ clusterNode *clusterLookupNode(const char *name, int length) { } /* Get all the nodes in my shard. - * Note that the list returned is not computed on the fly - * via replicaof; rather, it is maintained permanently to - * track the shard membership and its life cycle is tied - * to this process. Therefore, the caller must not - * release the list. */ + * We ensure that we maintain the master nodes only after they have been assigned with a persisted + * shard ID. Generate the list of nodes in a shard if they are persisted, else return + * NULL. The caller should release the list */ list *clusterGetNodesInMyShard(clusterNode *node) { - sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN); - dictEntry *de = dictFind(server.cluster->shards, s); - sdsfree(s); - return (de != NULL) ? dictGetVal(de) : NULL; + clusterNode *master = clusterNodeGetPrimary(node); + + list *l = listCreate(); + listAddNodeTail(l, master); + for (int i = 0; i < master->num_replicas; i++) { + listAddNodeTail(l, master->replicas[i]); + } + return l; } /* This is only used after the handshake. When we connect a given IP/PORT @@ -1610,39 +1652,7 @@ void clusterRenameNode(clusterNode *node, char *newname) { serverAssert(retval == DICT_OK); memcpy(node->name, newname, CLUSTER_NAMELEN); clusterAddNode(node); - clusterAddNodeToShard(node->shard_id, node); -} - -void clusterAddNodeToShard(const char *shard_id, clusterNode *node) { - sds s = sdsnewlen(shard_id, CLUSTER_NAMELEN); - dictEntry *de = dictFind(server.cluster->shards, s); - if (de == NULL) { - list *l = listCreate(); - listAddNodeTail(l, node); - serverAssert(dictAdd(server.cluster->shards, s, l) == DICT_OK); - } else { - list *l = dictGetVal(de); - if (listSearchKey(l, node) == NULL) { - listAddNodeTail(l, node); - } - sdsfree(s); - } -} - -void clusterRemoveNodeFromShard(clusterNode *node) { - sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN); - dictEntry *de = dictFind(server.cluster->shards, s); - if (de != NULL) { - list *l = dictGetVal(de); - listNode *ln = listSearchKey(l, node); - if (ln != NULL) { - listDelNode(l, ln); - } - if (listLength(l) == 0) { - dictDelete(server.cluster->shards, s); - } - } - sdsfree(s); + clusterNodeAddMaster(node); } /* ----------------------------------------------------------------------------- @@ -2168,7 +2178,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { node->tls_port = msg_tls_port; node->cport = ntohs(g->cport); clusterAddNode(node); - clusterAddNodeToShard(node->shard_id, node); + clusterNodeAddMaster(node); } } @@ -2257,9 +2267,10 @@ void clusterSetNodeAsPrimary(clusterNode *n) { if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO; } n->flags &= ~CLUSTER_NODE_REPLICA; - n->flags |= CLUSTER_NODE_PRIMARY; n->replicaof = NULL; + clusterNodeAddMaster(n); + /* Update config and state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -3104,10 +3115,11 @@ int clusterProcessPacket(clusterLink *link) { /* Primary node changed for this replica? */ if (primary && sender->replicaof != primary) { - if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender); - serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s", - sender->name, sender->human_nodename, primary->name, primary->human_nodename, - sender->shard_id); + if (sender->replicaof) + clusterNodeRemoveReplica(sender->replicaof, sender); + else + clusterNodeRemoveMaster(sender); + clusterNodeAddReplica(primary, sender); sender->replicaof = primary; @@ -3948,6 +3960,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; clusterSendMessage(node->link, msgblock); } + listRelease(nodes_for_slot); clusterMsgSendBlockDecrRefCount(msgblock); } @@ -5037,20 +5050,15 @@ void clusterUpdateState(void) { * At the same time count the number of reachable primaries having * at least one slot. */ { - dictIterator *di; - dictEntry *de; - server.cluster->size = 0; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - if (clusterNodeIsVotingPrimary(node)) { + for (int i = 0; i < server.cluster->nummasters; i++) { + if (server.cluster->masters[i] && server.cluster->masters[i]->numslots) { server.cluster->size++; - if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; + if ((server.cluster->masters[i]->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0) + reachable_primaries++; } } - dictReleaseIterator(di); } /* If we are in a minority partition, change the cluster state @@ -5185,8 +5193,9 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) { serverAssert(myself->numslots == 0); if (clusterNodeIsPrimary(myself)) { - myself->flags &= ~(CLUSTER_NODE_PRIMARY | CLUSTER_NODE_MIGRATE_TO); + clusterNodeRemoveMaster(myself); myself->flags |= CLUSTER_NODE_REPLICA; + clusterCloseAllSlots(); } else { if (myself->replicaof) clusterNodeRemoveReplica(myself->replicaof, myself); } @@ -5551,7 +5560,7 @@ long long getNodeReplicationOffset(clusterNode *node) { } /* Add detailed information of a node to the output buffer of the given client. */ -void addNodeDetailsToShardReply(client *c, clusterNode *node) { +void addNodeDetailsToShardReply(client *c, clusterNode *node, int topology) { int reply_count = 0; void *node_replylen = addReplyDeferredLen(c); addReplyBulkCString(c, "id"); @@ -5590,35 +5599,32 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) { addReplyBulkCString(c, nodeIsReplica(node) ? "replica" : "master"); reply_count++; - addReplyBulkCString(c, "replication-offset"); - addReplyLongLong(c, node_offset); - reply_count++; + if (!topology) { + addReplyBulkCString(c, "replication-offset"); + addReplyLongLong(c, node_offset); + reply_count++; - addReplyBulkCString(c, "health"); - const char *health_msg = NULL; - if (nodeFailed(node)) { - health_msg = "fail"; - } else if (nodeIsReplica(node) && node_offset == 0) { - health_msg = "loading"; - } else { - health_msg = "online"; + addReplyBulkCString(c, "health"); + const char *health_msg = NULL; + if (nodeFailed(node)) { + health_msg = "fail"; + } else if (nodeIsReplica(node) && node_offset == 0) { + health_msg = "loading"; + } else { + health_msg = "online"; + } + addReplyBulkCString(c, health_msg); + reply_count++; } - addReplyBulkCString(c, health_msg); - reply_count++; setDeferredMapLen(c, node_replylen, reply_count); } /* Add the shard reply of a single shard based off the given primary node. */ -void addShardReplyForClusterShards(client *c, list *nodes) { - serverAssert(listLength(nodes) > 0); - clusterNode *n = listNodeValue(listFirst(nodes)); +void addShardReplyForClusterShards(client *c, clusterNode* n, int topology) { addReplyMapLen(c, 2); addReplyBulkCString(c, "slots"); - /* Use slot_info_pairs from the primary only */ - n = clusterNodeGetPrimary(n); - if (n->slot_info_pairs != NULL) { serverAssert((n->slot_info_pairs_count % 2) == 0); addReplyArrayLen(c, n->slot_info_pairs_count); @@ -5628,29 +5634,31 @@ void addShardReplyForClusterShards(client *c, list *nodes) { addReplyArrayLen(c, 0); } + list *nodes = clusterGetNodesInMyShard(n); + addReplyBulkCString(c, "nodes"); addReplyArrayLen(c, listLength(nodes)); listIter li; listRewind(nodes, &li); for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { - clusterNode *n = listNodeValue(ln); - addNodeDetailsToShardReply(c, n); + n = listNodeValue(ln); + addNodeDetailsToShardReply(c, n, topology); clusterFreeNodesSlotsInfo(n); } + listRelease(nodes); } /* Add to the output buffer of the given client, an array of slot (start, end) * pair owned by the shard, also the primary and set of replica(s) along with * information about each node. */ -void clusterCommandShards(client *c) { - addReplyArrayLen(c, dictSize(server.cluster->shards)); +void clusterCommandShards(client *c, int topology) { + serverAssert(server.cluster->nummasters > 0); + addReplyArrayLen(c, server.cluster->nummasters); /* This call will add slot_info_pairs to all nodes */ clusterGenNodesSlotsInfo(0); - dictIterator *di = dictGetSafeIterator(server.cluster->shards); - for (dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) { - addShardReplyForClusterShards(c, dictGetVal(de)); + for (int i = 0; i < server.cluster->nummasters; i++) { + addShardReplyForClusterShards(c, server.cluster->masters[i], topology); } - dictReleaseIterator(di); } sds genClusterInfoString(void) { diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index d054d86017..7118c064b6 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -321,8 +321,9 @@ struct clusterState { int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ int size; /* Num of primary nodes with at least one slot */ dict *nodes; /* Hash table of name -> clusterNode structures */ - dict *shards; /* Hash table of shard_id -> list (of nodes) structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ + clusterNode **masters; /* pointers to master nodes */ + int nummasters; /* Number of master nodes */ clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; diff --git a/src/commands.def b/src/commands.def index 4559c0aefe..f4c9fcb2a5 100644 --- a/src/commands.def +++ b/src/commands.def @@ -896,9 +896,7 @@ struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = { #ifndef SKIP_CMD_TIPS_TABLE /* CLUSTER SHARDS tips */ -const char *CLUSTER_SHARDS_Tips[] = { -"nondeterministic_output", -}; +#define CLUSTER_SHARDS_Tips NULL #endif #ifndef SKIP_CMD_KEY_SPECS_TABLE @@ -906,6 +904,11 @@ const char *CLUSTER_SHARDS_Tips[] = { #define CLUSTER_SHARDS_Keyspecs NULL #endif +/* CLUSTER SHARDS argument table */ +struct COMMAND_ARG CLUSTER_SHARDS_Args[] = { +{MAKE_ARG("topology",ARG_TYPE_PURE_TOKEN,-1,"TOPOLOGY",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)}, +}; + /********** CLUSTER SLAVES ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1029,9 +1032,8 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)}, {MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args}, {MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args}, -{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, +{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,-2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,1),.args=CLUSTER_SHARDS_Args}, {MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, -{MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, {0} }; diff --git a/src/commands/cluster-shards.json b/src/commands/cluster-shards.json index e63c129ea9..b0ac99b2f3 100644 --- a/src/commands/cluster-shards.json +++ b/src/commands/cluster-shards.json @@ -4,15 +4,20 @@ "complexity": "O(N) where N is the total number of cluster nodes", "group": "cluster", "since": "7.0.0", - "arity": 2, + "arity": -2, "container": "CLUSTER", "function": "clusterCommand", "command_flags": [ "LOADING", "STALE" ], - "command_tips": [ - "NONDETERMINISTIC_OUTPUT" + "arguments":[ + { + "name": "topology", + "type": "pure-token", + "token": "TOPOLOGY", + "optional": true + } ], "reply_schema": { "description": "A nested list of a map of hash ranges and shard nodes describing individual shards.", @@ -64,7 +69,8 @@ ] }, "replication-offset": { - "type": "integer" + "type": "integer", + "optional": true }, "health": { "oneOf": [ @@ -77,7 +83,8 @@ { "const": "online" } - ] + ], + "optional": true } } } diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl index 0080501bf4..ec9bd2a3cd 100644 --- a/tests/cluster/cluster.tcl +++ b/tests/cluster/cluster.tcl @@ -198,10 +198,12 @@ proc cluster_config_consistent {} { } if {$j == 0} { - set base_cfg [R $j cluster slots] + set base_slots_cfg [R $j cluster slots] + set base_shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] } else { - set cfg [R $j cluster slots] - if {$cfg != $base_cfg} { + set slots_cfg [R $j cluster slots] + set shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] + if {$slots_cfg != $base_slots_cfg || $shards_cfg != $base_shards_cfg} { return 0 } } diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index 5708dfac7e..fccb9c8f02 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -103,9 +103,12 @@ proc cluster_config_consistent {} { } if {$j == 0} { - set base_cfg [R $j cluster slots] + set base_slots_cfg [R $j cluster slots] + set base_shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] } else { - if {[R $j cluster slots] != $base_cfg} { + set slots_cfg [R $j cluster slots] + set shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] + if {$slots_cfg != $base_slots_cfg || $shards_cfg != $base_shards_cfg} { return 0 } } From 622a1bf4c4c601c93bed4c92398040da20fce192 Mon Sep 17 00:00:00 2001 From: Ram Prasad Voleti Date: Fri, 17 May 2024 06:35:42 +0000 Subject: [PATCH 2/3] Remove topology argument and cleanup related code changes Remove topology argument and cleanup related code changes. Signed-off-by: Ram Prasad Voleti --- src/cluster.c | 16 +--- src/cluster.h | 2 +- src/cluster_legacy.c | 90 +++++++++++------------ src/commands.def | 8 +- src/commands/cluster-shards.json | 16 +--- tests/cluster/cluster.tcl | 8 +- tests/cluster/tests/28-cluster-shards.tcl | 21 ++++++ tests/support/cluster_util.tcl | 7 +- 8 files changed, 79 insertions(+), 89 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 6fe2e4f4c6..c61c36d768 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -849,19 +849,9 @@ void clusterCommand(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc == 2) { /* CLUSTER SLOTS */ clusterCommandSlots(c); - } else if (!strcasecmp(c->argv[1]->ptr,"shards") && - (c->argc == 2 || c->argc == 3)) - { - /* CLUSTER SHARDS [TOPOLOGY] */ - int topology = 1; - if (c->argc == 3 && (strcasecmp(c->argv[2]->ptr,"topology"))) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } else if (c->argc == 2) { - topology = 0; - } - - clusterCommandShards(c, topology); + } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) { + /* CLUSTER SHARDS */ + clusterCommandShards(c); } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { /* CLUSTER INFO */ diff --git a/src/cluster.h b/src/cluster.h index f573ffc224..a83b4ac282 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -73,7 +73,7 @@ int clusterManualFailoverTimeLimit(void); void clusterCommandSlots(client *c); void clusterCommandMyId(client *c); void clusterCommandMyShardId(client *c); -void clusterCommandShards(client *c, int topology); +void clusterCommandShards(client *c); sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary); int clusterNodeCoversSlot(clusterNode *n, int slot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 7b75f0d1d8..dd33c172e6 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -64,8 +64,8 @@ void clusterUpdateState(void); int clusterNodeCoversSlot(clusterNode *n, int slot); list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica); -int clusterNodeAddMaster(clusterNode *master); -int clusterNodeRemoveMaster(clusterNode *master); +int clusterNodeAddToMasters(clusterNode *master); +int clusterNodeRemoveFromMasters(clusterNode *master); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); @@ -239,7 +239,7 @@ int auxShardIdSetter(clusterNode *n, void *value, int length) { } /* Initially, during the load, add every node as master until the respective * role is assigned with the persisted shard ID. */ - clusterNodeAddMaster(n); + clusterNodeAddToMasters(n); return C_OK; } @@ -599,7 +599,7 @@ int clusterLoadConfig(char *filename) { /* Since the role of node is decided as replica, remove it from * master list which was added initially during the load and continue * maintain the persisted master list */ - clusterNodeRemoveMaster(n); + clusterNodeRemoveFromMasters(n); n->replicaof = primary; clusterNodeAddReplica(primary, n); } else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { @@ -607,7 +607,7 @@ int clusterLoadConfig(char *filename) { * This happens if we are loading a nodes.conf generated by * an older version of the server. We should manually update the * shard membership in this case */ - clusterNodeAddMaster(n); + clusterNodeAddToMasters(n); } /* Set ping sent / pong received timestamps */ @@ -1016,7 +1016,7 @@ void clusterInit(void) { myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_PRIMARY); serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name); clusterAddNode(myself); - clusterNodeAddMaster(myself); + clusterNodeAddToMasters(myself); saveconf = 1; } if (saveconf) clusterSaveConfigOrDie(1); @@ -1148,7 +1148,7 @@ void clusterReset(int hard) { } /* Re-populate masters */ - clusterNodeAddMaster(myself); + clusterNodeAddToMasters(myself); /* Make sure to persist the new config and update the state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); @@ -1488,7 +1488,7 @@ int clusterNodeRemoveReplica(clusterNode *primary, clusterNode *replica) { return C_ERR; } -int clusterNodeRemoveMaster(clusterNode *master) { +int clusterNodeRemoveFromMasters(clusterNode *master) { for (int j = 0; j < server.cluster->nummasters; j++) { if (server.cluster->masters[j] == master) { if ((j+1) < server.cluster->nummasters) { @@ -1518,7 +1518,7 @@ int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { return C_OK; } -int clusterNodeAddMaster(clusterNode *master) { +int clusterNodeAddToMasters(clusterNode *master) { /* If it's already a master, don't add it again. */ for (int j = 0; j < server.cluster->nummasters; j++) if (server.cluster->masters[j] == master) return C_ERR; @@ -1550,7 +1550,7 @@ void freeClusterNode(clusterNode *n) { /* Remove this node from the list of replicas of its primary. */ if (nodeIsReplica(n) && n->replicaof) clusterNodeRemoveReplica(n->replicaof, n); - else clusterNodeRemoveMaster(n); + else clusterNodeRemoveFromMasters(n); /* Unlink from the set of nodes. */ nodename = sdsnewlen(n->name, CLUSTER_NAMELEN); @@ -1582,8 +1582,7 @@ void clusterAddNode(clusterNode *node) { * 1) Mark all the slots handled by it as unassigned. * 2) Remove all the failure reports sent by this node and referenced by * other nodes. - * 3) Remove the node from the owning shard - * 4) Free the node with freeClusterNode() that will in turn remove it + * 3) Free the node with freeClusterNode() that will in turn remove it * from the hash table and from the list of replicas of its primary, if * it is a replica node. */ @@ -1609,7 +1608,7 @@ void clusterDelNode(clusterNode *delnode) { } dictReleaseIterator(di); - /* 4) Free the node, unlinking it from the cluster. */ + /* 3) Free the node, unlinking it from the cluster. */ freeClusterNode(delnode); } @@ -1652,7 +1651,7 @@ void clusterRenameNode(clusterNode *node, char *newname) { serverAssert(retval == DICT_OK); memcpy(node->name, newname, CLUSTER_NAMELEN); clusterAddNode(node); - clusterNodeAddMaster(node); + clusterNodeAddToMasters(node); } /* ----------------------------------------------------------------------------- @@ -2178,7 +2177,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { node->tls_port = msg_tls_port; node->cport = ntohs(g->cport); clusterAddNode(node); - clusterNodeAddMaster(node); + clusterNodeAddToMasters(node); } } @@ -2269,7 +2268,7 @@ void clusterSetNodeAsPrimary(clusterNode *n) { n->flags &= ~CLUSTER_NODE_REPLICA; n->replicaof = NULL; - clusterNodeAddMaster(n); + clusterNodeAddToMasters(n); /* Update config and state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); @@ -3118,8 +3117,10 @@ int clusterProcessPacket(clusterLink *link) { if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender); else - clusterNodeRemoveMaster(sender); - + clusterNodeRemoveFromMasters(sender); + serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s", + sender->name, sender->human_nodename, primary->name, primary->human_nodename, + sender->shard_id); clusterNodeAddReplica(primary, sender); sender->replicaof = primary; @@ -5193,7 +5194,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) { serverAssert(myself->numslots == 0); if (clusterNodeIsPrimary(myself)) { - clusterNodeRemoveMaster(myself); + clusterNodeRemoveFromMasters(myself); myself->flags |= CLUSTER_NODE_REPLICA; clusterCloseAllSlots(); } else { @@ -5560,7 +5561,7 @@ long long getNodeReplicationOffset(clusterNode *node) { } /* Add detailed information of a node to the output buffer of the given client. */ -void addNodeDetailsToShardReply(client *c, clusterNode *node, int topology) { +void addNodeDetailsToShardReply(client *c, clusterNode *node) { int reply_count = 0; void *node_replylen = addReplyDeferredLen(c); addReplyBulkCString(c, "id"); @@ -5599,50 +5600,48 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node, int topology) { addReplyBulkCString(c, nodeIsReplica(node) ? "replica" : "master"); reply_count++; - if (!topology) { - addReplyBulkCString(c, "replication-offset"); - addReplyLongLong(c, node_offset); - reply_count++; + addReplyBulkCString(c, "replication-offset"); + addReplyLongLong(c, node_offset); + reply_count++; - addReplyBulkCString(c, "health"); - const char *health_msg = NULL; - if (nodeFailed(node)) { - health_msg = "fail"; - } else if (nodeIsReplica(node) && node_offset == 0) { - health_msg = "loading"; - } else { - health_msg = "online"; - } - addReplyBulkCString(c, health_msg); - reply_count++; + addReplyBulkCString(c, "health"); + const char *health_msg = NULL; + if (nodeFailed(node)) { + health_msg = "fail"; + } else if (nodeIsReplica(node) && node_offset == 0) { + health_msg = "loading"; + } else { + health_msg = "online"; } + addReplyBulkCString(c, health_msg); + reply_count++; setDeferredMapLen(c, node_replylen, reply_count); } /* Add the shard reply of a single shard based off the given primary node. */ -void addShardReplyForClusterShards(client *c, clusterNode* n, int topology) { +void addShardReplyForClusterShards(client *c, clusterNode* primary) { addReplyMapLen(c, 2); addReplyBulkCString(c, "slots"); - if (n->slot_info_pairs != NULL) { - serverAssert((n->slot_info_pairs_count % 2) == 0); - addReplyArrayLen(c, n->slot_info_pairs_count); - for (int i = 0; i < n->slot_info_pairs_count; i++) addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]); + if (primary->slot_info_pairs != NULL) { + serverAssert((primary->slot_info_pairs_count % 2) == 0); + addReplyArrayLen(c, primary->slot_info_pairs_count); + for (int i = 0; i < primary->slot_info_pairs_count; i++) addReplyLongLong(c, (unsigned long)primary->slot_info_pairs[i]); } else { /* If no slot info pair is provided, the node owns no slots */ addReplyArrayLen(c, 0); } - list *nodes = clusterGetNodesInMyShard(n); + list *nodes = clusterGetNodesInMyShard(primary); addReplyBulkCString(c, "nodes"); addReplyArrayLen(c, listLength(nodes)); listIter li; listRewind(nodes, &li); for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { - n = listNodeValue(ln); - addNodeDetailsToShardReply(c, n, topology); + clusterNode *n = listNodeValue(ln); + addNodeDetailsToShardReply(c, n); clusterFreeNodesSlotsInfo(n); } listRelease(nodes); @@ -5651,13 +5650,12 @@ void addShardReplyForClusterShards(client *c, clusterNode* n, int topology) { /* Add to the output buffer of the given client, an array of slot (start, end) * pair owned by the shard, also the primary and set of replica(s) along with * information about each node. */ -void clusterCommandShards(client *c, int topology) { - serverAssert(server.cluster->nummasters > 0); +void clusterCommandShards(client *c) { addReplyArrayLen(c, server.cluster->nummasters); /* This call will add slot_info_pairs to all nodes */ clusterGenNodesSlotsInfo(0); for (int i = 0; i < server.cluster->nummasters; i++) { - addShardReplyForClusterShards(c, server.cluster->masters[i], topology); + addShardReplyForClusterShards(c, server.cluster->masters[i]); } } diff --git a/src/commands.def b/src/commands.def index f4c9fcb2a5..c069393fe3 100644 --- a/src/commands.def +++ b/src/commands.def @@ -904,11 +904,6 @@ struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = { #define CLUSTER_SHARDS_Keyspecs NULL #endif -/* CLUSTER SHARDS argument table */ -struct COMMAND_ARG CLUSTER_SHARDS_Args[] = { -{MAKE_ARG("topology",ARG_TYPE_PURE_TOKEN,-1,"TOPOLOGY",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)}, -}; - /********** CLUSTER SLAVES ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1032,8 +1027,9 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)}, {MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args}, {MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args}, -{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,-2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,1),.args=CLUSTER_SHARDS_Args}, +{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, {MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, +{MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, {0} }; diff --git a/src/commands/cluster-shards.json b/src/commands/cluster-shards.json index b0ac99b2f3..ec782183fc 100644 --- a/src/commands/cluster-shards.json +++ b/src/commands/cluster-shards.json @@ -4,21 +4,13 @@ "complexity": "O(N) where N is the total number of cluster nodes", "group": "cluster", "since": "7.0.0", - "arity": -2, + "arity": 2, "container": "CLUSTER", "function": "clusterCommand", "command_flags": [ "LOADING", "STALE" ], - "arguments":[ - { - "name": "topology", - "type": "pure-token", - "token": "TOPOLOGY", - "optional": true - } - ], "reply_schema": { "description": "A nested list of a map of hash ranges and shard nodes describing individual shards.", "type": "array", @@ -69,8 +61,7 @@ ] }, "replication-offset": { - "type": "integer", - "optional": true + "type": "integer" }, "health": { "oneOf": [ @@ -83,8 +74,7 @@ { "const": "online" } - ], - "optional": true + ] } } } diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl index ec9bd2a3cd..0080501bf4 100644 --- a/tests/cluster/cluster.tcl +++ b/tests/cluster/cluster.tcl @@ -198,12 +198,10 @@ proc cluster_config_consistent {} { } if {$j == 0} { - set base_slots_cfg [R $j cluster slots] - set base_shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] + set base_cfg [R $j cluster slots] } else { - set slots_cfg [R $j cluster slots] - set shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] - if {$slots_cfg != $base_slots_cfg || $shards_cfg != $base_shards_cfg} { + set cfg [R $j cluster slots] + if {$cfg != $base_cfg} { return 0 } } diff --git a/tests/cluster/tests/28-cluster-shards.tcl b/tests/cluster/tests/28-cluster-shards.tcl index d6534c816b..0b607566b0 100644 --- a/tests/cluster/tests/28-cluster-shards.tcl +++ b/tests/cluster/tests/28-cluster-shards.tcl @@ -285,3 +285,24 @@ test "CLUSTER MYSHARDID reports same shard id after cluster restart" { assert_equal [dict get $node_ids $i] [R $i cluster myshardid] } } + +test "Deterministic order of CLUSTER SHARDS response" { + set node_ids {} + for {set j 0} {$j < 8} {incr j} { + set shards_cfg [R $j CLUSTER SHARDS] + set i 0 + foreach shard_cfg $shards_cfg { + set nodes [dict get $shard_cfg nodes] + foreach node $nodes { + if {$j == 0} { + # Save the node ids from the first node response + dict set node_ids $i [dict get $node id] + } else { + # Verify the order of the node ids is the same as the first node response + assert_equal [dict get $node id] [dict get $node_ids $i] + } + incr i + } + } + } +} diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index fccb9c8f02..5708dfac7e 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -103,12 +103,9 @@ proc cluster_config_consistent {} { } if {$j == 0} { - set base_slots_cfg [R $j cluster slots] - set base_shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] + set base_cfg [R $j cluster slots] } else { - set slots_cfg [R $j cluster slots] - set shards_cfg [R $j CLUSTER SHARDS TOPOLOGY] - if {$slots_cfg != $base_slots_cfg || $shards_cfg != $base_shards_cfg} { + if {[R $j cluster slots] != $base_cfg} { return 0 } } From d2303cf1b89f87fb949f3809d18d9b3e6266a72e Mon Sep 17 00:00:00 2001 From: Ram Prasad Voleti Date: Fri, 5 Jul 2024 05:02:37 +0000 Subject: [PATCH 3/3] Replace Dict with Rax for Cluster Nodes Replace Dict with Rax for Cluster Nodes and construct primaries list on the go, instead of maintaining shards/masters list. Signed-off-by: Ram Prasad Voleti --- src/cluster.c | 4 +- src/cluster_legacy.c | 418 +++++++++------------- src/cluster_legacy.h | 4 +- src/commands.def | 6 +- src/commands/cluster-shards.json | 3 + src/rax.h | 1 + tests/cluster/tests/28-cluster-shards.tcl | 21 -- tests/unit/cluster/cluster-shards.tcl | 22 ++ 8 files changed, 212 insertions(+), 267 deletions(-) create mode 100644 tests/unit/cluster/cluster-shards.tcl diff --git a/src/cluster.c b/src/cluster.c index c61c36d768..45fde52842 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -849,10 +849,10 @@ void clusterCommand(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc == 2) { /* CLUSTER SLOTS */ clusterCommandSlots(c); - } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) { + } else if (!strcasecmp(c->argv[1]->ptr, "shards") && c->argc == 2) { /* CLUSTER SHARDS */ clusterCommandShards(c); - } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { + } else if (!strcasecmp(c->argv[1]->ptr, "info") && c->argc == 2) { /* CLUSTER INFO */ sds info = genClusterInfoString(); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index dd33c172e6..7c75fa848b 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -64,8 +64,7 @@ void clusterUpdateState(void); int clusterNodeCoversSlot(clusterNode *n, int slot); list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica); -int clusterNodeAddToMasters(clusterNode *master); -int clusterNodeRemoveFromMasters(clusterNode *master); +list *clusterGetPrimaries(void); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); @@ -147,17 +146,6 @@ static inline int defaultClientPort(void) { /* Fixed timeout value for cluster operations (milliseconds) */ #define CLUSTER_OPERATION_TIMEOUT 2000 -/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to - * clusterNode structures. */ -dictType clusterNodesDictType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ -}; - /* Cluster re-addition blacklist. This maps node IDs to the time * we can re-add this node. The goal is to avoid reading a removed * node for some time. */ @@ -170,16 +158,6 @@ dictType clusterNodesBlackListDictType = { NULL /* allow to expand */ }; -/* Cluster shards hash table, mapping shard id to list of nodes */ -dictType clusterSdsToListType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - dictListDestructor, /* val destructor */ - NULL /* allow to expand */ -}; - /* Aux fields were introduced in Redis OSS 7.2 to support the persistence * of various important node properties, such as shard id, in nodes.conf. * Aux fields take an explicit format of name=value pairs and have no @@ -237,9 +215,6 @@ int auxShardIdSetter(clusterNode *n, void *value, int length) { return C_ERR; } } - /* Initially, during the load, add every node as master until the respective - * role is assigned with the persisted shard ID. */ - clusterNodeAddToMasters(n); return C_OK; } @@ -324,19 +299,6 @@ typedef struct { * Initialization * -------------------------------------------------------------------------- */ -/* We initially assign a temporary node name, role, and shardID to the nodes other than `myself`. Once we finish the handshake - * with other nodes or once we finish loading all the information from nodes.conf file, we will know the actual information - * of the respective other nodes. A node can be said persisted if we have the permanent information of the node. We intend - * to maintain masters list only after knowing the permanent information of the node. */ -int isMasterPersisted(clusterNode *node) { - for (int j = 0; j < server.cluster->nummasters; j++) { - if (node == server.cluster->masters[j]) { - return 1; - } - } - return 0; -} - /* Load the cluster config from 'filename'. * * If the file does not exist or is zero-length (this may happen because @@ -579,7 +541,9 @@ int clusterLoadConfig(char *filename) { goto fmterr; } primary = clusterLookupNode(argv[3], sdslen(argv[3])); + int primary_loaded = 1; if (!primary) { + primary_loaded = 0; primary = createClusterNode(argv[3], 0); clusterAddNode(primary); } @@ -588,26 +552,14 @@ int clusterLoadConfig(char *filename) { * shard_id in this case */ if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { memcpy(n->shard_id, primary->shard_id, CLUSTER_NAMELEN); - } else if (isMasterPersisted(primary) && - memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) - { - /* If the primary has been added to a shard, make sure this + } else if (primary_loaded && memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { + /* If the primary information has already been loaded from the conf file, make sure this * node has the same persisted shard id as the primary. */ sdsfreesplitres(argv, argc); goto fmterr; } - /* Since the role of node is decided as replica, remove it from - * master list which was added initially during the load and continue - * maintain the persisted master list */ - clusterNodeRemoveFromMasters(n); n->replicaof = primary; clusterNodeAddReplica(primary, n); - } else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { - /* n is a primary but it does not have a persisted shard_id. - * This happens if we are loading a nodes.conf generated by - * an older version of the server. We should manually update the - * shard membership in this case */ - clusterNodeAddToMasters(n); } /* Set ping sent / pong received timestamps */ @@ -980,9 +932,7 @@ void clusterInit(void) { server.cluster->state = CLUSTER_FAIL; server.cluster->size = 0; server.cluster->todo_before_sleep = 0; - server.cluster->nodes = dictCreate(&clusterNodesDictType); - server.cluster->masters = NULL; - server.cluster->nummasters = 0; + server.cluster->nodes = raxNew(); server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; @@ -1016,7 +966,6 @@ void clusterInit(void) { myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_PRIMARY); serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name); clusterAddNode(myself); - clusterNodeAddToMasters(myself); saveconf = 1; } if (saveconf) clusterSaveConfigOrDie(1); @@ -1096,8 +1045,7 @@ void clusterInitLast(void) { * 6) The new configuration is saved and the cluster state updated. * 7) If the node was a replica, the whole data set is flushed away. */ void clusterReset(int hard) { - dictIterator *di; - dictEntry *de; + raxIterator ri; int j; /* Turn into primary. */ @@ -1115,22 +1063,22 @@ void clusterReset(int hard) { for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j); /* Forget all the nodes, but myself. */ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node == myself) continue; clusterDelNode(node); + raxSeek(&ri, ">=", ri.key, ri.key_len); } - dictReleaseIterator(di); + raxStop(&ri); /* Empty the nodes blacklist. */ dictEmpty(server.cluster->nodes_black_list, NULL); /* Hard reset only: set epochs to 0, change node ID. */ if (hard) { - sds oldname; - server.cluster->currentEpoch = 0; server.cluster->lastVoteEpoch = 0; myself->configEpoch = 0; @@ -1138,18 +1086,13 @@ void clusterReset(int hard) { /* To change the Node ID we need to remove the old name from the * nodes table, change the ID, and re-add back with new name. */ - oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN); - dictDelete(server.cluster->nodes, oldname); - sdsfree(oldname); + raxRemove(server.cluster->nodes, (unsigned char *)myself->name, CLUSTER_NAMELEN, NULL); getRandomHexChars(myself->name, CLUSTER_NAMELEN); getRandomHexChars(myself->shard_id, CLUSTER_NAMELEN); clusterAddNode(myself); serverLog(LL_NOTICE, "Node hard reset, now I'm %.40s", myself->name); } - /* Re-populate masters */ - clusterNodeAddToMasters(myself); - /* Make sure to persist the new config and update the state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -1318,7 +1261,7 @@ unsigned long getClusterConnectionsCount(void) { /* We decrement the number of nodes by one, since there is the * "myself" node too in the list. Each node uses two file descriptors, * one incoming and one outgoing, thus the multiplication by 2. */ - return server.cluster_enabled ? ((dictSize(server.cluster->nodes) - 1) * 2) : 0; + return server.cluster_enabled ? ((raxSize(server.cluster->nodes) - 1) * 2) : 0; } /* ----------------------------------------------------------------------------- @@ -1488,22 +1431,6 @@ int clusterNodeRemoveReplica(clusterNode *primary, clusterNode *replica) { return C_ERR; } -int clusterNodeRemoveFromMasters(clusterNode *master) { - for (int j = 0; j < server.cluster->nummasters; j++) { - if (server.cluster->masters[j] == master) { - if ((j+1) < server.cluster->nummasters) { - int remaining_masters = (server.cluster->nummasters - j) - 1; - memmove(server.cluster->masters+j,server.cluster->masters+(j+1), - (sizeof(*server.cluster->masters) * remaining_masters)); - } - server.cluster->nummasters--; - master->flags &= ~(CLUSTER_NODE_PRIMARY|CLUSTER_NODE_MIGRATE_TO); - return C_OK; - } - } - return C_ERR; -} - int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { int j; @@ -1518,19 +1445,6 @@ int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { return C_OK; } -int clusterNodeAddToMasters(clusterNode *master) { - /* If it's already a master, don't add it again. */ - for (int j = 0; j < server.cluster->nummasters; j++) - if (server.cluster->masters[j] == master) return C_ERR; - server.cluster->masters = zrealloc(server.cluster->masters, - sizeof(clusterNode*)*((server.cluster->nummasters) + 1)); - server.cluster->masters[server.cluster->nummasters] = master; - server.cluster->nummasters++; - qsort(server.cluster->masters, server.cluster->nummasters, sizeof(clusterNode *), clusterNodeNameComparator); - master->flags |= CLUSTER_NODE_PRIMARY; - return C_OK; -} - int clusterCountNonFailingReplicas(clusterNode *n) { int j, ok_replicas = 0; @@ -1541,7 +1455,6 @@ int clusterCountNonFailingReplicas(clusterNode *n) { /* Low level cleanup of the node structure. Only called by clusterDelNode(). */ void freeClusterNode(clusterNode *n) { - sds nodename; int j; /* If the node has associated replicas, we have to set @@ -1550,12 +1463,9 @@ void freeClusterNode(clusterNode *n) { /* Remove this node from the list of replicas of its primary. */ if (nodeIsReplica(n) && n->replicaof) clusterNodeRemoveReplica(n->replicaof, n); - else clusterNodeRemoveFromMasters(n); /* Unlink from the set of nodes. */ - nodename = sdsnewlen(n->name, CLUSTER_NAMELEN); - serverAssert(dictDelete(server.cluster->nodes, nodename) == DICT_OK); - sdsfree(nodename); + serverAssert(raxRemove(server.cluster->nodes, (unsigned char *)n->name, CLUSTER_NAMELEN, NULL) == RAX_OK); sdsfree(n->hostname); sdsfree(n->human_nodename); @@ -1571,8 +1481,9 @@ void freeClusterNode(clusterNode *n) { void clusterAddNode(clusterNode *node) { int retval; - retval = dictAdd(server.cluster->nodes, sdsnewlen(node->name, CLUSTER_NAMELEN), node); - serverAssert(retval == DICT_OK); + retval = raxInsert(server.cluster->nodes, (unsigned char *)node->name, CLUSTER_NAMELEN, node, NULL); + + serverAssert(retval == RAX_OK); } /* Remove a node from the cluster. The function performs the high level @@ -1588,8 +1499,7 @@ void clusterAddNode(clusterNode *node) { */ void clusterDelNode(clusterNode *delnode) { int j; - dictIterator *di; - dictEntry *de; + raxIterator ri; /* 1) Mark slots as unassigned. */ for (j = 0; j < CLUSTER_SLOTS; j++) { @@ -1599,14 +1509,15 @@ void clusterDelNode(clusterNode *delnode) { } /* 2) Remove failure reports. */ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node == delnode) continue; clusterNodeDelFailureReport(node, delnode); } - dictReleaseIterator(di); + raxStop(&ri); /* 3) Free the node, unlinking it from the cluster. */ freeClusterNode(delnode); @@ -1615,43 +1526,50 @@ void clusterDelNode(clusterNode *delnode) { /* Node lookup by name */ clusterNode *clusterLookupNode(const char *name, int length) { if (verifyClusterNodeId(name, length) != C_OK) return NULL; - sds s = sdsnewlen(name, length); - dictEntry *de = dictFind(server.cluster->nodes, s); - sdsfree(s); - if (de == NULL) return NULL; - return dictGetVal(de); + void *n = NULL; + raxFind(server.cluster->nodes, (unsigned char *)name, length, &n); + return n; } /* Get all the nodes in my shard. - * We ensure that we maintain the master nodes only after they have been assigned with a persisted - * shard ID. Generate the list of nodes in a shard if they are persisted, else return - * NULL. The caller should release the list */ + * Generate the list of nodes in a shard. The caller should release the list */ list *clusterGetNodesInMyShard(clusterNode *node) { - clusterNode *master = clusterNodeGetPrimary(node); + clusterNode *primary = clusterNodeGetPrimary(node); list *l = listCreate(); - listAddNodeTail(l, master); - for (int i = 0; i < master->num_replicas; i++) { - listAddNodeTail(l, master->replicas[i]); + listAddNodeTail(l, primary); + for (int i = 0; i < primary->num_replicas; i++) { + listAddNodeTail(l, primary->replicas[i]); } return l; } +/* Get all the primaries in the cluster. + * Generate the list of primaries in a cluster which have slot coverage. The caller should release the list */ +list *clusterGetPrimaries(void) { + list *l = listCreate(); + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + if (clusterNodeIsVotingPrimary(node)) { + listAddNodeTail(l, node); + } + } + raxStop(&ri); + return l; +} + /* This is only used after the handshake. When we connect a given IP/PORT * as a result of CLUSTER MEET we don't have the node name yet, so we * pick a random one, and will fix it when we receive the PONG request using * this function. */ void clusterRenameNode(clusterNode *node, char *newname) { - int retval; - sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, node->human_nodename, newname); - retval = dictDelete(server.cluster->nodes, s); - sdsfree(s); - serverAssert(retval == DICT_OK); + serverAssert(raxRemove(server.cluster->nodes, (unsigned char *)node->name, CLUSTER_NAMELEN, NULL) == RAX_OK); memcpy(node->name, newname, CLUSTER_NAMELEN); clusterAddNode(node); - clusterNodeAddToMasters(node); } /* ----------------------------------------------------------------------------- @@ -1662,15 +1580,16 @@ void clusterRenameNode(clusterNode *node, char *newname) { * epoch if greater than any node configEpoch. */ uint64_t clusterGetMaxEpoch(void) { uint64_t max = 0; - dictIterator *di; - dictEntry *de; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + if (node->configEpoch > max) max = node->configEpoch; } - dictReleaseIterator(di); + raxStop(&ri); if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch; return max; } @@ -1940,18 +1859,22 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * specified ip address and port number. This function is used in order to * avoid adding a new handshake node for the same address multiple times. */ int clusterHandshakeInProgress(char *ip, int port, int cport) { - dictIterator *di; - dictEntry *de; + raxIterator ri; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + clusterNode *node = NULL; + while (raxNext(&ri)) { + node = ri.data; if (!nodeInHandshake(node)) continue; - if (!strcasecmp(node->ip, ip) && getNodeDefaultClientPort(node) == port && node->cport == cport) break; + if (!strcasecmp(node->ip, ip) && getNodeDefaultClientPort(node) == port && node->cport == cport) { + raxStop(&ri); + return 1; // Return 1 if a matching node is found + } } - dictReleaseIterator(di); - return de != NULL; + raxStop(&ri); + return 0; } /* Start a handshake with the specified address if there is not one @@ -2177,7 +2100,6 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { node->tls_port = msg_tls_port; node->cport = ntohs(g->cport); clusterAddNode(node); - clusterNodeAddToMasters(node); } } @@ -2266,10 +2188,9 @@ void clusterSetNodeAsPrimary(clusterNode *n) { if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO; } n->flags &= ~CLUSTER_NODE_REPLICA; + n->flags |= CLUSTER_NODE_PRIMARY; n->replicaof = NULL; - clusterNodeAddToMasters(n); - /* Update config and state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -3114,10 +3035,7 @@ int clusterProcessPacket(clusterLink *link) { /* Primary node changed for this replica? */ if (primary && sender->replicaof != primary) { - if (sender->replicaof) - clusterNodeRemoveReplica(sender->replicaof, sender); - else - clusterNodeRemoveFromMasters(sender); + if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender); serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s", sender->name, sender->human_nodename, primary->name, primary->human_nodename, sender->shard_id); @@ -3543,17 +3461,17 @@ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) { * some node->link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with node links later. */ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { - dictIterator *di; - dictEntry *de; + raxIterator ri; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; clusterSendMessage(node->link, msgblock); } - dictReleaseIterator(di); + raxStop(&ri); } /* Build the message header. hdr must point to a buffer at least @@ -3652,7 +3570,7 @@ void clusterSendPing(clusterLink *link, int type) { * nodes available minus two (ourself and the node we are sending the * message to). However practically there may be less valid nodes since * nodes in handshake state, disconnected, are not considered. */ - int freshnodes = dictSize(server.cluster->nodes) - 2; + int freshnodes = (int)raxSize(server.cluster->nodes) - 2; /* How many gossip sections we want to add? 1/10 of the number of nodes * and anyway at least 3. Why 1/10? @@ -3680,7 +3598,7 @@ void clusterSendPing(clusterLink *link, int type) { * Since we have non-voting replicas that lower the probability of an entry * to feature our node, we set the number of entries per packet as * 10% of the total nodes we have. */ - wanted = floor(dictSize(server.cluster->nodes) / 10); + wanted = floor(raxSize(server.cluster->nodes) / 10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; @@ -3706,9 +3624,13 @@ void clusterSendPing(clusterLink *link, int type) { /* Populate the gossip fields */ int maxiterations = wanted * 3; + raxIterator ri; + raxStart(&ri, server.cluster->nodes); while (freshnodes > 0 && gossipcount < wanted && maxiterations--) { - dictEntry *de = dictGetRandomKey(server.cluster->nodes); - clusterNode *this = dictGetVal(de); + raxSeek(&ri, "^", NULL, 0); + raxRandomWalk(&ri, 0); + if (raxEOF(&ri)) break; + clusterNode *this = ri.data; /* Don't include this node: the whole packet header is about us * already, so we just gossip about other nodes. @@ -3739,15 +3661,14 @@ void clusterSendPing(clusterLink *link, int type) { freshnodes--; gossipcount++; } + raxStop(&ri); /* If there are PFAIL nodes, add them at the end. */ if (pfail_wanted) { - dictIterator *di; - dictEntry *de; - - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL && pfail_wanted > 0) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri) && pfail_wanted > 0) { + clusterNode *node = ri.data; if (node->flags & CLUSTER_NODE_HANDSHAKE) continue; if (node->flags & CLUSTER_NODE_NOADDR) continue; if (!(node->flags & CLUSTER_NODE_PFAIL)) continue; @@ -3758,7 +3679,7 @@ void clusterSendPing(clusterLink *link, int type) { * of PFAIL nodes. */ pfail_wanted--; } - dictReleaseIterator(di); + raxStop(&ri); } /* Compute the actual total length and send! */ @@ -3797,12 +3718,12 @@ void clusterSendPing(clusterLink *link, int type) { #define CLUSTER_BROADCAST_ALL 0 #define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 void clusterBroadcastPong(int target) { - dictIterator *di; - dictEntry *de; + raxIterator ri; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (!node->link) continue; if (node == myself || nodeInHandshake(node)) continue; @@ -3813,7 +3734,7 @@ void clusterBroadcastPong(int target) { } clusterSendPing(node->link, CLUSTERMSG_TYPE_PONG); } - dictReleaseIterator(di); + raxStop(&ri); } /* Create a PUBLISH message block. @@ -4418,8 +4339,7 @@ void clusterHandleReplicaFailover(void) { void clusterHandleReplicaMigration(int max_replicas) { int j, ok_replicas = 0; clusterNode *my_primary = myself->replicaof, *target = NULL, *candidate = NULL; - dictIterator *di; - dictEntry *de; + raxIterator ri; /* Step 1: Don't migrate if the cluster state is not ok. */ if (server.cluster->state != CLUSTER_OK) return; @@ -4442,9 +4362,11 @@ void clusterHandleReplicaMigration(int max_replicas) { * replicas to migrate at the same time, but this is unlikely to * happen and relatively harmless when it does. */ candidate = myself; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + int ok_replicas = 0, is_orphaned = 1; /* We want to migrate only if this primary is working, orphaned, and @@ -4479,7 +4401,7 @@ void clusterHandleReplicaMigration(int max_replicas) { } } } - dictReleaseIterator(di); + raxStop(&ri); /* Step 4: perform the migration if there is a target, and if I'm the * candidate, but only if the primary is continuously orphaned for a @@ -4641,8 +4563,7 @@ static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) { /* This is executed 10 times every second */ void clusterCron(void) { - dictIterator *di; - dictEntry *de; + raxIterator ri; int update_state = 0; int orphaned_primaries; /* How many primaries there are without ok replicas. */ int max_replicas; /* Max number of ok replicas for a single primary. */ @@ -4666,9 +4587,11 @@ void clusterCron(void) { /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */ server.cluster->stats_pfail_nodes = 0; /* Run through some of the operations we want to do on each cluster node. */ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + /* We free the inbound or outboud link to the node if the link has an * oversized message send queue and immediately try reconnecting. */ clusterNodeCronFreeLinkOnBufferLimitReached(node); @@ -4677,7 +4600,7 @@ void clusterCron(void) { */ if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue; } - dictReleaseIterator(di); + raxStop(&ri); /* Ping some random node 1 time every 10 iterations, so that we usually ping * one random node every second. */ @@ -4686,10 +4609,12 @@ void clusterCron(void) { /* Check a few random nodes and ping the one with the oldest * pong_received time. */ + raxStart(&ri, server.cluster->nodes); for (j = 0; j < 5; j++) { - de = dictGetRandomKey(server.cluster->nodes); - clusterNode *this = dictGetVal(de); - + raxSeek(&ri, "^", NULL, 0); + raxRandomWalk(&ri, 0); + if (raxEOF(&ri)) break; + clusterNode *this = ri.data; /* Don't ping nodes disconnected or with a ping currently active. */ if (this->link == NULL || this->ping_sent != 0) continue; if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; @@ -4698,6 +4623,7 @@ void clusterCron(void) { min_pong = this->pong_received; } } + raxStop(&ri); if (min_pong_node) { serverLog(LL_DEBUG, "Pinging node %.40s", min_pong_node->name); clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); @@ -4713,9 +4639,10 @@ void clusterCron(void) { orphaned_primaries = 0; max_replicas = 0; this_replicas = 0; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; now = mstime(); /* Use an updated time at every iteration. */ if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR | CLUSTER_NODE_HANDSHAKE)) continue; @@ -4797,7 +4724,7 @@ void clusterCron(void) { } } } - dictReleaseIterator(di); + raxStop(&ri); /* If we are a replica node but the replication is still turned off, * enable it if we know the address of our primary and it appears to @@ -4897,16 +4824,17 @@ void bitmapClearBit(unsigned char *bitmap, int pos) { * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the * MIGRATE_TO flag the when a primary gets the first slot. */ int clusterPrimariesHaveReplicas(void) { - dictIterator di; - dictInitIterator(&di, server.cluster->nodes); - dictEntry *de; + raxIterator ri; int replicas = 0; - while ((de = dictNext(&di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (nodeIsReplica(node)) continue; replicas += node->num_replicas; } + raxStop(&ri); return replicas != 0; } @@ -5053,13 +4981,17 @@ void clusterUpdateState(void) { { server.cluster->size = 0; - for (int i = 0; i < server.cluster->nummasters; i++) { - if (server.cluster->masters[i] && server.cluster->masters[i]->numslots) { + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + if (clusterNodeIsVotingPrimary(node)) { server.cluster->size++; - if ((server.cluster->masters[i]->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0) - reachable_primaries++; + if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; } } + raxStop(&ri); } /* If we are in a minority partition, change the cluster state @@ -5194,9 +5126,8 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) { serverAssert(myself->numslots == 0); if (clusterNodeIsPrimary(myself)) { - clusterNodeRemoveFromMasters(myself); + myself->flags &= ~(CLUSTER_NODE_PRIMARY | CLUSTER_NODE_MIGRATE_TO); myself->flags |= CLUSTER_NODE_REPLICA; - clusterCloseAllSlots(); } else { if (myself->replicaof) clusterNodeRemoveReplica(myself->replicaof, myself); } @@ -5400,15 +5331,15 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) { * configuration file (nodes.conf) for a given node. */ sds clusterGenNodesDescription(client *c, int filter, int tls_primary) { sds ci = sdsempty(), ni; - dictIterator *di; - dictEntry *de; + raxIterator ri; /* Generate all nodes slots info firstly. */ clusterGenNodesSlotsInfo(filter); - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->flags & filter) continue; ni = clusterGenNodeDescription(c, node, tls_primary); @@ -5419,7 +5350,7 @@ sds clusterGenNodesDescription(client *c, int filter, int tls_primary) { /* Release slots info. */ clusterFreeNodesSlotsInfo(node); } - dictReleaseIterator(di); + raxStop(&ri); return ci; } @@ -5463,16 +5394,16 @@ void addReplyClusterLinkDescription(client *c, clusterLink *link) { /* Add to the output buffer of the given client an array of cluster link descriptions, * with array entry being a description of a single current cluster link. */ void addReplyClusterLinksDescription(client *c) { - dictIterator *di; - dictEntry *de; + raxIterator ri; void *arraylen_ptr = NULL; int num_links = 0; arraylen_ptr = addReplyDeferredLen(c); - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->link) { num_links++; addReplyClusterLinkDescription(c, node->link); @@ -5482,7 +5413,7 @@ void addReplyClusterLinksDescription(client *c) { addReplyClusterLinkDescription(c, node->inbound_link); } } - dictReleaseIterator(di); + raxStop(&ri); setDeferredArrayLen(c, arraylen_ptr, num_links); } @@ -5620,14 +5551,15 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) { } /* Add the shard reply of a single shard based off the given primary node. */ -void addShardReplyForClusterShards(client *c, clusterNode* primary) { +void addShardReplyForClusterShards(client *c, clusterNode *primary) { addReplyMapLen(c, 2); addReplyBulkCString(c, "slots"); if (primary->slot_info_pairs != NULL) { serverAssert((primary->slot_info_pairs_count % 2) == 0); addReplyArrayLen(c, primary->slot_info_pairs_count); - for (int i = 0; i < primary->slot_info_pairs_count; i++) addReplyLongLong(c, (unsigned long)primary->slot_info_pairs[i]); + for (int i = 0; i < primary->slot_info_pairs_count; i++) + addReplyLongLong(c, (unsigned long)primary->slot_info_pairs[i]); } else { /* If no slot info pair is provided, the node owns no slots */ addReplyArrayLen(c, 0); @@ -5651,12 +5583,17 @@ void addShardReplyForClusterShards(client *c, clusterNode* primary) { * pair owned by the shard, also the primary and set of replica(s) along with * information about each node. */ void clusterCommandShards(client *c) { - addReplyArrayLen(c, server.cluster->nummasters); /* This call will add slot_info_pairs to all nodes */ clusterGenNodesSlotsInfo(0); - for (int i = 0; i < server.cluster->nummasters; i++) { - addShardReplyForClusterShards(c, server.cluster->masters[i]); + list *primaries = clusterGetPrimaries(); + addReplyArrayLen(c, listLength(primaries)); + listIter li; + listRewind(primaries, &li); + for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { + clusterNode *n = listNodeValue(ln); + addShardReplyForClusterShards(c, n); } + listRelease(primaries); } sds genClusterInfoString(void) { @@ -5685,12 +5622,12 @@ sds genClusterInfoString(void) { "cluster_slots_ok:%d\r\n" "cluster_slots_pfail:%d\r\n" "cluster_slots_fail:%d\r\n" - "cluster_known_nodes:%lu\r\n" + "cluster_known_nodes:%llu\r\n" "cluster_size:%d\r\n" "cluster_current_epoch:%llu\r\n" "cluster_my_epoch:%llu\r\n", statestr[server.cluster->state], slots_assigned, slots_ok, slots_pfail, slots_fail, - dictSize(server.cluster->nodes), server.cluster->size, + (unsigned long long)raxSize(server.cluster->nodes), server.cluster->size, (unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself)); /* Show stats about messages sent and received. */ @@ -5772,7 +5709,7 @@ int clusterManualFailoverTimeLimit(void) { } int getClusterSize(void) { - return dictSize(server.cluster->nodes); + return (int)raxSize(server.cluster->nodes); } int getMyShardSlotCount(void) { @@ -5786,13 +5723,15 @@ int getMyShardSlotCount(void) { } char **getClusterNodesList(size_t *numnodes) { - size_t count = dictSize(server.cluster->nodes); + size_t count = raxSize(server.cluster->nodes); char **ids = zmalloc((count + 1) * CLUSTER_NAMELEN); - dictIterator *di = dictGetIterator(server.cluster->nodes); - dictEntry *de; + raxIterator ri; int j = 0; - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->flags & (CLUSTER_NODE_NOADDR | CLUSTER_NODE_HANDSHAKE)) continue; ids[j] = zmalloc(CLUSTER_NAMELEN); memcpy(ids[j], node->name, CLUSTER_NAMELEN); @@ -5801,7 +5740,7 @@ char **getClusterNodesList(size_t *numnodes) { *numnodes = j; ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need * to also get the count argument. */ - dictReleaseIterator(di); + raxStop(&ri); return ids; } @@ -6391,7 +6330,7 @@ int clusterCommandSpecial(client *c) { if (epoch < 0) { addReplyErrorFormat(c, "Invalid config epoch specified: %lld", epoch); - } else if (dictSize(server.cluster->nodes) > 1) { + } else if (raxSize(server.cluster->nodes) > 1) { addReplyError(c, "The user can assign a config epoch only when the " "node does not know any other node."); } else if (myself->configEpoch != 0) { @@ -6539,19 +6478,20 @@ void clusterPromoteSelfToPrimary(void) { } int detectAndUpdateCachedNodeHealth(void) { - dictIterator di; - dictInitIterator(&di, server.cluster->nodes); - dictEntry *de; + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); clusterNode *node; int overall_health_changed = 0; - while ((de = dictNext(&di)) != NULL) { - node = dictGetVal(de); + while (raxNext(&ri)) { + node = ri.data; int present_is_node_healthy = isNodeAvailable(node); if (present_is_node_healthy != node->is_node_healthy) { overall_health_changed = 1; node->is_node_healthy = present_is_node_healthy; } } + raxStop(&ri); return overall_health_changed; } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 7118c064b6..a7cdbe13f8 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -320,10 +320,8 @@ struct clusterState { uint64_t currentEpoch; int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ int size; /* Num of primary nodes with at least one slot */ - dict *nodes; /* Hash table of name -> clusterNode structures */ + rax *nodes; /* Table mapping of name -> clusterNode structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ - clusterNode **masters; /* pointers to master nodes */ - int nummasters; /* Number of master nodes */ clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; diff --git a/src/commands.def b/src/commands.def index c069393fe3..4559c0aefe 100644 --- a/src/commands.def +++ b/src/commands.def @@ -896,7 +896,9 @@ struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = { #ifndef SKIP_CMD_TIPS_TABLE /* CLUSTER SHARDS tips */ -#define CLUSTER_SHARDS_Tips NULL +const char *CLUSTER_SHARDS_Tips[] = { +"nondeterministic_output", +}; #endif #ifndef SKIP_CMD_KEY_SPECS_TABLE @@ -1027,7 +1029,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)}, {MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args}, {MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args}, -{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, +{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, {MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, {MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, diff --git a/src/commands/cluster-shards.json b/src/commands/cluster-shards.json index ec782183fc..e63c129ea9 100644 --- a/src/commands/cluster-shards.json +++ b/src/commands/cluster-shards.json @@ -11,6 +11,9 @@ "LOADING", "STALE" ], + "command_tips": [ + "NONDETERMINISTIC_OUTPUT" + ], "reply_schema": { "description": "A nested list of a map of hash ranges and shard nodes describing individual shards.", "type": "array", diff --git a/src/rax.h b/src/rax.h index c03e0303a0..0ec9031cea 100644 --- a/src/rax.h +++ b/src/rax.h @@ -95,6 +95,7 @@ */ #define RAX_NODE_MAX_SIZE ((1 << 29) - 1) +#define RAX_OK 1 typedef struct raxNode { uint32_t iskey : 1; /* Does this node contain a key? */ uint32_t isnull : 1; /* Associated value is NULL (don't store it). */ diff --git a/tests/cluster/tests/28-cluster-shards.tcl b/tests/cluster/tests/28-cluster-shards.tcl index 0b607566b0..d6534c816b 100644 --- a/tests/cluster/tests/28-cluster-shards.tcl +++ b/tests/cluster/tests/28-cluster-shards.tcl @@ -285,24 +285,3 @@ test "CLUSTER MYSHARDID reports same shard id after cluster restart" { assert_equal [dict get $node_ids $i] [R $i cluster myshardid] } } - -test "Deterministic order of CLUSTER SHARDS response" { - set node_ids {} - for {set j 0} {$j < 8} {incr j} { - set shards_cfg [R $j CLUSTER SHARDS] - set i 0 - foreach shard_cfg $shards_cfg { - set nodes [dict get $shard_cfg nodes] - foreach node $nodes { - if {$j == 0} { - # Save the node ids from the first node response - dict set node_ids $i [dict get $node id] - } else { - # Verify the order of the node ids is the same as the first node response - assert_equal [dict get $node id] [dict get $node_ids $i] - } - incr i - } - } - } -} diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl new file mode 100644 index 0000000000..ae6ade812a --- /dev/null +++ b/tests/unit/cluster/cluster-shards.tcl @@ -0,0 +1,22 @@ +start_cluster 4 4 {tags {external:skip cluster}} { + test "Deterministic order of CLUSTER SHARDS response" { + set node_ids {} + for {set j 0} {$j < 8} {incr j} { + set shards_cfg [R $j CLUSTER SHARDS] + set i 0 + foreach shard_cfg $shards_cfg { + set nodes [dict get $shard_cfg nodes] + foreach node $nodes { + if {$j == 0} { + # Save the node ids from the first node response + dict set node_ids $i [dict get $node id] + } else { + # Verify the order of the node ids is the same as the first node response + assert_equal [dict get $node id] [dict get $node_ids $i] + } + incr i + } + } + } + } +}