diff --git a/common/Makefile.am b/common/Makefile.am index 5559cdadb..9df9bc76c 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -23,6 +23,8 @@ libswsscommon_la_SOURCES = \ macaddress.cpp \ netdispatcher.cpp \ netlink.cpp \ + notificationconsumer.cpp \ + notificationproducer.cpp \ linkcache.cpp libswsscommon_la_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp new file mode 100644 index 000000000..b2a948058 --- /dev/null +++ b/common/notificationconsumer.cpp @@ -0,0 +1,140 @@ +#include "notificationconsumer.h" + +#include + +#define NOTIFICATION_SUBSCRIBE_TIMEOUT (1000) +#define REDIS_PUBLISH_MESSAGE_INDEX (2) +#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3) + +swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel): + m_db(db), + m_subscribe(NULL), + m_channel(channel) +{ + SWSS_LOG_ENTER(); + + while (true) + { + try + { + subscribe(); + break; + } + catch(...) + { + delete m_subscribe; + + SWSS_LOG_ERROR("failed to subscribe on %s", m_channel.c_str()); + } + } +} + +swss::NotificationConsumer::~NotificationConsumer() +{ + delete m_subscribe; +} + +void swss::NotificationConsumer::subscribe() +{ + SWSS_LOG_ENTER(); + + /* Create new new context to DB */ + if (m_db->getContext()->connection_type == REDIS_CONN_TCP) + m_subscribe = new DBConnector(m_db->getDB(), + m_db->getContext()->tcp.host, + m_db->getContext()->tcp.port, + NOTIFICATION_SUBSCRIBE_TIMEOUT); + else + m_subscribe = new DBConnector(m_db->getDB(), + m_db->getContext()->unix_sock.path, + NOTIFICATION_SUBSCRIBE_TIMEOUT); + + std::string s = "SUBSCRIBE " + m_channel; + + RedisReply r(m_subscribe, s, REDIS_REPLY_ARRAY); + + SWSS_LOG_INFO("subscribed to %s", m_channel.c_str()); +} + +void swss::NotificationConsumer::addFd(fd_set *fd) +{ + SWSS_LOG_ENTER(); + + FD_SET(m_subscribe->getContext()->fd, fd); +} + +int swss::NotificationConsumer::readCache() +{ + SWSS_LOG_ENTER(); + + redisReply *reply = NULL; + + if (redisGetReplyFromReader(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) + { + SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str()); + + return Selectable::ERROR; + } + else if (reply != NULL) + { + freeReplyObject(reply); + + return Selectable::DATA; + } + + return Selectable::NODATA; +} + +void swss::NotificationConsumer::readMe() +{ + SWSS_LOG_ENTER(); + + redisReply *reply = NULL; + + if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) + { + SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str()); + + throw std::runtime_error("Unable to read redis reply"); + } + + if (reply->type != REDIS_REPLY_ARRAY) + { + SWSS_LOG_ERROR("expected ARRAY redis reply on channel %s, got: %d", m_channel.c_str(), reply->type); + + throw std::runtime_error("getRedisReply operation failed"); + } + + if (reply->elements != REDIS_PUBLISH_MESSAGE_ELEMNTS) + { + SWSS_LOG_ERROR("expected %d elements in redis reply on channel %s, got: %u", + REDIS_PUBLISH_MESSAGE_ELEMNTS, + m_channel.c_str(), + reply->elements); + + throw std::runtime_error("getRedisReply operation failed"); + } + + m_msg = std::string(reply->element[REDIS_PUBLISH_MESSAGE_INDEX]->str); + + SWSS_LOG_DEBUG("got message: %s", m_msg.c_str()); + + freeReplyObject(reply); +} + +bool swss::NotificationConsumer::isMe(fd_set *fd) +{ + return FD_ISSET(m_subscribe->getContext()->fd, fd); +} + +void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector &values) +{ + JSon::readJson(m_msg, values); + + FieldValueTuple fvt = values.at(0); + + op = fvField(fvt); + data = fvValue(fvt); + + values.erase(values.begin()); +} diff --git a/common/notificationconsumer.h b/common/notificationconsumer.h new file mode 100644 index 000000000..497b3eccd --- /dev/null +++ b/common/notificationconsumer.h @@ -0,0 +1,49 @@ +#ifndef __NOTIFICATIONCONSUMER__ +#define __NOTIFICATIONCONSUMER__ + +#include +#include + +#include + +#include "dbconnector.h" +#include "json.h" +#include "logger.h" +#include "redisreply.h" +#include "scheme.h" +#include "selectable.h" +#include "table.h" + +namespace swss { + +class NotificationConsumer : public Selectable +{ +public: + NotificationConsumer(swss::DBConnector *db, std::string channel); + + void pop(std::string &op, std::string &data, std::vector &values); + + virtual ~NotificationConsumer(); + + virtual void addFd(fd_set *fd); + virtual bool isMe(fd_set *fd); + virtual int readCache(); + virtual void readMe(); + +private: + + NotificationConsumer(const NotificationConsumer &other); + NotificationConsumer& operator = (const NotificationConsumer &other); + + void subscribe(); + + swss::DBConnector *m_db; + swss::DBConnector *m_subscribe; + std::string m_channel; + std::string m_msg; +}; + +} + +#endif // __NOTIFICATIONCONSUMER__ + diff --git a/common/notificationproducer.cpp b/common/notificationproducer.cpp new file mode 100644 index 000000000..239c0ff59 --- /dev/null +++ b/common/notificationproducer.cpp @@ -0,0 +1,39 @@ +#include "notificationproducer.h" + +swss::NotificationProducer::NotificationProducer(swss::DBConnector *db, std::string channel): + m_db(db), m_channel(channel) +{ +} + +void swss::NotificationProducer::send(std::string op, std::string data, std::vector &values) +{ + SWSS_LOG_ENTER(); + + FieldValueTuple opdata(op, data); + + values.insert(values.begin(), opdata); + + std::string msg = JSon::buildJson(values); + + values.erase(values.begin()); + + SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str()); + + char *temp; + int len = redisFormatCommand(&temp, "PUBLISH %s %s", m_channel.c_str(), msg.c_str()); + std::string publish(temp, len); + free(temp); + + RedisReply r(m_db, publish, REDIS_REPLY_INTEGER, true); + + if (r.getContext()->type != REDIS_REPLY_INTEGER) + { + SWSS_LOG_ERROR("PUBLISH %s '%s' failed, got type %d, expected %d", + m_channel.c_str(), + msg.c_str(), + r.getContext()->type, + REDIS_REPLY_INTEGER); + + throw std::runtime_error("PUBLISH operation failed"); + } +} diff --git a/common/notificationproducer.h b/common/notificationproducer.h new file mode 100644 index 000000000..f649d2e59 --- /dev/null +++ b/common/notificationproducer.h @@ -0,0 +1,35 @@ +#ifndef __NOTIFICATIONPRODUCER__ +#define __NOTIFICATIONPRODUCER__ + +#include +#include + +#include +#include "dbconnector.h" +#include "scheme.h" +#include "logger.h" +#include "table.h" +#include "redisreply.h" +#include "json.h" + +namespace swss { + +class NotificationProducer +{ +public: + NotificationProducer(swss::DBConnector *db, std::string channel); + + void send(std::string op, std::string data, std::vector &values); + +private: + + NotificationProducer(const NotificationProducer &other); + NotificationProducer& operator = (const NotificationProducer &other); + + swss::DBConnector *m_db; + std::string m_channel; +}; + +} + +#endif // __NOTIFICATIONPRODUCER__