Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SET(valkey_sources
src/alloc.c
src/async.c
src/command.c
src/conn.c
src/crc16.c
src/dict.c
src/net.c
Expand Down
10 changes: 2 additions & 8 deletions include/valkey/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,10 @@
#include "valkey.h"

void valkeyNetClose(valkeyContext *c);
ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap);
ssize_t valkeyNetWrite(valkeyContext *c);

int valkeyCheckSocketError(valkeyContext *c);
int valkeyContextSetTimeout(valkeyContext *c, const struct timeval tv);
int valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port, const struct timeval *timeout);
int valkeyContextConnectBindTcp(valkeyContext *c, const char *addr, int port,
const struct timeval *timeout,
const char *source_addr);
int valkeyContextConnectUnix(valkeyContext *c, const char *path, const struct timeval *timeout);
int valkeyTcpSetTimeout(valkeyContext *c, const struct timeval tv);
int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options);
int valkeyKeepAlive(valkeyContext *c, int interval);
int valkeyCheckConnectDone(valkeyContext *c, int *completed);

Expand Down
6 changes: 5 additions & 1 deletion include/valkey/valkey.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ void valkeyFreeSdsCommand(sds cmd);
enum valkeyConnectionType {
VALKEY_CONN_TCP,
VALKEY_CONN_UNIX,
VALKEY_CONN_USERFD
VALKEY_CONN_USERFD,

VALKEY_CONN_MAX
};

struct valkeySsl;
Expand Down Expand Up @@ -239,6 +241,7 @@ typedef struct {
} while(0)

typedef struct valkeyContextFuncs {
int (*connect)(struct valkeyContext *, const valkeyOptions *);
void (*close)(struct valkeyContext *);
void (*free_privctx)(void *);
void (*async_read)(struct valkeyAsyncContext *);
Expand All @@ -250,6 +253,7 @@ typedef struct valkeyContextFuncs {
* recoverable error, they should return 0. */
ssize_t (*read)(struct valkeyContext *, char *, size_t);
ssize_t (*write)(struct valkeyContext *);
int (*set_timeout)(struct valkeyContext *, const struct timeval);
} valkeyContextFuncs;


Expand Down
61 changes: 61 additions & 0 deletions src/conn.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2024, zhenwei pi <[email protected]>
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "valkey_private.h"

#include <assert.h>

static valkeyContextFuncs *_valkeyContextFuncs[VALKEY_CONN_MAX];

int valkeyContextRegisterFuncs(valkeyContextFuncs *funcs, enum valkeyConnectionType type) {
assert(type < VALKEY_CONN_MAX);
assert(!_valkeyContextFuncs[type]);

_valkeyContextFuncs[type] = funcs;
return VALKEY_OK;
}

void valkeyContextSetFuncs(valkeyContext *c) {
valkeyContextFuncs *funcs;
static int initialized;

if (!initialized) {
initialized = 1;
valkeyContextRegisterTcpFuncs();
valkeyContextRegisterUnixFuncs();
valkeyContextRegisterUserfdFuncs();
}

assert(c->connection_type < VALKEY_CONN_MAX);
assert(!c->funcs);
assert(funcs = _valkeyContextFuncs[c->connection_type]);

c->funcs = funcs;
}
86 changes: 64 additions & 22 deletions src/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <stdlib.h>
#include <time.h>

#include "async.h"
#include "net.h"
#include "sds.h"
#include "sockcompat.h"
Expand All @@ -56,7 +57,7 @@ void valkeyNetClose(valkeyContext *c) {
}
}

ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap) {
static ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap) {
ssize_t nread = recv(c->fd, buf, bufcap, 0);
if (nread == -1) {
if ((errno == EWOULDBLOCK && !(c->flags & VALKEY_BLOCK)) || (errno == EINTR)) {
Expand All @@ -78,7 +79,7 @@ ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap) {
}
}

ssize_t valkeyNetWrite(valkeyContext *c) {
static ssize_t valkeyNetWrite(valkeyContext *c) {
ssize_t nwritten;

nwritten = send(c->fd, c->obuf, sdslen(c->obuf), 0);
Expand Down Expand Up @@ -354,14 +355,10 @@ int valkeyCheckSocketError(valkeyContext *c) {
return VALKEY_OK;
}

int valkeyContextSetTimeout(valkeyContext *c, const struct timeval tv) {
int valkeyTcpSetTimeout(valkeyContext *c, const struct timeval tv) {
const void *to_ptr = &tv;
size_t to_sz = sizeof(tv);

if (valkeyContextUpdateCommandTimeout(c, &tv) != VALKEY_OK) {
valkeySetError(c, VALKEY_ERR_OOM, "Out of memory");
return VALKEY_ERR;
}
if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,to_ptr,to_sz) == -1) {
valkeySetErrorFromErrno(c,VALKEY_ERR_IO,"setsockopt(SO_RCVTIMEO)");
return VALKEY_ERR;
Expand All @@ -373,9 +370,11 @@ int valkeyContextSetTimeout(valkeyContext *c, const struct timeval tv) {
return VALKEY_OK;
}

static int _valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port,
const struct timeval *timeout,
const char *source_addr) {
int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options) {
const struct timeval *timeout = options->connect_timeout;
const char *addr = options->endpoint.tcp.ip;
const char *source_addr = options->endpoint.tcp.source_addr;
int port = options->endpoint.tcp.port;
valkeyFD s;
int rv, n;
char _port[6]; /* strlen("65535"); */
Expand Down Expand Up @@ -553,19 +552,10 @@ static int _valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port
return rv; // Need to return VALKEY_OK if alright
}

int valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port,
const struct timeval *timeout) {
return _valkeyContextConnectTcp(c, addr, port, timeout, NULL);
}

int valkeyContextConnectBindTcp(valkeyContext *c, const char *addr, int port,
const struct timeval *timeout,
const char *source_addr) {
return _valkeyContextConnectTcp(c, addr, port, timeout, source_addr);
}

int valkeyContextConnectUnix(valkeyContext *c, const char *path, const struct timeval *timeout) {
static int valkeyContextConnectUnix(valkeyContext *c, const valkeyOptions *options) {
#ifndef _WIN32
const struct timeval *timeout = options->connect_timeout;
const char *path = options->endpoint.unix_socket;
int blocking = (c->flags & VALKEY_BLOCK);
struct sockaddr_un *sa;
long timeout_msec = -1;
Expand Down Expand Up @@ -630,3 +620,55 @@ int valkeyContextConnectUnix(valkeyContext *c, const char *path, const struct ti
valkeySetError(c, VALKEY_ERR_OOM, "Out of memory");
return VALKEY_ERR;
}

static valkeyContextFuncs valkeyContextTcpFuncs = {
.connect = valkeyContextConnectTcp,
.close = valkeyNetClose,
.free_privctx = NULL,
.async_read = valkeyAsyncRead,
.async_write = valkeyAsyncWrite,
.read = valkeyNetRead,
.write = valkeyNetWrite,
.set_timeout = valkeyTcpSetTimeout
};

void valkeyContextRegisterTcpFuncs(void) {
valkeyContextRegisterFuncs(&valkeyContextTcpFuncs, VALKEY_CONN_TCP);
}

static valkeyContextFuncs valkeyContextUnixFuncs = {
.connect = valkeyContextConnectUnix,
.close = valkeyNetClose,
.free_privctx = NULL,
.async_read = valkeyAsyncRead,
.async_write = valkeyAsyncWrite,
.read = valkeyNetRead,
.write = valkeyNetWrite,
.set_timeout = valkeyTcpSetTimeout
};

void valkeyContextRegisterUnixFuncs(void) {
valkeyContextRegisterFuncs(&valkeyContextUnixFuncs, VALKEY_CONN_UNIX);
}

static int valkeyContextConnectUserfd(valkeyContext *c, const valkeyOptions *options) {
c->fd = options->endpoint.fd;
c->flags |= VALKEY_CONNECTED;

return VALKEY_OK;
}

static valkeyContextFuncs valkeyContextUserfdFuncs = {
.connect = valkeyContextConnectUserfd,
.close = valkeyNetClose,
.free_privctx = NULL,
.async_read = valkeyAsyncRead,
.async_write = valkeyAsyncWrite,
.read = valkeyNetRead,
.write = valkeyNetWrite,
.set_timeout = valkeyTcpSetTimeout
};

void valkeyContextRegisterUserfdFuncs(void) {
valkeyContextRegisterFuncs(&valkeyContextUserfdFuncs, VALKEY_CONN_USERFD);
}
4 changes: 3 additions & 1 deletion src/ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,13 @@ static void valkeySSLAsyncWrite(valkeyAsyncContext *ac) {
}

static valkeyContextFuncs valkeyContextSSLFuncs = {
.connect = valkeyContextConnectTcp,
.close = valkeyNetClose,
.free_privctx = valkeySSLFree,
.async_read = valkeySSLAsyncRead,
.async_write = valkeySSLAsyncWrite,
.read = valkeySSLRead,
.write = valkeySSLWrite
.write = valkeySSLWrite,
.set_timeout = valkeyTcpSetTimeout
};

Loading