diff --git a/cfgorch/cfgorch.cpp b/cfgorch/cfgorch.cpp new file mode 100644 index 00000000000..3cd5a83e981 --- /dev/null +++ b/cfgorch/cfgorch.cpp @@ -0,0 +1,176 @@ +#include "cfgorch.h" +#include "logger.h" +#include "subscriberstatetable.h" + +using namespace swss; + +CfgOrch::CfgOrch(DBConnector *db, string tableName) : + m_db(db) +{ + Consumer consumer(new SubscriberStateTable(m_db, tableName)); + m_consumerMap.insert(ConsumerMapPair(tableName, consumer)); +} + +CfgOrch::CfgOrch(DBConnector *db, vector &tableNames) : + m_db(db) +{ + for(auto it : tableNames) + { + Consumer consumer(new SubscriberStateTable(m_db, it)); + m_consumerMap.insert(ConsumerMapPair(it, consumer)); + } +} + +CfgOrch::~CfgOrch() +{ + for(auto &it : m_consumerMap) + delete it.second.m_consumer; +} + +vector CfgOrch::getSelectables() +{ + vector selectables; + for(auto it : m_consumerMap) { + selectables.push_back(it.second.m_consumer); + } + return selectables; +} + +bool CfgOrch::hasSelectable(TableConsumable *selectable) const +{ + for(auto it : m_consumerMap) { + if (it.second.m_consumer == selectable) { + return true; + } + } + return false; +} + +bool CfgOrch::syncCfgDB(string tableName, Table &tableConsumer) +{ + SWSS_LOG_ENTER(); + + auto consumer_it = m_consumerMap.find(tableName); + if (consumer_it == m_consumerMap.end()) + { + SWSS_LOG_ERROR("Unrecognized tableName:%s\n", tableName.c_str()); + return false; + } + Consumer& consumer = consumer_it->second; + + vector tuples; + + tableConsumer.getTableContent(tuples); + for (auto tuple : tuples) + { + string key = kfvKey(tuple); + /* Directly put it into consumer.m_toSync map */ + if (consumer.m_toSync.find(key) == consumer.m_toSync.end()) + { + consumer.m_toSync[key] = make_tuple(key, SET_COMMAND, kfvFieldsValues(tuple)); + SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, tuple)).c_str()); + } + /* + * Syncing from DB directly, don't expect duplicate keys. + * Or there is pending task from consumber state pipe, in this case just skip it. + */ + else + { + SWSS_LOG_WARN("Duplicate key %s found in tableName:%s\n", key.c_str(), tableName.c_str()); + continue; + } + doTask(consumer); + } + return true; +} + +bool CfgOrch::execute(string tableName) +{ + SWSS_LOG_ENTER(); + + auto consumer_it = m_consumerMap.find(tableName); + if (consumer_it == m_consumerMap.end()) + { + SWSS_LOG_ERROR("Unrecognized tableName:%s\n", tableName.c_str()); + return false; + } + Consumer& consumer = consumer_it->second; + + int data_popped = 0; + while (1) + { + KeyOpFieldsValuesTuple new_data; + consumer.m_consumer->pop(new_data); + + string key = kfvKey(new_data); + string op = kfvOp(new_data); + /* + * Done with all new data. Or + * possible nothing popped, ie. the oparation is already merged with other operations + */ + if (op.empty()) + { + SWSS_LOG_DEBUG("Number of kfv data popped: %d\n", data_popped); + break; + } + data_popped++; + SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, new_data)).c_str()); + + /* If a new task comes or if a DEL task comes, we directly put it into consumer.m_toSync map */ + if (consumer.m_toSync.find(key) == consumer.m_toSync.end() || op == DEL_COMMAND) + { + consumer.m_toSync[key] = new_data; + } + /* If an old task is still there, we combine the old task with new task */ + else + { + KeyOpFieldsValuesTuple existing_data = consumer.m_toSync[key]; + + auto new_values = kfvFieldsValues(new_data); + auto existing_values = kfvFieldsValues(existing_data); + + for (auto it : new_values) + { + string field = fvField(it); + string value = fvValue(it); + + auto iu = existing_values.begin(); + while (iu != existing_values.end()) + { + string ofield = fvField(*iu); + if (field == ofield) + iu = existing_values.erase(iu); + else + iu++; + } + existing_values.push_back(FieldValueTuple(field, value)); + } + consumer.m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); + } + } + if (!consumer.m_toSync.empty()) + doTask(consumer); + + return true; +} + +void CfgOrch::doTask() +{ + for(auto &it : m_consumerMap) + { + if (!it.second.m_toSync.empty()) + doTask(it.second); + } +} + +string CfgOrch::dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) +{ + string s = consumer.m_consumer->getTableName() + ":" + kfvKey(tuple) + + "|" + kfvOp(tuple); + for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++) + { + s += "|" + fvField(*i) + ":" + fvValue(*i); + } + + return s; +} diff --git a/cfgorch/cfgorch.h b/cfgorch/cfgorch.h new file mode 100644 index 00000000000..e6583aaf4f9 --- /dev/null +++ b/cfgorch/cfgorch.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include "dbconnector.h" +#include "table.h" +#include "consumertable.h" + +using namespace std; +using namespace swss; + +#define DEFAULT_KEY_SEPARATOR ":" +#define CONFIGDB_KEY_SEPARATOR "|" + +typedef map SyncMap; +struct Consumer { + Consumer(TableConsumable* consumer) : m_consumer(consumer) { } + TableConsumable* m_consumer; + /* Store the latest 'golden' status */ + SyncMap m_toSync; +}; +typedef pair ConsumerMapPair; +typedef map ConsumerMap; + +class CfgOrch +{ +public: + CfgOrch(DBConnector *db, string tableName); + CfgOrch(DBConnector *db, vector &tableNames); + virtual ~CfgOrch(); + + vector getSelectables(); + bool hasSelectable(TableConsumable* s) const; + + bool execute(string tableName); + /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */ + void doTask(); + +protected: + DBConnector *m_db; + ConsumerMap m_consumerMap; + + /* Run doTask against a specific consumer */ + virtual void doTask(Consumer &consumer) = 0; + string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple); + bool syncCfgDB(string tableName, Table &tableConsumer); +}; + diff --git a/debian/rules b/debian/rules index 9e606567300..2109854ee75 100755 --- a/debian/rules +++ b/debian/rules @@ -29,6 +29,7 @@ include /usr/share/dpkg/default.mk override_dh_auto_configure: dh_auto_configure -- + dh_auto_configure -- --enable-gtest override_dh_auto_install: dh_auto_install --destdir=debian/swss diff --git a/tests/Makefile.am b/tests/Makefile.am index d96e3f5c3ef..d71491f219d 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -1,5 +1,5 @@ CFLAGS_SAI = -I /usr/include/sai -INCLUDES = -I ../orchagent +INCLUDES = -I ../orchagent -I $(top_srcdir)/cfgorch bin_PROGRAMS = tests @@ -9,12 +9,13 @@ else DBGFLAGS = -g -DNDEBUG endif -CFLAGS_GTEST = -LDADD_GTEST = +CFLAGS_GTEST = -I $(top_srcdir)/../sonic-swss-common/googletest/googletest/include +LDADD_GTEST = $(top_srcdir)/../sonic-swss-common/googletest/build/googlemock/gtest/libgtest_main.a \ + $(top_srcdir)/../sonic-swss-common/googletest/build/googlemock/gtest/libgtest.a -tests_SOURCES = swssnet_ut.cpp +tests_SOURCES = swssnet_ut.cpp cfgorch_ut.cpp $(top_srcdir)/cfgorch/cfgorch.cpp tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) tests_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) tests_LDADD = $(LDADD_GTEST) -lnl-genl-3 -lhiredis -lhiredis -lpthread \ - -lswsscommon -lswsscommon -lgtest -lgtest_main + -lswsscommon -lswsscommon diff --git a/tests/cfgorch_ut.cpp b/tests/cfgorch_ut.cpp new file mode 100644 index 00000000000..5a6f576d28f --- /dev/null +++ b/tests/cfgorch_ut.cpp @@ -0,0 +1,299 @@ +#include +#include +#include "dbconnector.h" +#include "select.h" +#include "schema.h" +#include "exec.h" +#include "cfgorch.h" +#include "gtest/gtest.h" + +using namespace std; +using namespace swss; + +/* + * In the following unit test: + * cfgOrch supports orchestrating notifications from three tables in TEST_CONFIG_DB, + * and directs the notification to each target table in TEST_APP_DB + */ +#define TEST_CONFIG_DB (7) +#define TEST_APP_DB (8) + +#define SELECT_TIMEOUT 1000 + +const string cfgTableNameA1 = "CFGTABLE_A_1"; +const string cfgTableNameA2 = "CFGTABLE_A_2"; +const string cfgTableNameB = "CFGTABLE_B"; + +const string keySuffix = "_Key"; +const string keyA1 = cfgTableNameA1 + keySuffix; +const string keyA2 = cfgTableNameA2 + keySuffix; +const string keyB = cfgTableNameB + keySuffix; + +static inline void clearDB(int db) +{ + DBConnector dbc(db, "localhost", 6379, 0); + RedisReply r(&dbc, "FLUSHDB", REDIS_REPLY_STATUS); + r.checkStatusOK(); +} + +class CfgAgentA : public CfgOrch +{ +public: + CfgAgentA(DBConnector *cfgDb, DBConnector *appDb, vector tableNames): + CfgOrch(cfgDb, tableNames), + m_cfgTableA1(cfgDb, cfgTableNameA1, CONFIGDB_TABLE_NAME_SEPARATOR), + m_cfgTableA2(cfgDb, cfgTableNameA2, CONFIGDB_TABLE_NAME_SEPARATOR), + m_appTableA1(appDb, cfgTableNameA1), + m_appTableA2(appDb, cfgTableNameA2) + { + } + + void syncCfgDB() + { + CfgOrch::syncCfgDB(cfgTableNameA1, m_cfgTableA1); + CfgOrch::syncCfgDB(cfgTableNameA2, m_cfgTableA2); + } + +private: + Table m_cfgTableA1, m_cfgTableA2; + Table m_appTableA1, m_appTableA2; + + void doTask(Consumer &consumer) + { + string table_name = consumer.m_consumer->getTableName(); + string key_expected = table_name + keySuffix; + + EXPECT_TRUE(table_name == cfgTableNameA1 || table_name == cfgTableNameA2); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + ASSERT_STREQ(key.c_str(), key_expected.c_str()); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + if (table_name == cfgTableNameA1) + { + m_appTableA1.set(kfvKey(t), kfvFieldsValues(t)); + } + else + { + m_appTableA2.set(kfvKey(t), kfvFieldsValues(t)); + } + } + else if (op == DEL_COMMAND) + { + if (table_name == cfgTableNameA1) + { + m_appTableA1.del(kfvKey(t)); + } + else + { + m_appTableA2.del(kfvKey(t)); + } + } + it = consumer.m_toSync.erase(it); + continue; + } + } +}; + +class CfgAgentB : public CfgOrch +{ +public: + CfgAgentB(DBConnector *cfgDb, DBConnector *appDb, vector tableNames): + CfgOrch(cfgDb, tableNames), + m_cfgTableB(cfgDb, cfgTableNameB, CONFIGDB_TABLE_NAME_SEPARATOR), + m_appTableB(appDb, cfgTableNameB) + { + + } + + void syncCfgDB() + { + CfgOrch::syncCfgDB(cfgTableNameB, m_cfgTableB); + } + +private: + Table m_cfgTableB; + Table m_appTableB; + + void doTask(Consumer &consumer) + { + string table_name = consumer.m_consumer->getTableName(); + string key_expected = table_name + keySuffix; + + EXPECT_TRUE(table_name == cfgTableNameB); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + ASSERT_STREQ(key.c_str(), key_expected.c_str()); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + m_appTableB.set(kfvKey(t), kfvFieldsValues(t)); + } + else if (op == DEL_COMMAND) + { + m_appTableB.del(kfvKey(t)); + } + it = consumer.m_toSync.erase(it); + continue; + } + } +}; + +static void publisherWorkerSet() +{ + DBConnector cfgDb(TEST_CONFIG_DB, "localhost", 6379, 0); + Table tableA1(&cfgDb, cfgTableNameA1, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableA2(&cfgDb, cfgTableNameA2, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableB(&cfgDb, cfgTableNameB, CONFIGDB_TABLE_NAME_SEPARATOR); + + vector fields; + FieldValueTuple t("field", "value"); + fields.push_back(t); + + tableA1.set(keyA1, fields); + tableA2.set(keyA2, fields); + tableB.set(keyB, fields); +} + +static void publisherWorkerDel() +{ + DBConnector cfgDb(TEST_CONFIG_DB, "localhost", 6379, 0); + Table tableA1(&cfgDb, cfgTableNameA1, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableA2(&cfgDb, cfgTableNameA2, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableB(&cfgDb, cfgTableNameB, CONFIGDB_TABLE_NAME_SEPARATOR); + + tableA1.del(keyA1); + tableA2.del(keyA2); + tableB.del(keyB); +} + + +static void subscriberWorker(std::vector cfgOrchList) +{ + swss::Select s; + for (CfgOrch *o : cfgOrchList) + { + s.addSelectables(o->getSelectables()); + } + + while (true) + { + Selectable *sel; + int fd, ret; + + ret = s.select(&sel, &fd, SELECT_TIMEOUT); + if (ret == Select::TIMEOUT) + { + static int maxWait = 10; + maxWait--; + if (maxWait < 0) + { + // This unit testing should finish in 10 seconds! + break;; + } + continue; + } + EXPECT_EQ(ret, Select::OBJECT); + + for (CfgOrch *o : cfgOrchList) + { + TableConsumable *c = (TableConsumable *)sel; + if (o->hasSelectable(c)) + { + o->execute(c->getTableName()); + } + } + } +} + +TEST(CfgOrch, test) +{ + thread *publisherThread; + thread *subscriberThread; + vector values; + + vector agent_a_tables = { + cfgTableNameA1, + cfgTableNameA2, + }; + vector agent_b_tables = { + cfgTableNameB, + }; + + cout << "Starting cfgOrch testing" << endl; + clearDB(TEST_CONFIG_DB); + clearDB(TEST_APP_DB); + + string result; + string kea_cmd = "redis-cli config set notify-keyspace-events KEA"; + int ret = exec(kea_cmd, result); + EXPECT_TRUE(ret == 0); + + DBConnector cfgDb(TEST_CONFIG_DB, "localhost", 6379, 0); + DBConnector appDb(TEST_APP_DB, "localhost", 6379, 0); + + CfgAgentA agent_a(&cfgDb, &appDb, agent_a_tables); + CfgAgentB agent_b(&cfgDb, &appDb, agent_b_tables); + + std::vector cfgOrchList = {&agent_a, &agent_b}; + + cout << "- Step 1. Provision TEST_CONFIG_DB" << endl; + + subscriberThread = new thread(subscriberWorker, cfgOrchList); + + publisherThread = new thread(publisherWorkerSet); + publisherThread->join(); + delete publisherThread; + + sleep(1); + + cout << "- Step 2. Verify TEST_APP_DB content" << endl; + Table appTableA1(&appDb, cfgTableNameA1); + Table appTableA2(&appDb, cfgTableNameA2); + Table appTableB(&appDb, cfgTableNameB); + ASSERT_EQ(appTableA1.get(keyA1, values), true); + EXPECT_EQ(appTableA2.get(keyA2, values), true); + EXPECT_EQ(appTableB.get(keyB, values), true); + + cout << "- Step 3. Flush TEST_APP_DB" << endl; + clearDB(TEST_APP_DB); + EXPECT_EQ(appTableA1.get(keyA1, values), false); + EXPECT_EQ(appTableA2.get(keyA2, values), false); + EXPECT_EQ(appTableB.get(keyB, values), false); + + cout << "- Step 4. Sync from TEST_CONFIG_DB" << endl; + agent_a.syncCfgDB(); + agent_b.syncCfgDB(); + + cout << "- Step 5. Verify TEST_APP_DB content" << endl; + EXPECT_EQ(appTableA1.get(keyA1, values), true); + EXPECT_EQ(appTableA2.get(keyA2, values), true); + EXPECT_EQ(appTableB.get(keyB, values), true); + + cout << "- Step 6. Clean TEST_CONFIG_DB" << endl; + publisherThread = new thread(publisherWorkerDel); + publisherThread->join(); + delete publisherThread; + sleep(1); + cout << "- Step 7. Verify TEST_APP_DB content is empty" << endl; + EXPECT_EQ(appTableA1.get(keyA1, values), false); + EXPECT_EQ(appTableA2.get(keyA2, values), false); + EXPECT_EQ(appTableB.get(keyB, values), false); + + subscriberThread->join(); + delete subscriberThread; + cout << "Done." << endl; +}