diff --git a/common/Makefile.am b/common/Makefile.am index fec265852..32532736a 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -29,6 +29,8 @@ libswsscommon_la_SOURCES = \ logger.cpp \ redisreply.cpp \ dbconnector.cpp \ + dbinterface.cpp \ + sonicv2connector.cpp \ table.cpp \ json.cpp \ producertable.cpp \ diff --git a/common/dbconnector.cpp b/common/dbconnector.cpp index 4aec7b729..94b1fa1cf 100644 --- a/common/dbconnector.cpp +++ b/common/dbconnector.cpp @@ -13,8 +13,7 @@ using json = nlohmann::json; using namespace std; - -namespace swss { +using namespace swss; void SonicDBConfig::parseDatabaseConfig(const string &file, std::unordered_map &inst_entry, @@ -362,6 +361,22 @@ vector SonicDBConfig::getNamespaces() return list; } +std::vector SonicDBConfig::getDbList(const std::string &netns) +{ + if (!m_init) + { + initialize(); + } + validateNamespace(netns); + + std::vector dbNames; + for (auto& imap: m_db_info.at(netns)) + { + dbNames.push_back(imap.first); + } + return dbNames; +} + constexpr const char *SonicDBConfig::DEFAULT_SONIC_DB_CONFIG_FILE; constexpr const char *SonicDBConfig::DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE; unordered_map> SonicDBConfig::m_inst_info; @@ -370,7 +385,100 @@ unordered_map> SonicDBConfig::m_db_separator; bool SonicDBConfig::m_init = false; bool SonicDBConfig::m_global_init = false; -constexpr const char *DBConnector::DEFAULT_UNIXSOCKET; +constexpr const char *RedisContext::DEFAULT_UNIXSOCKET; + +RedisContext::~RedisContext() +{ + redisFree(m_conn); +} + +RedisContext::RedisContext() +{ +} + +RedisContext::RedisContext(const RedisContext &other) +{ + auto octx = other.getContext(); + const char *unixPath = octx->unix_sock.path; + if (unixPath) + { + initContext(unixPath, *octx->timeout); + } + else + { + initContext(octx->tcp.host, octx->tcp.port, *octx->timeout); + } +} + +RedisContext::RedisContext(const string& hostname, int port, + unsigned int timeout) +{ + struct timeval tv = {0, (suseconds_t)timeout * 1000}; + initContext(hostname.c_str(), port, tv); +} + +RedisContext::RedisContext(const string& unixPath, unsigned int timeout) +{ + struct timeval tv = {0, (suseconds_t)timeout * 1000}; + initContext(unixPath.c_str(), tv); +} + +void RedisContext::initContext(const char *host, int port, const timeval& tv) +{ + m_conn = redisConnectWithTimeout(host, port, tv); + + if (m_conn->err) + throw system_error(make_error_code(errc::address_not_available), + "Unable to connect to redis"); +} + +void RedisContext::initContext(const char *path, const timeval &tv) +{ + m_conn = redisConnectUnixWithTimeout(path, tv); + + if (m_conn->err) + throw system_error(make_error_code(errc::address_not_available), + "Unable to connect to redis (unix-socket)"); +} + +redisContext *RedisContext::getContext() const +{ + return m_conn; +} + +void RedisContext::setContext(redisContext *ctx) +{ + m_conn = ctx; +} + +void RedisContext::setClientName(const string& clientName) +{ + string command("CLIENT SETNAME "); + command += clientName; + + RedisReply r(this, command, REDIS_REPLY_STATUS); + r.checkStatusOK(); +} + +string RedisContext::getClientName() +{ + string command("CLIENT GETNAME"); + + RedisReply r(this, command); + + auto ctx = r.getContext(); + if (ctx->type == REDIS_REPLY_STRING) + { + return r.getReply(); + } + else + { + if (ctx->type != REDIS_REPLY_NIL) + SWSS_LOG_ERROR("Unable to obtain Redis client name"); + + return ""; + } +} void DBConnector::select(DBConnector *db) { @@ -381,45 +489,36 @@ void DBConnector::select(DBConnector *db) r.checkStatusOK(); } -DBConnector::~DBConnector() +DBConnector::DBConnector(const DBConnector &other) + : RedisContext(other) + , m_dbId(other.m_dbId) + , m_namespace(other.m_namespace) { - redisFree(m_conn); + select(this); +} + +DBConnector::DBConnector(int dbId, const RedisContext& ctx) + : RedisContext(ctx) + , m_dbId(dbId) + , m_namespace(EMPTY_NAMESPACE) +{ + select(this); } DBConnector::DBConnector(int dbId, const string& hostname, int port, unsigned int timeout) : + RedisContext(hostname, port, timeout), m_dbId(dbId), m_namespace(EMPTY_NAMESPACE) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - - if (timeout) - m_conn = redisConnectWithTimeout(hostname.c_str(), port, tv); - else - m_conn = redisConnect(hostname.c_str(), port); - - if (m_conn->err) - throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis"); - select(this); } DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout) : + RedisContext(unixPath, timeout), m_dbId(dbId), m_namespace(EMPTY_NAMESPACE) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - - if (timeout) - m_conn = redisConnectUnixWithTimeout(unixPath.c_str(), tv); - else - m_conn = redisConnectUnix(unixPath.c_str()); - - if (m_conn->err) - throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis (unix-socket)"); - select(this); } @@ -430,25 +529,15 @@ DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpC { struct timeval tv = {0, (suseconds_t)timeout * 1000}; - if (timeout) + if (isTcpConn) { - if (isTcpConn) - m_conn = redisConnectWithTimeout(SonicDBConfig::getDbHostname(dbName, netns).c_str(), SonicDBConfig::getDbPort(dbName, netns), tv); - else - m_conn = redisConnectUnixWithTimeout(SonicDBConfig::getDbSock(dbName, netns).c_str(), tv); + initContext(SonicDBConfig::getDbHostname(dbName, netns).c_str(), SonicDBConfig::getDbPort(dbName, netns), tv); } else { - if (isTcpConn) - m_conn = redisConnect(SonicDBConfig::getDbHostname(dbName, netns).c_str(), SonicDBConfig::getDbPort(dbName, netns)); - else - m_conn = redisConnectUnix(SonicDBConfig::getDbSock(dbName, netns).c_str()); + initContext(SonicDBConfig::getDbSock(dbName, netns).c_str(), tv); } - if (m_conn->err) - throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis"); - select(this); } @@ -458,11 +547,6 @@ DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpC // Empty contructor } -redisContext *DBConnector::getContext() const -{ - return m_conn; -} - int DBConnector::getDbId() const { return m_dbId; @@ -473,6 +557,11 @@ string DBConnector::getDbName() const return m_dbName; } +void DBConnector::setNamespace(const string& netns) +{ + m_namespace = netns; +} + string DBConnector::getNamespace() const { return m_namespace; @@ -493,40 +582,11 @@ DBConnector *DBConnector::newConnector(unsigned int timeout) const timeout); ret->m_dbName = m_dbName; - ret->m_namespace = m_namespace; + ret->setNamespace(getNamespace()); return ret; } -void DBConnector::setClientName(const string& clientName) -{ - string command("CLIENT SETNAME "); - command += clientName; - - RedisReply r(this, command, REDIS_REPLY_STATUS); - r.checkStatusOK(); -} - -string DBConnector::getClientName() -{ - string command("CLIENT GETNAME"); - - RedisReply r(this, command); - - auto ctx = r.getContext(); - if (ctx->type == REDIS_REPLY_STRING) - { - return r.getReply(); - } - else - { - if (ctx->type != REDIS_REPLY_NIL) - SWSS_LOG_ERROR("Unable to obtain Redis client name"); - - return ""; - } -} - int64_t DBConnector::del(const string &key) { RedisCommand sdel; @@ -578,6 +638,13 @@ void DBConnector::set(const string &key, const string &value) RedisReply r(this, sset, REDIS_REPLY_STATUS); } +void DBConnector::config_set(const std::string &key, const std::string &value) +{ + RedisCommand sset; + sset.format("CONFIG SET %s %s", key.c_str(), value.c_str()); + RedisReply r(this, sset, REDIS_REPLY_STATUS); +} + unordered_map DBConnector::hgetall(const string &key) { unordered_map map; @@ -688,4 +755,24 @@ shared_ptr DBConnector::blpop(const string &list, int timeout) throw runtime_error("GET failed, memory exception"); } +void DBConnector::subscribe(const std::string &pattern) +{ + std::string s("SUBSCRIBE "); + s += pattern; + RedisReply r(this, s, REDIS_REPLY_ARRAY); +} + +void DBConnector::psubscribe(const std::string &pattern) +{ + std::string s("PSUBSCRIBE "); + s += pattern; + RedisReply r(this, s, REDIS_REPLY_ARRAY); +} + +int64_t DBConnector::publish(const string &channel, const string &message) +{ + RedisCommand publish; + publish.format("PUBLISH %s %s", channel.c_str(), message.c_str()); + RedisReply r(this, publish, REDIS_REPLY_INTEGER); + return r.getReply(); } diff --git a/common/dbconnector.h b/common/dbconnector.h index 84859d3ec..958f78d2d 100644 --- a/common/dbconnector.h +++ b/common/dbconnector.h @@ -35,6 +35,8 @@ class SonicDBInfo class SonicDBConfig { public: + static constexpr const char *DEFAULT_SONIC_DB_CONFIG_FILE = "/var/run/redis/sonic-db/database_config.json"; + static constexpr const char *DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE = "/var/run/redis/sonic-db/database_global.json"; static void initialize(const std::string &file = DEFAULT_SONIC_DB_CONFIG_FILE); static void initializeGlobalConfig(const std::string &file = DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE); static void validateNamespace(const std::string &netns); @@ -47,12 +49,11 @@ class SonicDBConfig static std::string getDbHostname(const std::string &dbName, const std::string &netns = EMPTY_NAMESPACE); static int getDbPort(const std::string &dbName, const std::string &netns = EMPTY_NAMESPACE); static std::vector getNamespaces(); + static std::vector getDbList(const std::string &netns = EMPTY_NAMESPACE); static bool isInit() { return m_init; }; static bool isGlobalInit() { return m_global_init; }; private: - static constexpr const char *DEFAULT_SONIC_DB_CONFIG_FILE = "/var/run/redis/sonic-db/database_config.json"; - static constexpr const char *DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE = "/var/run/redis/sonic-db/database_global.json"; // { namespace { instName, { unix_socket_path, hostname, port } } } static std::unordered_map> m_inst_info; // { namespace, { dbName, {instName, dbId, separator} } } @@ -69,7 +70,46 @@ class SonicDBConfig static RedisInstInfo& getRedisInfo(const std::string &dbName, const std::string &netns = EMPTY_NAMESPACE); }; -class DBConnector +class RedisContext +{ +public: + static constexpr const char *DEFAULT_UNIXSOCKET = "/var/run/redis/redis.sock"; + + /* + * Connect to Redis DB wither with a hostname:port or unix socket + * Select the database index provided by "db" + * + * Timeout - The time in milisecond until exception is been thrown. For + * infinite wait, set this value to 0 + */ + RedisContext(const std::string &hostname, int port, unsigned int timeout); + RedisContext(const std::string &unixPath, unsigned int timeout); + RedisContext(const RedisContext &other); + RedisContext& operator=(const RedisContext&) = delete; + + ~RedisContext(); + + redisContext *getContext() const; + + /* + * Assign a name to the Redis client used for this connection + * This is helpful when debugging Redis clients using `redis-cli client list` + */ + void setClientName(const std::string& clientName); + + std::string getClientName(); + +protected: + RedisContext(); + void initContext(const char *host, int port, const timeval& tv); + void initContext(const char *path, const timeval &tv); + void setContext(redisContext *ctx); + +private: + redisContext *m_conn; +}; + +class DBConnector : public RedisContext { public: static constexpr const char *DEFAULT_UNIXSOCKET = "/var/run/redis/redis.sock"; @@ -81,31 +121,31 @@ class DBConnector * Timeout - The time in milisecond until exception is been thrown. For * infinite wait, set this value to 0 */ + DBConnector(const DBConnector &other); + DBConnector(int dbId, const RedisContext &ctx); DBConnector(int dbId, const std::string &hostname, int port, unsigned int timeout); DBConnector(int dbId, const std::string &unixPath, unsigned int timeout); DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn = false); DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn, const std::string &netns); + DBConnector& operator=(const DBConnector&) = delete; - ~DBConnector(); - - redisContext *getContext() const; int getDbId() const; std::string getDbName() const; std::string getNamespace() const; +#ifdef SWIG + %pythoncode %{ + __swig_getmethods__["namespace"] = getNamespace + __swig_setmethods__["namespace"] = None + if _newclass: namespace = property(getNamespace, None) + %} +#endif + static void select(DBConnector *db); /* Create new context to DB */ DBConnector *newConnector(unsigned int timeout) const; - /* - * Assign a name to the Redis client used for this connection - * This is helpful when debugging Redis clients using `redis-cli client list` - */ - void setClientName(const std::string& clientName); - - std::string getClientName(); - int64_t del(const std::string &key); bool exists(const std::string &key); @@ -140,8 +180,17 @@ class DBConnector std::shared_ptr blpop(const std::string &list, int timeout); + void subscribe(const std::string &pattern); + + void psubscribe(const std::string &pattern); + + int64_t publish(const std::string &channel, const std::string &message); + + void config_set(const std::string &key, const std::string &value); + private: - redisContext *m_conn; + void setNamespace(const std::string &netns); + int m_dbId; std::string m_dbName; std::string m_namespace; diff --git a/common/dbinterface.cpp b/common/dbinterface.cpp new file mode 100644 index 000000000..dc86ffade --- /dev/null +++ b/common/dbinterface.cpp @@ -0,0 +1,347 @@ +#include +#include +#include +#include "dbinterface.h" + +using namespace std; +using namespace std::chrono; +using namespace swss; + +void DBInterface::set_redis_kwargs(std::string unix_socket_path, std::string host, int port) +{ + m_unix_socket_path = unix_socket_path; + m_host = host; + m_port = port; +} + +void DBInterface::connect(int dbId, const std::string& dbName, bool retry) +{ + if (retry) + { + _persistent_connect(dbId, dbName); + } + else + { + _onetime_connect(dbId, dbName); + } +} + +void DBInterface::close(const std::string& dbName) +{ + m_redisClient.erase(dbName); +} + +int64_t DBInterface::del(const string& dbName, const std::string& key, bool blocking) +{ + auto innerfunc = [&] + { + return m_redisClient.at(dbName).del(key); + }; + return blockable(innerfunc, dbName, blocking); +} + +void DBInterface::delete_all_by_pattern(const string& dbName, const string& pattern) +{ + auto& client = m_redisClient.at(dbName); + auto keys = client.keys(pattern); + for (auto& key: keys) + { + client.del(key); + } +} + +bool DBInterface::exists(const string& dbName, const std::string& key) +{ + return m_redisClient.at(dbName).exists(key); +} + +std::string DBInterface::get(const std::string& dbName, const std::string& hash, const std::string& key, bool blocking) +{ + auto innerfunc = [&] + { + auto pvalue = m_redisClient.at(dbName).hget(hash, key); + if (!pvalue) + { + std::string message = "Key '" + hash + "' field '" + key + "' unavailable in database '" + dbName + "'"; + SWSS_LOG_WARN("%s", message.c_str()); + throw UnavailableDataError(message, hash); + } + const std::string& value = *pvalue; + return value == "None" ? "" : value; + }; + return blockable(innerfunc, dbName, blocking); +} + +std::map DBInterface::get_all(const std::string& dbName, const std::string& hash, bool blocking) +{ + auto innerfunc = [&] + { + std::map map; + m_redisClient.at(dbName).hgetall(hash, std::inserter(map, map.end())); + + if (map.empty()) + { + std::string message = "Key '{" + hash + "}' unavailable in database '{" + dbName + "}'"; + SWSS_LOG_WARN("%s", message.c_str()); + throw UnavailableDataError(message, hash); + } + for (auto& i : map) + { + std::string& value = i.second; + if (value == "None") + { + value = ""; + } + } + + return map; + }; + return blockable>(innerfunc, dbName, blocking); +} + +std::vector DBInterface::keys(const std::string& dbName, const char *pattern, bool blocking) +{ + auto innerfunc = [&] + { + auto keys = m_redisClient.at(dbName).keys(pattern); + if (keys.empty()) + { + std::string message = "DB '{" + dbName + "}' is empty!"; + SWSS_LOG_WARN("%s", message.c_str()); + throw UnavailableDataError(message, "hset"); + } + return keys; + }; + return blockable>(innerfunc, dbName, blocking); +} + +int64_t DBInterface::publish(const std::string& dbName, const std::string& channel, const std::string& message) +{ + return m_redisClient.at(dbName).publish(channel, message); +} + +int64_t DBInterface::set(const std::string& dbName, const std::string& hash, const std::string& key, const std::string& value, bool blocking) +{ + auto innerfunc = [&] + { + m_redisClient.at(dbName).hset(hash, key, value); + // Return the number of fields that were added. + return 1; + }; + return blockable(innerfunc, dbName, blocking); +} + +DBConnector& DBInterface::get_redis_client(const std::string& dbName) +{ + return m_redisClient.at(dbName); +} + +template +T DBInterface::blockable(FUNC f, const std::string& dbName, bool blocking) +{ + int attempts = 0; + for (;;) + { + try + { + T ret_data = f(); + _unsubscribe_keyspace_notification(dbName); + return ret_data; + } + catch (const UnavailableDataError& e) + { + if (blocking) + { + auto found = keyspace_notification_channels.find(dbName); + if (found != keyspace_notification_channels.end()) + { + bool result = _unavailable_data_handler(dbName, e.getData()); + if (result) + { + continue; // received updates, try to read data again + } + else + { + _unsubscribe_keyspace_notification(dbName); + throw; // No updates was received. Raise exception + } + } + else + { + // Subscribe to updates and try it again (avoiding race condition) + _subscribe_keyspace_notification(dbName); + } + } + else + { + return T(); + } + } + catch (const std::system_error&) + { + /* + Something is fundamentally wrong with the request itself. + Retrying the request won't pass unless the schema itself changes. In this case, the error + should be attributed to the application itself. Re-raise the error. + */ + SWSS_LOG_ERROR("Bad DB request [%s]", dbName.c_str()); + throw; + } + catch (const RedisError&) + { + // Redis connection broken and we need to retry several times + attempts += 1; + _connection_error_handler(dbName); + std::string msg = "DB access failure by [" + dbName + + "]"; + if (BLOCKING_ATTEMPT_ERROR_THRESHOLD < attempts && attempts < BLOCKING_ATTEMPT_SUPPRESSION) + { + // Repeated access failures implies the database itself is unhealthy. + SWSS_LOG_ERROR("%s", msg.c_str()); + } + else + { + SWSS_LOG_WARN("%s", msg.c_str()); + } + } + } +} + +// Unsubscribe the chosent client from keyspace event notifications +void DBInterface::_unsubscribe_keyspace_notification(const std::string& dbName) +{ + auto found = keyspace_notification_channels.find(dbName); + if (found != keyspace_notification_channels.end()) + { + SWSS_LOG_DEBUG("Unsubscribe from keyspace notification"); + + keyspace_notification_channels.erase(found); + } +} + +// When the queried config is not available in Redis--wait until it is available. +// Two timeouts are at work here: +// 1. Notification timeout - how long to wait before giving up on receiving any given pub-sub message. +// 2. Max data wait - swsssdk-specific. how long to wait for the data to populate (in absolute time) +bool DBInterface::_unavailable_data_handler(const std::string& dbName, const char *data) +{ + auto start = system_clock::now(); + SWSS_LOG_DEBUG("Listening on pubsub channel '%s'", dbName.c_str()); + auto wait = duration(PUB_SUB_MAXIMUM_DATA_WAIT); + while (system_clock::now() - start < wait) + { + auto& channel = keyspace_notification_channels.at(dbName); + auto ctx = channel->getContext(); + redisReply *reply; + int rc = redisGetReply(ctx, reinterpret_cast(&reply)); + if (rc == REDIS_ERR && ctx->err == REDIS_ERR_IO && errno == EAGAIN) + { + // Timeout + continue; + } + if (rc != REDIS_OK) + { + throw RedisError("Failed to redisGetReply with on pubsub channel on dbName=" + dbName, ctx); + } + + RedisReply r(reply); + // r is an array of: + // 0. 'type': 'pmessage', + // 1. 'pattern': '__key*__:*' + // 2. 'channel': + // 3. 'data': + redisReply& r3 = *r.getChild(3); + if (r3.type != REDIS_REPLY_STRING) + { + throw system_error(make_error_code(errc::io_error), + "Wrong expected type of result"); + } + + if (strcmp(r3.str, data) == 0) + { + SWSS_LOG_INFO("'%s' acquired via pub-sub dbName=%s. Unblocking...", data, dbName.c_str()); + // Wait for a "settling" period before releasing the wait. + sleep(DATA_RETRIEVAL_WAIT_TIME); + return true; + } + } + + SWSS_LOG_WARN("No notification for '%s' from '%s' received before timeout.", data, dbName.c_str()); + return false; +} + +// Subscribe the chosent client to keyspace event notifications +void DBInterface::_subscribe_keyspace_notification(const std::string& dbName) +{ + SWSS_LOG_DEBUG("Subscribe to keyspace notification"); + auto& client = m_redisClient.at(dbName); + DBConnector *pubsub = client.newConnector(0); + pubsub->psubscribe(KEYSPACE_PATTERN); + + // Set the timeout of the pubsub channel, so future redisGetReply will be impacted + struct timeval tv = { 0, (suseconds_t)(1000 * PUB_SUB_NOTIFICATION_TIMEOUT) }; + int rc = redisSetTimeout(pubsub->getContext(), tv); + if (rc != REDIS_OK) + { + throw RedisError("Failed to redisSetTimeout", pubsub->getContext()); + } + + keyspace_notification_channels.emplace(std::piecewise_construct, std::forward_as_tuple(dbName), std::forward_as_tuple(pubsub)); +} + +// In the event Redis is unavailable, close existing connections, and try again. +void DBInterface::_connection_error_handler(const std::string& dbName) +{ + SWSS_LOG_WARN("Could not connect to Redis--waiting before trying again."); + int dbId = get_redis_client(dbName).getDbId(); + close(dbName); + sleep(CONNECT_RETRY_WAIT_TIME); + connect(dbId, dbName, true); +} + +void DBInterface::_onetime_connect(int dbId, const string& dbName) +{ + if (dbName.empty()) + { + throw invalid_argument("dbName"); + } + + pair rc; + if (m_unix_socket_path.empty()) + { + rc = m_redisClient.emplace(std::piecewise_construct + , std::forward_as_tuple(dbName) + , std::forward_as_tuple(dbId, m_host, m_port, 0)); + } + else + { + rc = m_redisClient.emplace(std::piecewise_construct + , std::forward_as_tuple(dbName) + , std::forward_as_tuple(dbId, m_unix_socket_path, 0)); + } + bool inserted = rc.second; + if (inserted) + { + auto redisClient = rc.first->second; + redisClient.config_set("notify-keyspace-events", KEYSPACE_EVENTS); + } +} + +// Keep reconnecting to Database 'dbId' until success +void DBInterface::_persistent_connect(int dbId, const string& dbName) +{ + for (;;) + { + try + { + _onetime_connect(dbId, dbName); + return; + } + catch (RedisError&) + { + const int wait = CONNECT_RETRY_WAIT_TIME; + SWSS_LOG_WARN("Connecting to DB '%s(%d)' failed, will retry in %d s", dbName.c_str(), dbId, wait); + close(dbName); + sleep(wait); + } + } +} diff --git a/common/dbinterface.h b/common/dbinterface.h new file mode 100644 index 000000000..12a249578 --- /dev/null +++ b/common/dbinterface.h @@ -0,0 +1,109 @@ +#pragma once + +#include +#include +#include + +#include "dbconnector.h" +#include "redisclient.h" +#include "logger.h" + +namespace swss +{ + +class UnavailableDataError : public std::runtime_error +{ +public: + UnavailableDataError(const std::string& message, const std::string& data) + : std::runtime_error(message) + , m_data(data) + { + } + + const char *getData() const + { + return m_data.c_str(); + } + +private: + const std::string m_data; +}; + +class DBInterface +{ +public: + void connect(int dbId, const std::string& dbName, bool retry = true); + void close(const std::string& dbName); + int64_t del(const std::string& dbName, const std::string& key, bool blocking = false); + // Delete all keys which match %pattern from DB + void delete_all_by_pattern(const std::string& dbName, const std::string& pattern); + bool exists(const std::string& dbName, const std::string& key); + std::string get(const std::string& dbName, const std::string& hash, const std::string& key, bool blocking = false); + std::map get_all(const std::string& dbName, const std::string& hash, bool blocking = false); + std::vector keys(const std::string& dbName, const char *pattern = "*", bool blocking = false); + int64_t publish(const std::string& dbName, const std::string& channel, const std::string& message); + int64_t set(const std::string& dbName, const std::string& hash, const std::string& key, const std::string& value, bool blocking = false); + DBConnector& get_redis_client(const std::string& dbName); + void set_redis_kwargs(std::string unix_socket_path, std::string host, int port); + +private: + template + T blockable(FUNC f, const std::string& dbName, bool blocking = false); + // Unsubscribe the chosent client from keyspace event notifications + void _unsubscribe_keyspace_notification(const std::string& dbName); + bool _unavailable_data_handler(const std::string& dbName, const char *data); + // Subscribe the chosent client to keyspace event notifications + void _subscribe_keyspace_notification(const std::string& dbName); + // In the event Redis is unavailable, close existing connections, and try again. + void _connection_error_handler(const std::string& dbName); + void _onetime_connect(int dbId, const std::string& dbName); + // Keep reconnecting to Database 'dbId' until success + void _persistent_connect(int dbId, const std::string& dbName); + + static const int BLOCKING_ATTEMPT_ERROR_THRESHOLD = 10; + static const int BLOCKING_ATTEMPT_SUPPRESSION = BLOCKING_ATTEMPT_ERROR_THRESHOLD + 5; + + // Wait period in seconds before attempting to reconnect to Redis. + static const int CONNECT_RETRY_WAIT_TIME = 10; + + // Wait period in seconds to wait before attempting to retrieve missing data. + static const int DATA_RETRIEVAL_WAIT_TIME = 3; + + // Time to wait for any given message to arrive via pub-sub. + static constexpr double PUB_SUB_NOTIFICATION_TIMEOUT = 10.0; // seconds + + // Maximum allowable time to wait on a specific pub-sub notification. + static constexpr double PUB_SUB_MAXIMUM_DATA_WAIT = 60.0; // seconds + + // Pub-sub keyspace pattern + static constexpr const char *KEYSPACE_PATTERN = "__key*__:*"; + + // In Redis, by default keyspace events notifications are disabled because while not + // very sensible the feature uses some CPU power. Notifications are enabled using + // the notify-keyspace-events of redis.conf or via the CONFIG SET. + // In order to enable the feature a non-empty string is used, composed of multiple characters, + // where every character has a special meaning according to the following table: + // K - Keyspace events, published with __keyspace@__ prefix. + // E - Keyevent events, published with __keyevent@__ prefix. + // g - Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... + // $ - String commands + // l - List commands + // s - Set commands + // h - Hash commands + // z - Sorted set commands + // x - Expired events (events generated every time a key expires) + // e - Evicted events (events generated when a key is evicted for maxmemory) + // A - Alias for g$lshzxe, so that the "AKE" string means all the events. + // ACS Redis db mainly uses hash, therefore h is selected. + static constexpr const char *KEYSPACE_EVENTS = "KEA"; + + std::unordered_map> keyspace_notification_channels; + + std::unordered_map m_redisClient; + + std::string m_unix_socket_path; + std::string m_host = "127.0.0.1"; + int m_port = 6379; +}; + +} diff --git a/common/notificationproducer.cpp b/common/notificationproducer.cpp index ca663c251..65587ffca 100644 --- a/common/notificationproducer.cpp +++ b/common/notificationproducer.cpp @@ -19,8 +19,5 @@ int64_t swss::NotificationProducer::send(const std::string &op, const std::strin SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str()); - RedisCommand publish; - publish.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str()); - RedisReply r(m_db, publish, REDIS_REPLY_INTEGER); - return r.getReply(); + return m_db->publish(m_channel, msg); } diff --git a/common/redisapi.h b/common/redisapi.h index d8a7c78af..720c19c5a 100644 --- a/common/redisapi.h +++ b/common/redisapi.h @@ -8,6 +8,8 @@ #include #include "logger.h" #include "rediscommand.h" +#include "redisreply.h" +#include "dbconnector.h" #ifdef HAVE_CONFIG_H #include @@ -15,13 +17,13 @@ namespace swss { -static inline std::string loadRedisScript(DBConnector* db, const std::string& script) +static inline std::string loadRedisScript(RedisContext* ctx, const std::string& script) { SWSS_LOG_ENTER(); RedisCommand loadcmd; loadcmd.format("SCRIPT LOAD %s", script.c_str()); - RedisReply r(db, loadcmd, REDIS_REPLY_STRING); + RedisReply r(ctx, loadcmd, REDIS_REPLY_STRING); std::string sha = r.getReply(); @@ -64,7 +66,7 @@ static inline std::string loadLuaScript(const std::string& path) return readTextFile("/usr/share/swss/" + path); } -static inline std::set runRedisScript(DBConnector &db, const std::string& sha, +static inline std::set runRedisScript(RedisContext &ctx, const std::string& sha, const std::vector& keys, const std::vector& argv) { SWSS_LOG_ENTER(); @@ -95,24 +97,24 @@ static inline std::set runRedisScript(DBConnector &db, const std::s std::set ret; try { - RedisReply r(&db, command); - auto ctx = r.getContext(); + RedisReply r(&ctx, command); + auto reply = r.getContext(); SWSS_LOG_DEBUG("Running lua script %s", sha.c_str()); - if (ctx->type == REDIS_REPLY_NIL) + if (reply->type == REDIS_REPLY_NIL) { - SWSS_LOG_ERROR("Got EMPTY response type from redis %d", ctx->type); + SWSS_LOG_ERROR("Got EMPTY response type from redis %d", reply->type); } - else if (ctx->type != REDIS_REPLY_ARRAY) + else if (reply->type != REDIS_REPLY_ARRAY) { - SWSS_LOG_ERROR("Got invalid response type from redis %d", ctx->type); + SWSS_LOG_ERROR("Got invalid response type from redis %d", reply->type); } else { - for (size_t i = 0; i < ctx->elements; i++) + for (size_t i = 0; i < reply->elements; i++) { - SWSS_LOG_DEBUG("Got element %zu %s", i, ctx->element[i]->str); - ret.emplace(ctx->element[i]->str); + SWSS_LOG_DEBUG("Got element %zu %s", i, reply->element[i]->str); + ret.emplace(reply->element[i]->str); } } } diff --git a/common/redisreply.cpp b/common/redisreply.cpp index 81ed92f9f..7fd90e89f 100644 --- a/common/redisreply.cpp +++ b/common/redisreply.cpp @@ -28,9 +28,9 @@ inline void guard(FUNC func, const char* command) } } -RedisReply::RedisReply(DBConnector *db, const RedisCommand& command) +RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command) { - int rc = redisAppendFormattedCommand(db->getContext(), command.c_str(), command.length()); + int rc = redisAppendFormattedCommand(ctx->getContext(), command.c_str(), command.length()); if (rc != REDIS_OK) { // The only reason of error is REDIS_ERR_OOM (Out of memory) @@ -38,17 +38,17 @@ RedisReply::RedisReply(DBConnector *db, const RedisCommand& command) throw bad_alloc(); } - rc = redisGetReply(db->getContext(), (void**)&m_reply); + rc = redisGetReply(ctx->getContext(), (void**)&m_reply); if (rc != REDIS_OK) { - throw RedisError("Failed to redisGetReply with " + string(command.c_str()), db->getContext()); + throw RedisError("Failed to redisGetReply with " + string(command.c_str()), ctx->getContext()); } guard([&]{checkReply();}, command.c_str()); } -RedisReply::RedisReply(DBConnector *db, const string &command) +RedisReply::RedisReply(RedisContext *ctx, const string& command) { - int rc = redisAppendCommand(db->getContext(), command.c_str()); + int rc = redisAppendCommand(ctx->getContext(), command.c_str()); if (rc != REDIS_OK) { // The only reason of error is REDIS_ERR_OOM (Out of memory) @@ -56,22 +56,22 @@ RedisReply::RedisReply(DBConnector *db, const string &command) throw bad_alloc(); } - rc = redisGetReply(db->getContext(), (void**)&m_reply); + rc = redisGetReply(ctx->getContext(), (void**)&m_reply); if (rc != REDIS_OK) { - throw RedisError("Failed to redisGetReply with " + command, db->getContext()); + throw RedisError("Failed to redisGetReply with " + command, ctx->getContext()); } guard([&]{checkReply();}, command.c_str()); } -RedisReply::RedisReply(DBConnector *db, const RedisCommand& command, int expectedType) - : RedisReply(db, command) +RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command, int expectedType) + : RedisReply(ctx, command) { guard([&]{checkReplyType(expectedType);}, command.c_str()); } -RedisReply::RedisReply(DBConnector *db, const string &command, int expectedType) - : RedisReply(db, command) +RedisReply::RedisReply(RedisContext *ctx, const string& command, int expectedType) + : RedisReply(ctx, command) { guard([&]{checkReplyType(expectedType);}, command.c_str()); } diff --git a/common/redisreply.h b/common/redisreply.h index 33b5b0d12..8bf8487a5 100644 --- a/common/redisreply.h +++ b/common/redisreply.h @@ -8,7 +8,7 @@ namespace swss { -class DBConnector; +class RedisContext; class RedisError : public std::runtime_error { @@ -40,8 +40,8 @@ class RedisReply * Send a new command to redis and wait for reply * No reply type specified. */ - RedisReply(DBConnector *db, const RedisCommand& command); - RedisReply(DBConnector *db, const std::string &command); + RedisReply(RedisContext *ctx, const RedisCommand& command); + RedisReply(RedisContext *ctx, const std::string& command); /* * Send a new command to redis and waits for reply * The reply must be one of REDIS_REPLY_* format (e.g. REDIS_REPLY_STATUS, @@ -49,8 +49,8 @@ class RedisReply * isFormatted - Set to true if the command is already formatted in redis * protocol */ - RedisReply(DBConnector *db, const RedisCommand& command, int expectedType); - RedisReply(DBConnector *db, const std::string &command, int expectedType); + RedisReply(RedisContext *ctx, const RedisCommand& command, int expectedType); + RedisReply(RedisContext *ctx, const std::string& command, int expectedType); /* auto_ptr for native structue (Free the memory on destructor) */ RedisReply(redisReply *reply); diff --git a/common/redisselect.cpp b/common/redisselect.cpp index 8d0283ab5..a2dec94a1 100644 --- a/common/redisselect.cpp +++ b/common/redisselect.cpp @@ -78,23 +78,18 @@ void RedisSelect::subscribe(DBConnector* db, const std::string &channelName) m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT)); /* Send SUBSCRIBE #channel command */ - std::string s("SUBSCRIBE "); - s += channelName; - RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY); + m_subscribe->subscribe(channelName); } /* PSUBSCRIBE */ void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName) { m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT)); - /* * Send PSUBSCRIBE #channel command on the * non-blocking subscriber DBConnector */ - std::string s("PSUBSCRIBE "); - s += channelName; - RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY); + m_subscribe->psubscribe(channelName); } void RedisSelect::setQueueLength(long long int queueLength) diff --git a/common/redistran.cpp b/common/redistran.cpp index 73d697cea..8c3c87936 100644 --- a/common/redistran.cpp +++ b/common/redistran.cpp @@ -1,4 +1,5 @@ #include "redistran.h" +#include "dbconnector.h" namespace swss { diff --git a/common/redistran.h b/common/redistran.h index 00b852f94..e02a5deaa 100644 --- a/common/redistran.h +++ b/common/redistran.h @@ -2,7 +2,7 @@ #include #include -#include "redisreply.h" +#include "dbconnector.h" #include "rediscommand.h" #include "logger.h" diff --git a/common/sonicv2connector.cpp b/common/sonicv2connector.cpp new file mode 100644 index 000000000..8ca1af634 --- /dev/null +++ b/common/sonicv2connector.cpp @@ -0,0 +1,110 @@ +#include "sonicv2connector.h" +#include "dbconnector.h" +#include "logger.h" + +using namespace swss; + +SonicV2Connector::SonicV2Connector(bool use_unix_socket_path, const char *netns) + : m_use_unix_socket_path(use_unix_socket_path) + , m_netns(netns) +{ +} + +std::string SonicV2Connector::getNamespace() const +{ + return m_netns; +} + +void SonicV2Connector::connect(const std::string& db_name, bool retry_on) +{ + if (m_use_unix_socket_path) + { + m_dbintf.set_redis_kwargs(get_db_socket(db_name), "", 0); + } + else + { + m_dbintf.set_redis_kwargs("", get_db_hostname(db_name), get_db_port(db_name)); + } + int db_id = get_dbid(db_name); + m_dbintf.connect(db_id, db_name, retry_on); +} + +void SonicV2Connector::close(const std::string& db_name) +{ + m_dbintf.close(db_name); +} + +std::vector SonicV2Connector::get_db_list() +{ + return SonicDBConfig::getDbList(m_netns); +} + +int SonicV2Connector::get_dbid(const std::string& db_name) +{ + return SonicDBConfig::getDbId(db_name, m_netns); +} + +std::string SonicV2Connector::get_db_separator(const std::string& db_name) +{ + return SonicDBConfig::getSeparator(db_name, m_netns); +} + +DBConnector& SonicV2Connector::get_redis_client(const std::string& db_name) +{ + return m_dbintf.get_redis_client(db_name); +} + +int64_t SonicV2Connector::publish(const std::string& db_name, const std::string& channel, const std::string& message) +{ + return m_dbintf.publish(db_name, channel, message); +} + +bool SonicV2Connector::exists(const std::string& db_name, const std::string& key) +{ + return m_dbintf.exists(db_name, key); +} + +std::vector SonicV2Connector::keys(const std::string& db_name, const char *pattern) +{ + return m_dbintf.keys(db_name, pattern); +} + +std::string SonicV2Connector::get(const std::string& db_name, const std::string& _hash, const std::string& key) +{ + return m_dbintf.get(db_name, _hash, key); +} + +std::map SonicV2Connector::get_all(const std::string& db_name, const std::string& _hash) +{ + return m_dbintf.get_all(db_name, _hash); +} + +int64_t SonicV2Connector::set(const std::string& db_name, const std::string& _hash, const std::string& key, const std::string& val) +{ + return m_dbintf.set(db_name, _hash, key, val); +} + +int64_t SonicV2Connector::del(const std::string& db_name, const std::string& key) +{ + return m_dbintf.del(db_name, key); +} + +void SonicV2Connector::delete_all_by_pattern(const std::string& db_name, const std::string& pattern) +{ + m_dbintf.delete_all_by_pattern(db_name, pattern); +} + +std::string SonicV2Connector::get_db_socket(const std::string& db_name) +{ + return SonicDBConfig::getDbSock(db_name, m_netns); +} + +std::string SonicV2Connector::get_db_hostname(const std::string& db_name) +{ + return SonicDBConfig::getDbHostname(db_name, m_netns); +} + +int SonicV2Connector::get_db_port(const std::string& db_name) +{ + return SonicDBConfig::getDbPort(db_name, m_netns); +} diff --git a/common/sonicv2connector.h b/common/sonicv2connector.h new file mode 100644 index 000000000..40a98c564 --- /dev/null +++ b/common/sonicv2connector.h @@ -0,0 +1,78 @@ +#pragma once + +#include +#include + +#include "dbinterface.h" + +namespace swss +{ + +class SonicV2Connector +{ +public: + SonicV2Connector(bool use_unix_socket_path = false, const char *netns = ""); + + std::string getNamespace() const; + +#ifdef SWIG + %pythoncode %{ + __swig_getmethods__["namespace"] = getNamespace + __swig_setmethods__["namespace"] = None + if _newclass: namespace = property(getNamespace, None) + %} +#endif + + void connect(const std::string& db_name, bool retry_on = true); + + void close(const std::string& db_name); + + std::vector get_db_list(); + + int get_dbid(const std::string& db_name); + + std::string get_db_separator(const std::string& db_name); + + DBConnector& get_redis_client(const std::string& db_name); + + int64_t publish(const std::string& db_name, const std::string& channel, const std::string& message); + + bool exists(const std::string& db_name, const std::string& key); + + std::vector keys(const std::string& db_name, const char *pattern="*"); + + std::string get(const std::string& db_name, const std::string& _hash, const std::string& key); + + std::map get_all(const std::string& db_name, const std::string& _hash); + + int64_t set(const std::string& db_name, const std::string& _hash, const std::string& key, const std::string& val); + + int64_t del(const std::string& db_name, const std::string& key); + + void delete_all_by_pattern(const std::string& db_name, const std::string& pattern); + +private: + std::string get_db_socket(const std::string& db_name); + + std::string get_db_hostname(const std::string& db_name); + + int get_db_port(const std::string& db_name); + + DBInterface m_dbintf; + bool m_use_unix_socket_path; + std::string m_netns; +}; + +#ifdef SWIG +// TRICK! +// Note: there is no easy way for SWIG to map ctor parameter netns(C++) to namespace(python), +// so we use python patch to achieve this +// TODO: implement it with formal SWIG syntax, which will be target language independent +%pythoncode %{ + _old_SonicV2Connector__init__ = SonicV2Connector.__init__ + def _new_SonicV2Connector__init__(self, use_unix_socket_path = False, namespace = None): + _old_SonicV2Connector__init__(self, use_unix_socket_path = use_unix_socket_path, netns = namespace) + SonicV2Connector.__init__ = _new_SonicV2Connector__init__ +%} +#endif +} diff --git a/pyext/py2/Makefile.am b/pyext/py2/Makefile.am index abf5064a7..b01c75262 100644 --- a/pyext/py2/Makefile.am +++ b/pyext/py2/Makefile.am @@ -9,6 +9,6 @@ _swsscommon_la_LDFLAGS = -module _swsscommon_la_LIBADD = ../../common/libswsscommon.la -lpython$(PYTHON_VERSION) swsscommon_wrap.cpp: $(SWIG_SOURCES) - $(SWIG) -Wall -c++ -python -I../../common -o $@ $< + $(SWIG) -Wall -c++ -python -keyword -I../../common -o $@ $< CLEANFILES = swsscommon_wrap.cpp diff --git a/pyext/py3/Makefile.am b/pyext/py3/Makefile.am index d6f51b2be..b1a46b3d6 100644 --- a/pyext/py3/Makefile.am +++ b/pyext/py3/Makefile.am @@ -9,6 +9,6 @@ _swsscommon_la_LDFLAGS = -module _swsscommon_la_LIBADD = ../../common/libswsscommon.la $(PYTHON3_BLDLIBRARY) swsscommon_wrap.cpp: $(SWIG_SOURCES) - $(SWIG) -Wall -c++ -python -I../../common -o $@ $< + $(SWIG) -Wall -c++ -python -keyword -I../../common -o $@ $< CLEANFILES = swsscommon_wrap.cpp diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index 04aedaa3f..c0102c3c5 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -3,6 +3,8 @@ %{ #include "schema.h" #include "dbconnector.h" +#include "dbinterface.h" +#include "sonicv2connector.h" #include "select.h" #include "selectable.h" #include "rediscommand.h" @@ -24,11 +26,13 @@ %include %include %include +%include %include %include %template(FieldValuePair) std::pair; %template(FieldValuePairs) std::vector>; +%template(FieldValueMap) std::map; %template(VectorString) std::vector; %apply int *OUTPUT {int *fd}; @@ -55,6 +59,7 @@ swss::RedisSelect *CastSelectableToRedisSelectObj(swss::Selectable *temp) { %include "schema.h" %include "dbconnector.h" +%include "sonicv2connector.h" %include "selectable.h" %include "select.h" %include "rediscommand.h" @@ -64,9 +69,11 @@ swss::RedisSelect *CastSelectableToRedisSelectObj(swss::Selectable *temp) { %apply std::vector& OUTPUT {std::vector &keys}; %apply std::vector>& OUTPUT {std::vector> &ovalues}; +%apply std::string& OUTPUT {std::string &value}; %include "table.h" %clear std::vector &keys; %clear std::vector> &values; +%clear std::string &value; %include "producertable.h" %include "producerstatetable.h" @@ -93,3 +100,4 @@ swss::RedisSelect *CastSelectableToRedisSelectObj(swss::Selectable *temp) { %include "notificationproducer.h" %include "warm_restart.h" +%include "dbinterface.h" diff --git a/tests/redis_ut.cpp b/tests/redis_ut.cpp index 917020eec..9d8becf00 100644 --- a/tests/redis_ut.cpp +++ b/tests/redis_ut.cpp @@ -13,6 +13,8 @@ #include "common/selectabletimer.h" #include "common/table.h" #include "common/redisclient.h" +#include "common/dbinterface.h" +#include "common/sonicv2connector.h" using namespace std; using namespace swss; @@ -311,6 +313,21 @@ TEST(DBConnector, RedisClientName) EXPECT_EQ(db.getClientName(), client_name); } +TEST(DBConnector, DBInterface) +{ + DBInterface dbintf; + dbintf.set_redis_kwargs("", "127.0.0.1", 6379); + dbintf.connect(15, "TEST_DB"); + + SonicV2Connector db; + db.connect("TEST_DB"); + db.set("TEST_DB", "key0", "field1", "value2"); + auto fvs = db.get_all("TEST_DB", "key0"); + auto rc = fvs.find("field1"); + EXPECT_NE(rc, fvs.end()); + EXPECT_EQ(rc->second, "value2"); +} + TEST(DBConnector, RedisClient) { DBConnector db("TEST_DB", 0, true); diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index aecf0a19a..2db614cd3 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -1,7 +1,15 @@ import time +import pytest from threading import Thread from pympler.tracker import SummaryTracker from swsscommon import swsscommon +from swsscommon.swsscommon import DBInterface, SonicV2Connector, SonicDBConfig + +existing_file = "./tests/redis_multi_db_ut_config/database_config.json" + +@pytest.fixture(scope="session", autouse=True) +def prepare(request): + SonicDBConfig.initialize(existing_file) def test_ProducerTable(): db = swsscommon.DBConnector("APPL_DB", 0, True) @@ -122,3 +130,17 @@ def generator_SelectMemoryLeak(): cases.append("%s - %d objects for %d repeats" % (name, count, N)) thr.join() assert not cases + + +def test_DBInterface(): + dbintf = DBInterface() + dbintf.set_redis_kwargs("", "127.0.0.1", 6379) + dbintf.connect(15, "TEST_DB") + + db = SonicV2Connector(use_unix_socket_path=True, namespace='') + assert db.namespace == '' + db.connect("TEST_DB") + db.set("TEST_DB", "key0", "field1", "value2") + fvs = db.get_all("TEST_DB", "key0") + assert "field1" in fvs + assert fvs["field1"] == "value2"