Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "anet.h"
#include "config.h"
#include "util.h"
#include "serverassert.h"

#define UNUSED(x) (void)(x)

Expand Down Expand Up @@ -573,7 +574,22 @@ static int anetV6Only(char *err, int s) {
return ANET_OK;
}

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {
/* XXX: Until glibc 2.41, getaddrinfo with hints.ai_protocol of IPPROTO_MPTCP leads error.
* Use hints.ai_protocol IPPROTO_IP (0) or IPPROTO_TCP (6) to resolve address and overwrite
* it when MPTCP is enabled.
* Ref: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/testing/selftests/net/mptcp/mptcp_connect.c
* https://sourceware.org/git/?p=glibc.git;a=commit;h=a8e9022e0f829d44a818c642fc85b3bfbd26a514
*/
static int anetTcpGetProtocol(int is_mptcp_enabled) {
#ifdef IPPROTO_MPTCP
return is_mptcp_enabled ? IPPROTO_MPTCP : IPPROTO_TCP;
#else
assert(!is_mptcp_enabled);
return IPPROTO_TCP;
#endif
}

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int mptcp) {
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
Expand All @@ -591,7 +607,10 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) continue;
rv = anetTcpGetProtocol(mptcp);
if (rv == ANET_ERR) goto error;

if ((s = socket(p->ai_family, p->ai_socktype, rv)) == -1) continue;

if (af == AF_INET6 && anetV6Only(err, s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;
Expand All @@ -611,12 +630,12 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
return s;
}

int anetTcpServer(char *err, int port, char *bindaddr, int backlog) {
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp) {
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, mptcp);
}

int anetTcp6Server(char *err, int port, char *bindaddr, int backlog) {
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp) {
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, mptcp);
}

int anetUnixServer(char *err, char *path, mode_t perm, int backlog, char *group) {
Expand Down
4 changes: 2 additions & 2 deletions src/anet.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
int anetTcpNonBlockConnect(char *err, const char *addr, int port);
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr);
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len, int flags);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog, char *group);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);
Expand Down
14 changes: 14 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,19 @@ static int isValidIpV6(char *val, const char **err) {
return 1;
}

static int isValidMptcp(int val, const char **err) {
#ifndef IPPROTO_MPTCP
if (val) {
*err = "MPTCP is not supported on this platform";
return 0;
}
#else
UNUSED(val);
UNUSED(err);
#endif
return 1;
}

/* Validate specified string is a valid proc-title-template */
static int isValidProcTitleTemplate(char *val, const char **err) {
if (!validateProcTitleTemplate(val)) {
Expand Down Expand Up @@ -3276,6 +3289,7 @@ standardConfig static_configs[] = {
createIntConfig("timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.maxidletime, 0, INTEGER_CONFIG, NULL, NULL), /* Default client timeout: infinite */
createIntConfig("replica-announce-port", "slave-announce-port", MODIFIABLE_CONFIG, 0, 65535, server.replica_announce_port, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
createBoolConfig("mptcp", NULL, IMMUTABLE_CONFIG, server.mptcp, 0, isValidMptcp, NULL), /* Multipath TCP. */
createIntConfig("cluster-port", NULL, IMMUTABLE_CONFIG, 0, 65535, server.cluster_port, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Default: Use +10000 offset. */
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.port */
Expand Down
4 changes: 2 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2651,10 +2651,10 @@ int listenToPort(connListener *sfd) {
if (optional) addr++;
if (strchr(addr, ':')) {
/* Bind IPv6 address. */
sfd->fd[sfd->count] = anetTcp6Server(server.neterr, port, addr, server.tcp_backlog);
sfd->fd[sfd->count] = anetTcp6Server(server.neterr, port, addr, server.tcp_backlog, server.mptcp);
} else {
/* Bind IPv4 address. */
sfd->fd[sfd->count] = anetTcpServer(server.neterr, port, addr, server.tcp_backlog);
sfd->fd[sfd->count] = anetTcpServer(server.neterr, port, addr, server.tcp_backlog, server.mptcp);
}
if (sfd->fd[sfd->count] == ANET_ERR) {
int net_errno = errno;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,7 @@ struct valkeyServer {
int port; /* TCP listening port */
int tls_port; /* TLS listening port */
int tcp_backlog; /* TCP listen() backlog */
int mptcp; /* Use Multipath TCP */
char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *bind_source_addr; /* Source address to bind on for outgoing connections */
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ start_server {tags {"introspection"}} {
rdbchecksum
daemonize
tcp-backlog
mptcp
always-show-logo
syslog-enabled
cluster-enabled
Expand Down
10 changes: 10 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ port 6379
# in order to get the desired effect.
tcp-backlog 511

# Multipath TCP
#
# Multipath TCP (MPTCP) allows a single transport connection to use multiple network
# interfaces or paths. It's useful for applications like bandwidth aggregation,
# failover, and more resilient connections. Set this to yes to allow clients to
# use MPTCP. Clients that don't support it will use "plain" TCP.
# Note that MPTCP is supported in the official Linux kernel starting with version 5.6.
#
# mptcp yes

# Unix socket.
#
# Specify the path for the Unix socket that will be used to listen for
Expand Down
Loading