Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
addReplyError(c, "-CLUSTERDOWN Hash slot not served");
} else if (error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK) {
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
int port = clusterNodeClientPort(n, shouldReturnTlsInfo(), c);
addReplyErrorSds(c,
sdscatprintf(sdsempty(), "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, clusterNodePreferredEndpoint(n, c), port));
Expand Down Expand Up @@ -1407,7 +1407,7 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
}

/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo()));
addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo(), c));
addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);

/* Add the additional endpoint information, this is all the known networking information
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ clusterNode *clusterNodeGetReplica(clusterNode *node, int replica_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
int clusterNodeClientPort(clusterNode *n, int use_tls);
int clusterNodeClientPort(clusterNode *n, int use_tls, client *c);
char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n, client *c);
clusterNode *clusterLookupNode(const char *name, int length);
Expand Down
134 changes: 124 additions & 10 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ int auxTcpPortPresent(clusterNode *n);
int auxTlsPortSetter(clusterNode *n, void *value, size_t length);
sds auxTlsPortGetter(clusterNode *n, sds s);
int auxTlsPortPresent(clusterNode *n);
int auxAnnounceClientTcpPortSetter(clusterNode *n, void *value, size_t length);
sds auxAnnounceClientTcpPortGetter(clusterNode *n, sds s);
int auxAnnounceClientTcpPortPresent(clusterNode *n);
int auxAnnounceClientTlsPortSetter(clusterNode *n, void *value, size_t length);
sds auxAnnounceClientTlsPortGetter(clusterNode *n, sds s);
int auxAnnounceClientTlsPortPresent(clusterNode *n);
int auxAnnounceClientTlsPortPresent(clusterNode *n);
static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
Expand All @@ -143,8 +150,20 @@ static inline int getNodeDefaultReplicationPort(clusterNode *n) {
return server.tls_replication ? n->tls_port : n->tcp_port;
}

int clusterNodeClientPort(clusterNode *n, int use_tls) {
return use_tls ? n->tls_port : n->tcp_port;
int clusterNodeClientPort(clusterNode *n, int use_tls, client *c) {
int port = n->tcp_port;
int tlsport = n->tls_port;

if (c != NULL) {
if (n->announce_client_tcp_port) {
port = n->announce_client_tcp_port;
}
if (n->announce_client_tls_port) {
tlsport = n->announce_client_tls_port;
}
}

return use_tls ? tlsport : port;
}

static inline int defaultClientPort(void) {
Expand Down Expand Up @@ -295,6 +314,8 @@ typedef enum {
af_tls_port,
af_announce_client_ipv4,
af_announce_client_ipv6,
af_announce_client_tcp_port,
af_announce_client_tls_port,
af_count, /* must be the last field */
} auxFieldIndex;

Expand All @@ -309,6 +330,8 @@ auxFieldHandler auxFieldHandlers[] = {
{"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent},
{"client-ipv4", auxAnnounceClientIpV4Setter, auxAnnounceClientIpV4Getter, auxAnnounceClientIpV4Present},
{"client-ipv6", auxAnnounceClientIpV6Setter, auxAnnounceClientIpV6Getter, auxAnnounceClientIpV6Present},
{"client-tcp-port", auxAnnounceClientTcpPortSetter, auxAnnounceClientTcpPortGetter, auxAnnounceClientTcpPortPresent},
{"client-tls-port", auxAnnounceClientTlsPortSetter, auxAnnounceClientTlsPortGetter, auxAnnounceClientTlsPortPresent},
};

int auxShardIdSetter(clusterNode *n, void *value, size_t length) {
Expand Down Expand Up @@ -448,6 +471,44 @@ int auxTlsPortPresent(clusterNode *n) {
return n->tls_port >= 0 && n->tls_port < 65536;
}

int auxAnnounceClientTcpPortSetter(clusterNode *n, void *value, size_t length) {
if (length > 5 || length < 1) {
return C_ERR;
}
char buf[length + 1];
memcpy(buf, (char *)value, length);
buf[length] = '\0';
n->announce_client_tcp_port = atoi(buf);
return (n->announce_client_tcp_port < 0 || n->announce_client_tcp_port >= 65536) ? C_ERR : C_OK;
}

sds auxAnnounceClientTcpPortGetter(clusterNode *n, sds s) {
return sdscatfmt(s, "%i", n->announce_client_tcp_port);
}

int auxAnnounceClientTcpPortPresent(clusterNode *n) {
return n->announce_client_tcp_port > 0 && n->announce_client_tcp_port < 65536;
}

int auxAnnounceClientTlsPortSetter(clusterNode *n, void *value, size_t length) {
if (length > 5 || length < 1) {
return C_ERR;
}
char buf[length + 1];
memcpy(buf, (char *)value, length);
buf[length] = '\0';
n->announce_client_tls_port = atoi(buf);
return (n->announce_client_tls_port < 0 || n->announce_client_tls_port >= 65536) ? C_ERR : C_OK;
}

sds auxAnnounceClientTlsPortGetter(clusterNode *n, sds s) {
return sdscatfmt(s, "%i", n->announce_client_tls_port);
}

int auxAnnounceClientTlsPortPresent(clusterNode *n) {
return n->announce_client_tls_port > 0 && n->announce_client_tls_port < 65536;
}

/* clusterLink send queue blocks */
typedef struct {
size_t totlen; /* Total length of this block including the message */
Expand Down Expand Up @@ -1000,7 +1061,7 @@ int clusterLockConfig(char *filename) {
}

/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port, int *announced_cport) {
void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port, int *announced_cport, int *announced_client_tcp_port, int *announced_client_tls_port) {
/* Config overriding announced ports. */
*announced_tcp_port = server.cluster_announce_port ? server.cluster_announce_port : server.port;
*announced_tls_port = server.cluster_announce_tls_port ? server.cluster_announce_tls_port : server.tls_port;
Expand All @@ -1012,6 +1073,9 @@ void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port, int
} else {
*announced_cport = defaultClientPort() + CLUSTER_PORT_INCR;
}

*announced_client_tcp_port = server.cluster_announce_client_port;
*announced_client_tls_port = server.cluster_announce_client_tls_port;
}

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
Expand All @@ -1038,7 +1102,7 @@ void clusterUpdateMyselfFlags(void) {
* The option can be set at runtime via CONFIG SET. */
void clusterUpdateMyselfAnnouncedPorts(void) {
if (!myself) return;
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport, &myself->announce_client_tcp_port, &myself->announce_client_tls_port);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

Expand Down Expand Up @@ -1106,6 +1170,24 @@ static void updateAnnouncedClientIpV6(clusterNode *node, char *value) {
updateSdsExtensionField(&node->announce_client_ipv6, value);
}

static void updateAnnouncedClientPort(clusterNode *node, int value) {
if (value == node->announce_client_tcp_port) {
return;
}

node->announce_client_tcp_port = value;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

static void updateAnnouncedClientTlsPort(clusterNode *node, int value) {
if (value == node->announce_client_tls_port) {
return;
}

node->announce_client_tls_port = value;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

static void updateShardId(clusterNode *node, const char *shard_id) {
/* Ensure replica shard IDs match their primary's to maintain cluster consistency.
*
Expand Down Expand Up @@ -1251,7 +1333,7 @@ void clusterInit(void) {

/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport, &myself->announce_client_tcp_port, &myself->announce_client_tls_port);

server.cluster->mf_end = 0;
server.cluster->mf_replica = NULL;
Expand Down Expand Up @@ -1676,6 +1758,8 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->tcp_port = 0;
node->cport = 0;
node->tls_port = 0;
node->announce_client_tcp_port = 0;
node->announce_client_tls_port = 0;
node->fail_reports = raxNew();
node->orphaned_time = 0;
node->repl_offset = 0;
Expand Down Expand Up @@ -3023,6 +3107,20 @@ writeSdsPingExtIfNonempty(uint32_t *totlen_ptr, clusterMsgPingExt **cursor_ptr,
return 1;
}

static uint32_t
writePortPingExtIfNonzero(uint32_t *totlen_ptr, clusterMsgPingExt **cursor_ptr, clusterMsgPingtypes type, uint16_t value) {
if (value == 0) return 0;
size_t size = getAlignedPingExtSize(sizeof(clusterMsgPingExtClientPort));
if (*cursor_ptr != NULL) {
void *ext = preparePingExt(*cursor_ptr, type, size);
value = htons(value);
memcpy(ext, &value, sizeof(value));
*cursor_ptr = getNextPingExt(*cursor_ptr);
}
*totlen_ptr += size;
return 1;
}

/* 1. If a NULL hdr is provided, compute the extension size;
* 2. If a non-NULL hdr is provided, write the ping
* extensions at the start of the cursor. This function
Expand All @@ -3046,6 +3144,10 @@ static uint32_t writePingExtensions(clusterMsg *hdr, int gossipcount) {
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_IPV4, myself->announce_client_ipv4);
extensions +=
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_IPV6, myself->announce_client_ipv6);
extensions +=
writePortPingExtIfNonzero(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_PORT, myself->announce_client_tcp_port);
extensions +=
writePortPingExtIfNonzero(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_TLS_PORT, myself->announce_client_tls_port);

/* Gossip forgotten nodes */
if (dictSize(server.cluster->nodes_black_list) > 0) {
Expand Down Expand Up @@ -3097,6 +3199,8 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
char *ext_humannodename = NULL;
char *ext_clientipv4 = NULL;
char *ext_clientipv6 = NULL;
int ext_clientport = 0;
int ext_clienttlsport = 0;
char *ext_shardid = NULL;
uint16_t extensions = ntohs(hdr->extensions);
/* Loop through all the extensions and process them */
Expand All @@ -3118,6 +3222,14 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
clusterMsgPingExtClientIpV6 *clientipv6_ext =
(clusterMsgPingExtClientIpV6 *)&(ext->ext[0].announce_client_ipv6);
ext_clientipv6 = clientipv6_ext->announce_client_ipv6;
} else if (type == CLUSTERMSG_EXT_TYPE_CLIENT_PORT) {
clusterMsgPingExtClientPort *clientport_ext =
(clusterMsgPingExtClientPort *)&(ext->ext[0].announce_client_port);
ext_clientport = ntohs(clientport_ext->announce_client_port);
} else if (type == CLUSTERMSG_EXT_TYPE_CLIENT_TLS_PORT) {
clusterMsgPingExtClientTlsPort *clienttlsport_ext =
(clusterMsgPingExtClientTlsPort *)&(ext->ext[0].announce_client_tls_port);
ext_clienttlsport = ntohs(clienttlsport_ext->announce_client_tls_port);
} else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -3152,6 +3264,8 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
updateAnnouncedHumanNodename(sender, ext_humannodename);
updateAnnouncedClientIpV4(sender, ext_clientipv4);
updateAnnouncedClientIpV6(sender, ext_clientipv6);
updateAnnouncedClientPort(sender, ext_clientport);
updateAnnouncedClientTlsPort(sender, ext_clienttlsport);
/* If the node did not send us a shard-id extension, it means the sender
* does not support it (old version), node->shard_id is randomly generated.
* A cluster-wide consensus for the node's shard_id is not necessary.
Expand Down Expand Up @@ -4244,8 +4358,8 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
}

/* Handle cluster-announce-[tls-|bus-]port. */
int announced_tcp_port, announced_tls_port, announced_cport;
deriveAnnouncedPorts(&announced_tcp_port, &announced_tls_port, &announced_cport);
int announced_tcp_port, announced_tls_port, announced_cport, announced_client_tcp_port, announced_client_tls_port;
deriveAnnouncedPorts(&announced_tcp_port, &announced_tls_port, &announced_cport, &announced_client_tcp_port, &announced_client_tls_port);

memcpy(hdr->myslots, primary->slots, sizeof(hdr->myslots));
memset(hdr->replicaof, 0, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -6256,7 +6370,7 @@ sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_cou
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
int j, start;
sds ci;
int port = clusterNodeClientPort(node, tls_primary);
int port = clusterNodeClientPort(node, tls_primary, c);
char *ip = clusterNodeIp(node, c);

/* Node coordinates */
Expand Down Expand Up @@ -6575,13 +6689,13 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {

if (node->tcp_port) {
addReplyBulkCString(c, "port");
addReplyLongLong(c, node->tcp_port);
addReplyLongLong(c, clusterNodeClientPort(node, false, c));
reply_count++;
}

if (node->tls_port) {
addReplyBulkCString(c, "tls-port");
addReplyLongLong(c, node->tls_port);
addReplyLongLong(c, clusterNodeClientPort(node, true, c));
reply_count++;
}

Expand Down
15 changes: 15 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLUSTER_LEGACY_H
#define CLUSTER_LEGACY_H

#include <stdint.h>
#define CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */

/* The following defines are amount of time, sometimes expressed as
Expand Down Expand Up @@ -160,6 +161,8 @@ typedef enum {
CLUSTERMSG_EXT_TYPE_SHARDID,
CLUSTERMSG_EXT_TYPE_CLIENT_IPV4,
CLUSTERMSG_EXT_TYPE_CLIENT_IPV6,
CLUSTERMSG_EXT_TYPE_CLIENT_PORT,
CLUSTERMSG_EXT_TYPE_CLIENT_TLS_PORT,
} clusterMsgPingtypes;

/* Helper function for making sure extensions are eight byte aligned. */
Expand Down Expand Up @@ -192,6 +195,14 @@ typedef struct {
char announce_client_ipv6[1]; /* Announced client IPv6, ends with \0. */
} clusterMsgPingExtClientIpV6;

typedef struct {
uint16_t announce_client_port; /* Announced client port. */
} clusterMsgPingExtClientPort;

typedef struct {
uint16_t announce_client_tls_port; /* Announced client TLS port. */
} clusterMsgPingExtClientTlsPort;

typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingtypes) */
Expand All @@ -203,6 +214,8 @@ typedef struct {
clusterMsgPingExtShardId shard_id;
clusterMsgPingExtClientIpV4 announce_client_ipv4;
clusterMsgPingExtClientIpV6 announce_client_ipv6;
clusterMsgPingExtClientPort announce_client_port;
clusterMsgPingExtClientTlsPort announce_client_tls_port;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
Expand Down Expand Up @@ -363,6 +376,8 @@ struct _clusterNode {
int tcp_port; /* Latest known clients TCP port. */
int tls_port; /* Latest known clients TLS port */
int cport; /* Latest known cluster port of this node. */
int announce_client_tcp_port; /* Port for clients only. */
int announce_client_tls_port; /* TLS port for clients only. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
rax *fail_reports; /* Radix tree for failure reports with sorted order by timestamp */
Expand Down
2 changes: 2 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3298,6 +3298,8 @@ standardConfig static_configs[] = {
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Default: Use +10000 offset. */
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.port */
createIntConfig("cluster-announce-tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_tls_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.tls_port */
createIntConfig("cluster-announce-client-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_client_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort),
createIntConfig("cluster-announce-client-tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_client_tls_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort),
createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-ping-replica-period", "repl-ping-slave-period", MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_ping_replica_period, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("list-compress-depth", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
Expand Down
2 changes: 1 addition & 1 deletion src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -9234,7 +9234,7 @@ int moduleGetClusterNodeInfoForClient(ValkeyModuleCtx *ctx,
else
memset(primary_id, 0, VALKEYMODULE_NODE_ID_LEN);
}
if (port) *port = getNodeDefaultClientPort(node);
if (port) *port = clusterNodeClientPort(node, server.tls_cluster, c);

/* As usually we have to remap flags for modules, in order to ensure
* we can provide binary compatibility. */
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2142,6 +2142,8 @@ struct valkeyServer {
int cluster_announce_port; /* base port to announce on cluster bus. */
int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
int cluster_announce_client_port; /* base port for clients to announce on cluster bus. */
int cluster_announce_client_tls_port; /* TLS port for clients to announce on cluster bus. */
int cluster_module_flags; /* Set of flags that modules are able
to set in order to suppress certain
native Redis Cluster features. Check the
Expand Down
Loading
Loading