Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ libswsscommon_la_SOURCES = \
macaddress.cpp \
netdispatcher.cpp \
netlink.cpp \
notificationconsumer.cpp \
notificationproducer.cpp \
linkcache.cpp

libswsscommon_la_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
Expand Down
140 changes: 140 additions & 0 deletions common/notificationconsumer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include "notificationconsumer.h"

#include <iostream>

#define NOTIFICATION_SUBSCRIBE_TIMEOUT (1000)
#define REDIS_PUBLISH_MESSAGE_INDEX (2)
#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3)

swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel):
m_db(db),
m_subscribe(NULL),
m_channel(channel)
{
SWSS_LOG_ENTER();

while (true)
{
try
{
subscribe();
break;
}
catch(...)
{
delete m_subscribe;

SWSS_LOG_ERROR("failed to subscribe on %s", m_channel.c_str());
}
}
}

swss::NotificationConsumer::~NotificationConsumer()
{
delete m_subscribe;
}

void swss::NotificationConsumer::subscribe()
{
SWSS_LOG_ENTER();

/* Create new new context to DB */
if (m_db->getContext()->connection_type == REDIS_CONN_TCP)
m_subscribe = new DBConnector(m_db->getDB(),
m_db->getContext()->tcp.host,
m_db->getContext()->tcp.port,
NOTIFICATION_SUBSCRIBE_TIMEOUT);
else
m_subscribe = new DBConnector(m_db->getDB(),
m_db->getContext()->unix_sock.path,
NOTIFICATION_SUBSCRIBE_TIMEOUT);

std::string s = "SUBSCRIBE " + m_channel;

RedisReply r(m_subscribe, s, REDIS_REPLY_ARRAY);

SWSS_LOG_INFO("subscribed to %s", m_channel.c_str());
}

void swss::NotificationConsumer::addFd(fd_set *fd)
{
SWSS_LOG_ENTER();

FD_SET(m_subscribe->getContext()->fd, fd);
}

int swss::NotificationConsumer::readCache()
{
SWSS_LOG_ENTER();

redisReply *reply = NULL;

if (redisGetReplyFromReader(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

return Selectable::ERROR;
}
else if (reply != NULL)
{
freeReplyObject(reply);

return Selectable::DATA;
}

return Selectable::NODATA;
}

void swss::NotificationConsumer::readMe()
{
SWSS_LOG_ENTER();

redisReply *reply = NULL;

if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

throw std::runtime_error("Unable to read redis reply");
}

if (reply->type != REDIS_REPLY_ARRAY)
{
SWSS_LOG_ERROR("expected ARRAY redis reply on channel %s, got: %d", m_channel.c_str(), reply->type);

throw std::runtime_error("getRedisReply operation failed");
}

if (reply->elements != REDIS_PUBLISH_MESSAGE_ELEMNTS)
{
SWSS_LOG_ERROR("expected %d elements in redis reply on channel %s, got: %u",
REDIS_PUBLISH_MESSAGE_ELEMNTS,
m_channel.c_str(),
reply->elements);

throw std::runtime_error("getRedisReply operation failed");
}

m_msg = std::string(reply->element[REDIS_PUBLISH_MESSAGE_INDEX]->str);

SWSS_LOG_DEBUG("got message: %s", m_msg.c_str());

freeReplyObject(reply);
}

bool swss::NotificationConsumer::isMe(fd_set *fd)
{
return FD_ISSET(m_subscribe->getContext()->fd, fd);
}

void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values)
{
JSon::readJson(m_msg, values);

FieldValueTuple fvt = values.at(0);

op = fvField(fvt);
data = fvValue(fvt);

values.erase(values.begin());
}
49 changes: 49 additions & 0 deletions common/notificationconsumer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef __NOTIFICATIONCONSUMER__
#define __NOTIFICATIONCONSUMER__

#include <string>
#include <vector>

#include <hiredis/hiredis.h>

#include "dbconnector.h"
#include "json.h"
#include "logger.h"
#include "redisreply.h"
#include "scheme.h"
#include "selectable.h"
#include "table.h"

namespace swss {

class NotificationConsumer : public Selectable
{
public:
NotificationConsumer(swss::DBConnector *db, std::string channel);

void pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values);

virtual ~NotificationConsumer();

virtual void addFd(fd_set *fd);
virtual bool isMe(fd_set *fd);
virtual int readCache();
virtual void readMe();

private:

NotificationConsumer(const NotificationConsumer &other);
NotificationConsumer& operator = (const NotificationConsumer &other);

void subscribe();

swss::DBConnector *m_db;
swss::DBConnector *m_subscribe;
std::string m_channel;
std::string m_msg;
};

}

#endif // __NOTIFICATIONCONSUMER__

39 changes: 39 additions & 0 deletions common/notificationproducer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "notificationproducer.h"

swss::NotificationProducer::NotificationProducer(swss::DBConnector *db, std::string channel):
m_db(db), m_channel(channel)
{
}

void swss::NotificationProducer::send(std::string op, std::string data, std::vector<FieldValueTuple> &values)
{
SWSS_LOG_ENTER();

FieldValueTuple opdata(op, data);

values.insert(values.begin(), opdata);

std::string msg = JSon::buildJson(values);

values.erase(values.begin());

SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str());

char *temp;
int len = redisFormatCommand(&temp, "PUBLISH %s %s", m_channel.c_str(), msg.c_str());
std::string publish(temp, len);
free(temp);

RedisReply r(m_db, publish, REDIS_REPLY_INTEGER, true);

if (r.getContext()->type != REDIS_REPLY_INTEGER)
{
SWSS_LOG_ERROR("PUBLISH %s '%s' failed, got type %d, expected %d",
m_channel.c_str(),
msg.c_str(),
r.getContext()->type,
REDIS_REPLY_INTEGER);

throw std::runtime_error("PUBLISH operation failed");
}
}
35 changes: 35 additions & 0 deletions common/notificationproducer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef __NOTIFICATIONPRODUCER__
#define __NOTIFICATIONPRODUCER__

#include <string>
#include <vector>

#include <hiredis/hiredis.h>
#include "dbconnector.h"
#include "scheme.h"
#include "logger.h"
#include "table.h"
#include "redisreply.h"
#include "json.h"

namespace swss {

class NotificationProducer
{
public:
NotificationProducer(swss::DBConnector *db, std::string channel);

void send(std::string op, std::string data, std::vector<FieldValueTuple> &values);

private:

NotificationProducer(const NotificationProducer &other);
NotificationProducer& operator = (const NotificationProducer &other);

swss::DBConnector *m_db;
std::string m_channel;
};

}

#endif // __NOTIFICATIONPRODUCER__