Skip to content

Commit 09b47d8

Browse files
pizhenweihwware
authored and committed
Introduce MPTCP (valkey-io#1811)
Multipath TCP (MPTCP) is an extension of the standard TCP protocol that allows a single transport connection to use multiple network interfaces or paths. MPTCP is useful for applications such as bandwidth aggregation, failover, and more resilient connections. The Linux kernel has supported MPTCP since v5.6, so it is time for Valkey to support it as well. The test report shows that MPTCP reduces latency by ~25% in an environment with 1% network packet loss. Thanks to Matthieu Baerts <[email protected]> for many review suggestions. Proposed-by: Geliang Tang <[email protected]> Tested-by: Gang Yan <[email protected]> Signed-off-by: zhenwei pi <[email protected]> Signed-off-by: zhenwei pi <[email protected]> Cc Linux kernel MPTCP maintainer @matttbe Signed-off-by: hwware <[email protected]>
1 parent e89ae28 commit 09b47d8

File tree

7 files changed

+56
-10
lines changed

7 files changed

+56
-10
lines changed

src/anet.c

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
#include "anet.h"
5151
#include "config.h"
5252
#include "util.h"
53+
#include "serverassert.h"
5354

5455
#define UNUSED(x) (void)(x)
5556

@@ -573,7 +574,22 @@ static int anetV6Only(char *err, int s) {
573574
return ANET_OK;
574575
}
575576

576-
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {
577+
/* XXX: Until glibc 2.41, getaddrinfo with hints.ai_protocol of IPPROTO_MPTCP returns an error.
578+
* Use hints.ai_protocol IPPROTO_IP (0) or IPPROTO_TCP (6) to resolve address and overwrite
579+
* the protocol in the socket() call when MPTCP is enabled.
580+
* Ref: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/testing/selftests/net/mptcp/mptcp_connect.c
581+
* https://sourceware.org/git/?p=glibc.git;a=commit;h=a8e9022e0f829d44a818c642fc85b3bfbd26a514
582+
*/
583+
static int anetTcpGetProtocol(int is_mptcp_enabled) {
584+
#ifdef IPPROTO_MPTCP
585+
return is_mptcp_enabled ? IPPROTO_MPTCP : IPPROTO_TCP;
586+
#else
587+
assert(!is_mptcp_enabled);
588+
return IPPROTO_TCP;
589+
#endif
590+
}
591+
592+
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int mptcp) {
577593
int s = -1, rv;
578594
char _port[6]; /* strlen("65535") */
579595
struct addrinfo hints, *servinfo, *p;
@@ -591,7 +607,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
591607
return ANET_ERR;
592608
}
593609
for (p = servinfo; p != NULL; p = p->ai_next) {
594-
if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) continue;
610+
if ((s = socket(p->ai_family, p->ai_socktype, anetTcpGetProtocol(mptcp))) == -1) continue;
595611

596612
if (af == AF_INET6 && anetV6Only(err, s) == ANET_ERR) goto error;
597613
if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;
@@ -611,12 +627,12 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
611627
return s;
612628
}
613629

614-
int anetTcpServer(char *err, int port, char *bindaddr, int backlog) {
615-
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
630+
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp) {
631+
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, mptcp);
616632
}
617633

618-
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog) {
619-
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
634+
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp) {
635+
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, mptcp);
620636
}
621637

622638
int anetUnixServer(char *err, char *path, mode_t perm, int backlog, char *group) {

src/anet.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
int anetTcpNonBlockConnect(char *err, const char *addr, int port);
5555
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr);
5656
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len, int flags);
57-
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
58-
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
57+
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp);
58+
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp);
5959
int anetUnixServer(char *err, char *path, mode_t perm, int backlog, char *group);
6060
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
6161
int anetUnixAccept(char *err, int serversock);
@@ -75,4 +75,12 @@ int anetSetSockMarkId(char *err, int fd, uint32_t id);
7575
int anetGetError(int fd);
7676
int anetIsFifo(char *filepath);
7777

78+
static inline int anetHasMptcp(void) {
79+
#ifdef IPPROTO_MPTCP
80+
return 1;
81+
#else
82+
return 0;
83+
#endif
84+
}
85+
7886
#endif

src/config.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2427,6 +2427,15 @@ static int isValidIpV6(char *val, const char **err) {
24272427
return 1;
24282428
}
24292429

2430+
static int isValidMptcp(int val, const char **err) {
2431+
if (val && !anetHasMptcp()) {
2432+
*err = "MPTCP is not supported on this platform";
2433+
return 0;
2434+
}
2435+
2436+
return 1;
2437+
}
2438+
24302439
/* Validate specified string is a valid proc-title-template */
24312440
static int isValidProcTitleTemplate(char *val, const char **err) {
24322441
if (!validateProcTitleTemplate(val)) {
@@ -3277,6 +3286,7 @@ standardConfig static_configs[] = {
32773286
createIntConfig("timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.maxidletime, 0, INTEGER_CONFIG, NULL, NULL), /* Default client timeout: infinite */
32783287
createIntConfig("replica-announce-port", "slave-announce-port", MODIFIABLE_CONFIG, 0, 65535, server.replica_announce_port, 0, INTEGER_CONFIG, NULL, NULL),
32793288
createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
3289+
createBoolConfig("mptcp", NULL, IMMUTABLE_CONFIG, server.mptcp, 0, isValidMptcp, NULL), /* Multipath TCP. */
32803290
createIntConfig("cluster-port", NULL, IMMUTABLE_CONFIG, 0, 65535, server.cluster_port, 0, INTEGER_CONFIG, NULL, NULL),
32813291
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Default: Use +10000 offset. */
32823292
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.port */

src/server.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2631,10 +2631,10 @@ int listenToPort(connListener *sfd) {
26312631
if (optional) addr++;
26322632
if (strchr(addr, ':')) {
26332633
/* Bind IPv6 address. */
2634-
sfd->fd[sfd->count] = anetTcp6Server(server.neterr, port, addr, server.tcp_backlog);
2634+
sfd->fd[sfd->count] = anetTcp6Server(server.neterr, port, addr, server.tcp_backlog, server.mptcp);
26352635
} else {
26362636
/* Bind IPv4 address. */
2637-
sfd->fd[sfd->count] = anetTcpServer(server.neterr, port, addr, server.tcp_backlog);
2637+
sfd->fd[sfd->count] = anetTcpServer(server.neterr, port, addr, server.tcp_backlog, server.mptcp);
26382638
}
26392639
if (sfd->fd[sfd->count] == ANET_ERR) {
26402640
int net_errno = errno;

src/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,6 +1613,7 @@ struct valkeyServer {
16131613
int port; /* TCP listening port */
16141614
int tls_port; /* TLS listening port */
16151615
int tcp_backlog; /* TCP listen() backlog */
1616+
int mptcp; /* Use Multipath TCP */
16161617
char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
16171618
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
16181619
char *bind_source_addr; /* Source address to bind on for outgoing connections */

tests/unit/introspection.tcl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,7 @@ start_server {tags {"introspection"}} {
920920
rdbchecksum
921921
daemonize
922922
tcp-backlog
923+
mptcp
923924
always-show-logo
924925
syslog-enabled
925926
cluster-enabled

valkey.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,16 @@ port 6379
147147
# in order to get the desired effect.
148148
tcp-backlog 511
149149

150+
# Multipath TCP (MPTCP)
151+
#
152+
# MPTCP splits a single TCP connection into subflows over multiple interfaces or paths.
153+
# It enables bandwidth aggregation, failover, and improved reliability.
154+
# When set to 'yes', clients that request MPTCP will be able to use it. Clients
155+
# that do not request it keep using regular TCP, exactly as before.
156+
# Note: MPTCP is supported in the mainline Linux kernel starting from version 5.6.
157+
#
158+
# mptcp yes
159+
150160
# Unix socket.
151161
#
152162
# Specify the path for the Unix socket that will be used to listen for

0 commit comments

Comments
 (0)