Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
addReplyError(c, "-CLUSTERDOWN Hash slot not served");
} else if (error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK) {
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
int port = clusterNodeClientPort(n, shouldReturnTlsInfo(), c);
addReplyErrorSds(c,
sdscatprintf(sdsempty(), "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, clusterNodePreferredEndpoint(n, c), port));
Expand Down Expand Up @@ -1407,7 +1407,7 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
}

/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo()));
addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo(), c));
addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);

/* Add the additional endpoint information, this is all the known networking information
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ clusterNode *clusterNodeGetReplica(clusterNode *node, int replica_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
int clusterNodeClientPort(clusterNode *n, int use_tls);
int clusterNodeClientPort(clusterNode *n, int use_tls, client *c);
char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n, client *c);
clusterNode *clusterLookupNode(const char *name, int length);
Expand Down
137 changes: 127 additions & 10 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ int auxTcpPortPresent(clusterNode *n);
int auxTlsPortSetter(clusterNode *n, void *value, size_t length);
sds auxTlsPortGetter(clusterNode *n, sds s);
int auxTlsPortPresent(clusterNode *n);
int auxAnnounceClientTcpPortSetter(clusterNode *n, void *value, size_t length);
sds auxAnnounceClientTcpPortGetter(clusterNode *n, sds s);
int auxAnnounceClientTcpPortPresent(clusterNode *n);
int auxAnnounceClientTlsPortSetter(clusterNode *n, void *value, size_t length);
sds auxAnnounceClientTlsPortGetter(clusterNode *n, sds s);
int auxAnnounceClientTlsPortPresent(clusterNode *n);
int auxAnnounceClientTlsPortPresent(clusterNode *n);
static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
Expand All @@ -143,8 +150,20 @@ static inline int getNodeDefaultReplicationPort(clusterNode *n) {
return server.tls_replication ? n->tls_port : n->tcp_port;
}

int clusterNodeClientPort(clusterNode *n, int use_tls) {
return use_tls ? n->tls_port : n->tcp_port;
int clusterNodeClientPort(clusterNode *n, int use_tls, client *c) {
int port = n->tcp_port;
int tlsport = n->tls_port;

if (c != NULL) {
if (n->announce_client_tcp_port) {
port = n->announce_client_tcp_port;
}
if (n->announce_client_tls_port) {
tlsport = n->announce_client_tls_port;
}
}

return use_tls ? tlsport : port;
}

static inline int defaultClientPort(void) {
Expand Down Expand Up @@ -295,6 +314,8 @@ typedef enum {
af_tls_port,
af_announce_client_ipv4,
af_announce_client_ipv6,
af_announce_client_tcp_port,
af_announce_client_tls_port,
af_count, /* must be the last field */
} auxFieldIndex;

Expand All @@ -309,6 +330,8 @@ auxFieldHandler auxFieldHandlers[] = {
{"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent},
{"client-ipv4", auxAnnounceClientIpV4Setter, auxAnnounceClientIpV4Getter, auxAnnounceClientIpV4Present},
{"client-ipv6", auxAnnounceClientIpV6Setter, auxAnnounceClientIpV6Getter, auxAnnounceClientIpV6Present},
{"client-tcp-port", auxAnnounceClientTcpPortSetter, auxAnnounceClientTcpPortGetter, auxAnnounceClientTcpPortPresent},
{"client-tls-port", auxAnnounceClientTlsPortSetter, auxAnnounceClientTlsPortGetter, auxAnnounceClientTlsPortPresent},
};

int auxShardIdSetter(clusterNode *n, void *value, size_t length) {
Expand Down Expand Up @@ -448,6 +471,44 @@ int auxTlsPortPresent(clusterNode *n) {
return n->tls_port >= 0 && n->tls_port < 65536;
}

int auxAnnounceClientTcpPortSetter(clusterNode *n, void *value, size_t length) {
if (length > 5 || length < 1) {
return C_ERR;
}
char buf[length + 1];
memcpy(buf, (char *)value, length);
buf[length] = '\0';
n->announce_client_tcp_port = atoi(buf);
return (n->announce_client_tcp_port < 0 || n->announce_client_tcp_port >= 65536) ? C_ERR : C_OK;
}

sds auxAnnounceClientTcpPortGetter(clusterNode *n, sds s) {
return sdscatfmt(s, "%i", n->announce_client_tcp_port);
}

int auxAnnounceClientTcpPortPresent(clusterNode *n) {
return n->announce_client_tcp_port > 0 && n->announce_client_tcp_port < 65536;
}

int auxAnnounceClientTlsPortSetter(clusterNode *n, void *value, size_t length) {
if (length > 5 || length < 1) {
return C_ERR;
}
char buf[length + 1];
memcpy(buf, (char *)value, length);
buf[length] = '\0';
n->announce_client_tls_port = atoi(buf);
return (n->announce_client_tls_port < 0 || n->announce_client_tls_port >= 65536) ? C_ERR : C_OK;
}

sds auxAnnounceClientTlsPortGetter(clusterNode *n, sds s) {
return sdscatfmt(s, "%i", n->announce_client_tls_port);
}

int auxAnnounceClientTlsPortPresent(clusterNode *n) {
return n->announce_client_tls_port > 0 && n->announce_client_tls_port < 65536;
}

/* clusterLink send queue blocks */
typedef struct {
size_t totlen; /* Total length of this block including the message */
Expand Down Expand Up @@ -1000,7 +1061,8 @@ int clusterLockConfig(char *filename) {
}

/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port, int *announced_cport) {
void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port, int *announced_cport,
int *announced_client_tcp_port, int *announced_client_tls_port) {
/* Config overriding announced ports. */
*announced_tcp_port = server.cluster_announce_port ? server.cluster_announce_port : server.port;
*announced_tls_port = server.cluster_announce_tls_port ? server.cluster_announce_tls_port : server.tls_port;
Expand All @@ -1012,6 +1074,9 @@ void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port, int
} else {
*announced_cport = defaultClientPort() + CLUSTER_PORT_INCR;
}

*announced_client_tcp_port = server.cluster_announce_client_port;
*announced_client_tls_port = server.cluster_announce_client_tls_port;
}

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
Expand All @@ -1038,7 +1103,8 @@ void clusterUpdateMyselfFlags(void) {
* The option can be set at runtime via CONFIG SET. */
void clusterUpdateMyselfAnnouncedPorts(void) {
if (!myself) return;
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport,
&myself->announce_client_tcp_port, &myself->announce_client_tls_port);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

Expand Down Expand Up @@ -1106,6 +1172,24 @@ static void updateAnnouncedClientIpV6(clusterNode *node, char *value) {
updateSdsExtensionField(&node->announce_client_ipv6, value);
}

static void updateAnnouncedClientPort(clusterNode *node, int value) {
if (value == node->announce_client_tcp_port) {
return;
}

node->announce_client_tcp_port = value;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

static void updateAnnouncedClientTlsPort(clusterNode *node, int value) {
if (value == node->announce_client_tls_port) {
return;
}

node->announce_client_tls_port = value;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

static void updateShardId(clusterNode *node, const char *shard_id) {
/* Ensure replica shard IDs match their primary's to maintain cluster consistency.
*
Expand Down Expand Up @@ -1251,7 +1335,8 @@ void clusterInit(void) {

/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport,
&myself->announce_client_tcp_port, &myself->announce_client_tls_port);

server.cluster->mf_end = 0;
server.cluster->mf_replica = NULL;
Expand Down Expand Up @@ -1676,6 +1761,8 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->tcp_port = 0;
node->cport = 0;
node->tls_port = 0;
node->announce_client_tcp_port = 0;
node->announce_client_tls_port = 0;
node->fail_reports = raxNew();
node->orphaned_time = 0;
node->repl_offset = 0;
Expand Down Expand Up @@ -3023,6 +3110,20 @@ writeSdsPingExtIfNonempty(uint32_t *totlen_ptr, clusterMsgPingExt **cursor_ptr,
return 1;
}

static uint32_t
writePortPingExtIfNonzero(uint32_t *totlen_ptr, clusterMsgPingExt **cursor_ptr, clusterMsgPingtypes type, uint16_t value) {
if (value == 0) return 0;
size_t size = getAlignedPingExtSize(sizeof(clusterMsgPingExtClientPort));
if (*cursor_ptr != NULL) {
void *ext = preparePingExt(*cursor_ptr, type, size);
value = htons(value);
memcpy(ext, &value, sizeof(value));
*cursor_ptr = getNextPingExt(*cursor_ptr);
}
*totlen_ptr += size;
return 1;
}

/* 1. If a NULL hdr is provided, compute the extension size;
* 2. If a non-NULL hdr is provided, write the ping
* extensions at the start of the cursor. This function
Expand All @@ -3046,6 +3147,10 @@ static uint32_t writePingExtensions(clusterMsg *hdr, int gossipcount) {
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_IPV4, myself->announce_client_ipv4);
extensions +=
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_IPV6, myself->announce_client_ipv6);
extensions +=
writePortPingExtIfNonzero(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_PORT, myself->announce_client_tcp_port);
extensions +=
writePortPingExtIfNonzero(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_TLS_PORT, myself->announce_client_tls_port);

/* Gossip forgotten nodes */
if (dictSize(server.cluster->nodes_black_list) > 0) {
Expand Down Expand Up @@ -3097,6 +3202,8 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
char *ext_humannodename = NULL;
char *ext_clientipv4 = NULL;
char *ext_clientipv6 = NULL;
int ext_clientport = 0;
int ext_clienttlsport = 0;
char *ext_shardid = NULL;
uint16_t extensions = ntohs(hdr->extensions);
/* Loop through all the extensions and process them */
Expand All @@ -3118,6 +3225,14 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
clusterMsgPingExtClientIpV6 *clientipv6_ext =
(clusterMsgPingExtClientIpV6 *)&(ext->ext[0].announce_client_ipv6);
ext_clientipv6 = clientipv6_ext->announce_client_ipv6;
} else if (type == CLUSTERMSG_EXT_TYPE_CLIENT_PORT) {
clusterMsgPingExtClientPort *clientport_ext =
(clusterMsgPingExtClientPort *)&(ext->ext[0].announce_client_port);
ext_clientport = ntohs(clientport_ext->announce_client_port);
} else if (type == CLUSTERMSG_EXT_TYPE_CLIENT_TLS_PORT) {
clusterMsgPingExtClientTlsPort *clienttlsport_ext =
(clusterMsgPingExtClientTlsPort *)&(ext->ext[0].announce_client_tls_port);
ext_clienttlsport = ntohs(clienttlsport_ext->announce_client_tls_port);
} else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -3152,6 +3267,8 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
updateAnnouncedHumanNodename(sender, ext_humannodename);
updateAnnouncedClientIpV4(sender, ext_clientipv4);
updateAnnouncedClientIpV6(sender, ext_clientipv6);
updateAnnouncedClientPort(sender, ext_clientport);
updateAnnouncedClientTlsPort(sender, ext_clienttlsport);
/* If the node did not send us a shard-id extension, it means the sender
* does not support it (old version), node->shard_id is randomly generated.
* A cluster-wide consensus for the node's shard_id is not necessary.
Expand Down Expand Up @@ -4244,8 +4361,8 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
}

/* Handle cluster-announce-[tls-|bus-]port. */
int announced_tcp_port, announced_tls_port, announced_cport;
deriveAnnouncedPorts(&announced_tcp_port, &announced_tls_port, &announced_cport);
int announced_tcp_port, announced_tls_port, announced_cport, announced_client_tcp_port, announced_client_tls_port;
deriveAnnouncedPorts(&announced_tcp_port, &announced_tls_port, &announced_cport, &announced_client_tcp_port, &announced_client_tls_port);

memcpy(hdr->myslots, primary->slots, sizeof(hdr->myslots));
memset(hdr->replicaof, 0, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -6256,7 +6373,7 @@ sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_cou
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
int j, start;
sds ci;
int port = clusterNodeClientPort(node, tls_primary);
int port = clusterNodeClientPort(node, tls_primary, c);
char *ip = clusterNodeIp(node, c);

/* Node coordinates */
Expand Down Expand Up @@ -6575,13 +6692,13 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {

if (node->tcp_port) {
addReplyBulkCString(c, "port");
addReplyLongLong(c, node->tcp_port);
addReplyLongLong(c, clusterNodeClientPort(node, false, c));
reply_count++;
}

if (node->tls_port) {
addReplyBulkCString(c, "tls-port");
addReplyLongLong(c, node->tls_port);
addReplyLongLong(c, clusterNodeClientPort(node, true, c));
reply_count++;
}

Expand Down
15 changes: 15 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLUSTER_LEGACY_H
#define CLUSTER_LEGACY_H

#include <stdint.h>
#define CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */

/* The following defines are amount of time, sometimes expressed as
Expand Down Expand Up @@ -160,6 +161,8 @@ typedef enum {
CLUSTERMSG_EXT_TYPE_SHARDID,
CLUSTERMSG_EXT_TYPE_CLIENT_IPV4,
CLUSTERMSG_EXT_TYPE_CLIENT_IPV6,
CLUSTERMSG_EXT_TYPE_CLIENT_PORT,
CLUSTERMSG_EXT_TYPE_CLIENT_TLS_PORT,
} clusterMsgPingtypes;

/* Helper function for making sure extensions are eight byte aligned. */
Expand Down Expand Up @@ -192,6 +195,14 @@ typedef struct {
char announce_client_ipv6[1]; /* Announced client IPv6, ends with \0. */
} clusterMsgPingExtClientIpV6;

typedef struct {
uint16_t announce_client_port; /* Announced client port. */
} clusterMsgPingExtClientPort;

typedef struct {
uint16_t announce_client_tls_port; /* Announced client TLS port. */
} clusterMsgPingExtClientTlsPort;

typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingtypes) */
Expand All @@ -203,6 +214,8 @@ typedef struct {
clusterMsgPingExtShardId shard_id;
clusterMsgPingExtClientIpV4 announce_client_ipv4;
clusterMsgPingExtClientIpV6 announce_client_ipv6;
clusterMsgPingExtClientPort announce_client_port;
clusterMsgPingExtClientTlsPort announce_client_tls_port;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
Expand Down Expand Up @@ -363,6 +376,8 @@ struct _clusterNode {
int tcp_port; /* Latest known clients TCP port. */
int tls_port; /* Latest known clients TLS port */
int cport; /* Latest known cluster port of this node. */
int announce_client_tcp_port; /* Port for clients only. */
int announce_client_tls_port; /* TLS port for clients only. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
rax *fail_reports; /* Radix tree for failure reports with sorted order by timestamp */
Expand Down
2 changes: 2 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3298,6 +3298,8 @@ standardConfig static_configs[] = {
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Default: Use +10000 offset. */
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.port */
createIntConfig("cluster-announce-tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_tls_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.tls_port */
createIntConfig("cluster-announce-client-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_client_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort),
createIntConfig("cluster-announce-client-tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_client_tls_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort),
createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-ping-replica-period", "repl-ping-slave-period", MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_ping_replica_period, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("list-compress-depth", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
Expand Down
2 changes: 1 addition & 1 deletion src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -9234,7 +9234,7 @@ int moduleGetClusterNodeInfoForClient(ValkeyModuleCtx *ctx,
else
memset(primary_id, 0, VALKEYMODULE_NODE_ID_LEN);
}
if (port) *port = getNodeDefaultClientPort(node);
if (port) *port = clusterNodeClientPort(node, server.tls_cluster, c);

/* As usually we have to remap flags for modules, in order to ensure
* we can provide binary compatibility. */
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2142,6 +2142,8 @@ struct valkeyServer {
int cluster_announce_port; /* base port to announce on cluster bus. */
int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
int cluster_announce_client_port; /* TCP port for clients to announce on cluster bus. */
int cluster_announce_client_tls_port; /* TLS port for clients to announce on cluster bus. */
int cluster_module_flags; /* Set of flags that modules are able
to set in order to suppress certain
native Redis Cluster features. Check the
Expand Down
Loading
Loading