diff --git a/fpmsyncd/fpmlink.cpp b/fpmsyncd/fpmlink.cpp index 9dfedb6c1b6..93e9b0fc3b4 100644 --- a/fpmsyncd/fpmlink.cpp +++ b/fpmsyncd/fpmlink.cpp @@ -40,7 +40,7 @@ FpmLink::FpmLink(int port) : memset (&addr, 0, sizeof (addr)); addr.sin_family = AF_INET; - addr.sin_port = htons(port); + addr.sin_port = htons((uint16_t)port); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); if (bind(m_server_socket, (struct sockaddr *)&addr, sizeof(addr)) < 0) diff --git a/orchagent/Makefile.am b/orchagent/Makefile.am index 41916c56ba0..90c7d3a5ef4 100644 --- a/orchagent/Makefile.am +++ b/orchagent/Makefile.am @@ -21,6 +21,7 @@ orchagent_SOURCES = \ main.cpp \ port.cpp \ orchdaemon.cpp \ + orchbase.cpp \ orch.cpp \ notifications.cpp \ routeorch.cpp \ @@ -47,6 +48,7 @@ orchagent_SOURCES = \ neighorch.h \ notifications.h \ observer.h \ + orchbase.h \ orch.h \ orchdaemon.h \ pfcactionhandler.h \ diff --git a/orchagent/main.cpp b/orchagent/main.cpp index 768400f5d24..8d087e4fdfb 100644 --- a/orchagent/main.cpp +++ b/orchagent/main.cpp @@ -34,7 +34,6 @@ sai_object_id_t gUnderlayIfId; sai_object_id_t gSwitchId = SAI_NULL_OBJECT_ID; MacAddress gMacAddress; -#define DEFAULT_BATCH_SIZE 128 int gBatchSize = DEFAULT_BATCH_SIZE; bool gSairedisRecord = true; diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index 1a69b6e12fa..6f46cc7c50f 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -4,7 +4,6 @@ #include #include "orch.h" - #include "subscriberstatetable.h" #include "portsorch.h" #include "tokenize.h" @@ -25,128 +24,33 @@ extern string gRecordFile; extern string getTimestamp(); Orch::Orch(DBConnector *db, string tableName) + :OrchBase(db, tableName) { - addConsumer(db, tableName); } Orch::Orch(DBConnector *db, vector &tableNames) + :OrchBase(db, tableNames) { - for(auto it : tableNames) - { - addConsumer(db, it); - } } Orch::Orch(const vector& tables) + :OrchBase(tables) { - for (auto it : tables) - { - addConsumer(it.first, it.second); - } } Orch::~Orch() { - for(auto &it : m_consumerMap) - delete it.second.m_consumer; - if (gRecordOfs.is_open()) { gRecordOfs.close(); } } -vector Orch::getSelectables() -{ - vector selectables; - for(auto it : m_consumerMap) { - selectables.push_back(it.second.m_consumer); - } - return selectables; -} - -bool Orch::hasSelectable(TableConsumable *selectable) const -{ - for(auto it : m_consumerMap) { - if (it.second.m_consumer == selectable) { - return true; - } - } - return false; -} - bool Orch::execute(string tableName) { - SWSS_LOG_ENTER(); - lock_guard lock(gDbMutex); + return OrchBase::execute(tableName); - auto consumer_it = m_consumerMap.find(tableName); - if (consumer_it == m_consumerMap.end()) - { - SWSS_LOG_ERROR("Unrecognized tableName:%s\n", tableName.c_str()); - return false; - } - Consumer& consumer = consumer_it->second; - - std::deque entries; - consumer.m_consumer->pops(entries); - - /* Nothing popped */ - if (entries.empty()) - { - return true; - } - - for (auto entry: entries) - { - string key = kfvKey(entry); - string op = kfvOp(entry); - - /* Record incoming tasks */ - if (gSwssRecord) - { - recordTuple(consumer, entry); - } - - /* If a new task comes or if a DEL task comes, we directly put it into consumer.m_toSync map */ - if (consumer.m_toSync.find(key) == consumer.m_toSync.end() || op == DEL_COMMAND) - { - consumer.m_toSync[key] = entry; - } - /* If an old task is still there, we combine the old task with new task */ - else - { - KeyOpFieldsValuesTuple existing_data = consumer.m_toSync[key]; - - auto new_values = kfvFieldsValues(entry); - auto existing_values = kfvFieldsValues(existing_data); - - - for (auto it : new_values) - { - string field = fvField(it); - string value = fvValue(it); - - auto iu = existing_values.begin(); - while (iu != existing_values.end()) - { - string ofield = fvField(*iu); - if (field == ofield) - iu = existing_values.erase(iu); - else - iu++; - } - existing_values.push_back(FieldValueTuple(field, value)); - } - consumer.m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); - } - } - - if (!consumer.m_toSync.empty()) - doTask(consumer); - - return true; } /* @@ -236,11 +140,7 @@ void Orch::doTask() if (!gPortsOrch->isInitDone()) return; - for(auto &it : m_consumerMap) - { - if (!it.second.m_toSync.empty()) - doTask(it.second); - } + OrchBase::doTask(); } void Orch::logfileReopen() @@ -264,6 +164,11 @@ void Orch::logfileReopen() void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) { + if (!gSwssRecord) + { + return; + } + string s = consumer.m_consumer->getTableName() + ":" + kfvKey(tuple) + "|" + kfvOp(tuple); for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++) @@ -364,12 +269,5 @@ bool Orch::parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uin void Orch::addConsumer(DBConnector *db, string tableName) { - if (db->getDB() == CONFIG_DB) - { - Consumer consumer(new SubscriberStateTable(db, tableName)); - m_consumerMap.insert(ConsumerMapPair(tableName, consumer)); - } else { - Consumer consumer(new ConsumerStateTable(db, tableName, gBatchSize)); - m_consumerMap.insert(ConsumerMapPair(tableName, consumer)); - } + OrchBase::addConsumer(db, tableName, gBatchSize); } diff --git a/orchagent/orch.h b/orchagent/orch.h index 473f3a4b79e..2d22b0c4f09 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -1,18 +1,12 @@ #ifndef SWSS_ORCH_H #define SWSS_ORCH_H -#include -#include - extern "C" { #include "sai.h" #include "saistatus.h" } -#include "dbconnector.h" -#include "table.h" -#include "consumertable.h" -#include "consumerstatetable.h" +#include "orchbase.h" using namespace std; using namespace swss; @@ -42,16 +36,6 @@ typedef pair object_map_pair; typedef map type_map; typedef pair type_map_pair; -typedef map SyncMap; -struct Consumer { - Consumer(TableConsumable* consumer) : m_consumer(consumer) { } - TableConsumable* m_consumer; - /* Store the latest 'golden' status */ - SyncMap m_toSync; -}; -typedef pair ConsumerMapPair; -typedef map ConsumerMap; - typedef enum { success, @@ -61,20 +45,13 @@ typedef enum failure } ref_resolve_status; -typedef pair TableConnector; -typedef pair> TablesConnector; - -class Orch +class Orch : public OrchBase { public: Orch(DBConnector *db, string tableName); Orch(DBConnector *db, vector &tableNames); Orch(const vector& tables); virtual ~Orch(); - - vector getSelectables(); - bool hasSelectable(TableConsumable* s) const; - bool execute(string tableName); /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */ void doTask(); diff --git a/orchagent/orchbase.cpp b/orchagent/orchbase.cpp new file mode 100644 index 00000000000..6bf5116a640 --- /dev/null +++ b/orchagent/orchbase.cpp @@ -0,0 +1,202 @@ +#include +#include +#include +#include + +#include "orchbase.h" +#include "subscriberstatetable.h" +#include "tokenize.h" +#include "logger.h" +#include "consumerstatetable.h" + +using namespace swss; + +OrchBase::OrchBase(DBConnector *db, string tableName) +{ + addConsumer(db, tableName); +} + +OrchBase::OrchBase(DBConnector *db, const vector &tableNames) +{ + for(auto it : tableNames) + { + addConsumer(db, it); + } +} + +OrchBase::OrchBase(const vector& tables) +{ + for (auto it : tables) + { + addConsumer(it.first, it.second); + } +} + +OrchBase::~OrchBase() +{ + for(auto &it : m_consumerMap) + delete it.second.m_consumer; +} + +vector OrchBase::getSelectables() +{ + vector selectables; + for(auto it : m_consumerMap) { + selectables.push_back(it.second.m_consumer); + } + return selectables; +} + +bool OrchBase::hasSelectable(TableConsumable *selectable) const +{ + for(auto it : m_consumerMap) { + if (it.second.m_consumer == selectable) { + return true; + } + } + return false; +} + +bool OrchBase::syncDB(string tableName, Table &tableConsumer) +{ + SWSS_LOG_ENTER(); + + auto consumer_it = m_consumerMap.find(tableName); + if (consumer_it == m_consumerMap.end()) + { + SWSS_LOG_ERROR("Unrecognized tableName:%s\n", tableName.c_str()); + return false; + } + Consumer& consumer = consumer_it->second; + + vector tuples; + + tableConsumer.getTableContent(tuples); + for (auto tuple : tuples) + { + string key = kfvKey(tuple); + /* Directly put it into consumer.m_toSync map */ + if (consumer.m_toSync.find(key) == consumer.m_toSync.end()) + { + consumer.m_toSync[key] = make_tuple(key, SET_COMMAND, kfvFieldsValues(tuple)); + } + /* + * Syncing from DB directly, don't expect duplicate keys. + * Or there is pending task from consumber state pipe, in this case just skip it. + */ + else + { + SWSS_LOG_WARN("Duplicate key %s found in tableName:%s\n", key.c_str(), tableName.c_str()); + continue; + } + SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, tuple)).c_str()); + doTask(consumer); + } + return true; +} + +bool OrchBase::execute(string tableName) +{ + SWSS_LOG_ENTER(); + + auto consumer_it = m_consumerMap.find(tableName); + if (consumer_it == m_consumerMap.end()) + { + SWSS_LOG_ERROR("Unrecognized tableName:%s\n", tableName.c_str()); + return false; + } + Consumer& consumer = consumer_it->second; + + std::deque entries; + consumer.m_consumer->pops(entries); + + /* Nothing popped */ + if (entries.empty()) + { + return true; + } + + for (auto entry: entries) + { + string key = kfvKey(entry); + string op = kfvOp(entry); + + recordTuple(consumer, entry); + + /* If a new task comes or if a DEL task comes, we directly put it into consumer.m_toSync map */ + if (consumer.m_toSync.find(key) == consumer.m_toSync.end() || op == DEL_COMMAND) + { + consumer.m_toSync[key] = entry; + } + /* If an old task is still there, we combine the old task with new task */ + else + { + KeyOpFieldsValuesTuple existing_data = consumer.m_toSync[key]; + + auto new_values = kfvFieldsValues(entry); + auto existing_values = kfvFieldsValues(existing_data); + + + for (auto it : new_values) + { + string field = fvField(it); + string value = fvValue(it); + + auto iu = existing_values.begin(); + while (iu != existing_values.end()) + { + string ofield = fvField(*iu); + if (field == ofield) + iu = existing_values.erase(iu); + else + iu++; + } + existing_values.push_back(FieldValueTuple(field, value)); + } + consumer.m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); + } + } + + if (!consumer.m_toSync.empty()) + doTask(consumer); + + return true; +} + +void OrchBase::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) +{ + return; +} + +void OrchBase::doTask() +{ + for(auto &it : m_consumerMap) + { + if (!it.second.m_toSync.empty()) + doTask(it.second); + } +} + +string OrchBase::dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) +{ + string s = consumer.m_consumer->getTableName() + ":" + kfvKey(tuple) + + "|" + kfvOp(tuple); + for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++) + { + s += "|" + fvField(*i) + ":" + fvValue(*i); + } + + return s; +} + +void OrchBase::addConsumer(DBConnector *db, string tableName, int batchSize) +{ + if (db->getDB() == CONFIG_DB) + { + Consumer consumer(new SubscriberStateTable(db, tableName)); + m_consumerMap.insert(ConsumerMapPair(tableName, consumer)); + } else { + Consumer consumer(new ConsumerStateTable(db, tableName, batchSize)); + m_consumerMap.insert(ConsumerMapPair(tableName, consumer)); + } +} diff --git a/orchagent/orchbase.h b/orchagent/orchbase.h new file mode 100644 index 00000000000..09588c79e7e --- /dev/null +++ b/orchagent/orchbase.h @@ -0,0 +1,58 @@ +#ifndef SWSS_ORCH_BASE_H +#define SWSS_ORCH_BASE_H + +#include +#include + +#include "dbconnector.h" +#include "table.h" +#include "consumertable.h" +#include "consumerstatetable.h" + +using namespace std; +using namespace swss; + +#define DEFAULT_BATCH_SIZE 128 +#define DEFAULT_KEY_SEPARATOR ":" +#define CONFIGDB_KEY_SEPARATOR "|" + +typedef map SyncMap; +struct Consumer { + Consumer(TableConsumable* consumer) : m_consumer(consumer) { } + TableConsumable* m_consumer; + /* Store the latest 'golden' status */ + SyncMap m_toSync; +}; +typedef pair ConsumerMapPair; +typedef map ConsumerMap; + +typedef pair TableConnector; +typedef pair> TablesConnector; + +class OrchBase +{ +public: + OrchBase(DBConnector *db, string tableName); + OrchBase(DBConnector *db, const vector &tableNames); + OrchBase(const vector& tables); + virtual ~OrchBase(); + + vector getSelectables(); + bool hasSelectable(TableConsumable* s) const; + + virtual bool execute(string tableName); + /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */ + virtual void doTask(); + +protected: + ConsumerMap m_consumerMap; + + /* Run doTask against a specific consumer */ + virtual void doTask(Consumer &consumer) = 0; + virtual void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple); + string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple); + void addConsumer(DBConnector *db, string tableName, int batchSize = DEFAULT_BATCH_SIZE); + bool syncDB(string tableName, Table &tableConsumer); +}; + +#endif /* SWSS_ORCH_BASE_H */ diff --git a/tests/Makefile.am b/tests/Makefile.am index d96e3f5c3ef..60932420d6b 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -12,7 +12,7 @@ endif CFLAGS_GTEST = LDADD_GTEST = -tests_SOURCES = swssnet_ut.cpp +tests_SOURCES = swssnet_ut.cpp orchbase_ut.cpp $(top_srcdir)/orchagent/orchbase.cpp tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) tests_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) diff --git a/tests/orchbase_ut.cpp b/tests/orchbase_ut.cpp new file mode 100644 index 00000000000..e5c4f80ee93 --- /dev/null +++ b/tests/orchbase_ut.cpp @@ -0,0 +1,387 @@ +#include +#include +#include +#include +#include +#include "dbconnector.h" +#include "select.h" +#include "schema.h" +#include "exec.h" +#include "orchbase.h" +#include "producerstatetable.h" + +using namespace std; +using namespace swss; + +/* + * In the following unit test: + * OrchBase supports orchestrating notifications from three tables in TEST_CONFIG_DB, + * CFGTABLE_A_1, CFGTABLE_A_2 & CFGTABLE_B with client type of keyspace SubscriberStateTable, + * and one table in TEST_APP_DB, APPTABLE_C with client type of ConsumerStateTable, + * and directs the notification to each result table in TEST_RESULT_DB + */ +#define TEST_CONFIG_DB (CONFIG_DB) +#define TEST_APP_DB (7) +#define TEST_RESULT_DB (8) + +#define SELECT_TIMEOUT 1000 + +const string cfgTableNameA1 = "CFGTABLE_A_1"; +const string cfgTableNameA2 = "CFGTABLE_A_2"; +const string cfgTableNameB = "CFGTABLE_B"; +const string appTableNameC = "APPTABLE_C"; + +const string keySuffix = "_Key"; +const string keyA1 = cfgTableNameA1 + keySuffix; +const string keyA2 = cfgTableNameA2 + keySuffix; +const string keyB = cfgTableNameB + keySuffix; +const string keyC = appTableNameC + keySuffix; + +static inline void clearDB(int db) +{ + DBConnector dbc(db, "localhost", 6379, 0); + RedisReply r(&dbc, "FLUSHDB", REDIS_REPLY_STATUS); + r.checkStatusOK(); +} + +class CfgAgentA : public OrchBase +{ +public: + CfgAgentA(DBConnector *cfgDb, DBConnector *resultDb, vector tableNames): + OrchBase(cfgDb, tableNames), + m_cfgTableA1(cfgDb, cfgTableNameA1, CONFIGDB_TABLE_NAME_SEPARATOR), + m_cfgTableA2(cfgDb, cfgTableNameA2, CONFIGDB_TABLE_NAME_SEPARATOR), + m_resultTableA1(resultDb, cfgTableNameA1), + m_resultTableA2(resultDb, cfgTableNameA2) + { + } + + void syncCfgDB() + { + OrchBase::syncDB(cfgTableNameA1, m_cfgTableA1); + OrchBase::syncDB(cfgTableNameA2, m_cfgTableA2); + } + +private: + Table m_cfgTableA1, m_cfgTableA2; + Table m_resultTableA1, m_resultTableA2; + + void doTask(Consumer &consumer) + { + string table_name = consumer.m_consumer->getTableName(); + string key_expected = table_name + keySuffix; + + EXPECT_TRUE(table_name == cfgTableNameA1 || table_name == cfgTableNameA2); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + ASSERT_STREQ(key.c_str(), key_expected.c_str()); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + if (table_name == cfgTableNameA1) + { + m_resultTableA1.set(kfvKey(t), kfvFieldsValues(t)); + } + else + { + m_resultTableA2.set(kfvKey(t), kfvFieldsValues(t)); + } + } + else if (op == DEL_COMMAND) + { + if (table_name == cfgTableNameA1) + { + m_resultTableA1.del(kfvKey(t)); + } + else + { + m_resultTableA2.del(kfvKey(t)); + } + } + it = consumer.m_toSync.erase(it); + continue; + } + } +}; + +class CfgAgentB : public OrchBase +{ +public: + CfgAgentB(DBConnector *cfgDb, DBConnector *resultDb, vector tableNames): + OrchBase(cfgDb, tableNames), + m_cfgTableB(cfgDb, cfgTableNameB, CONFIGDB_TABLE_NAME_SEPARATOR), + m_resultTableB(resultDb, cfgTableNameB) + { + + } + + void syncCfgDB() + { + OrchBase::syncDB(cfgTableNameB, m_cfgTableB); + } + +private: + Table m_cfgTableB; + Table m_resultTableB; + + void doTask(Consumer &consumer) + { + string table_name = consumer.m_consumer->getTableName(); + string key_expected = table_name + keySuffix; + + EXPECT_TRUE(table_name == cfgTableNameB); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + ASSERT_STREQ(key.c_str(), key_expected.c_str()); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + m_resultTableB.set(kfvKey(t), kfvFieldsValues(t)); + } + else if (op == DEL_COMMAND) + { + m_resultTableB.del(kfvKey(t)); + } + it = consumer.m_toSync.erase(it); + continue; + } + } +}; + +class AppAgentC : public OrchBase +{ +public: + AppAgentC(DBConnector *appDb, DBConnector *resultDb, vector tableNames): + OrchBase(appDb, tableNames), + m_appTableC(appDb, appTableNameC), + m_resultTableC(resultDb, appTableNameC) + { + + } + + void syncAppDB() + { + OrchBase::syncDB(appTableNameC, m_appTableC); + } + +private: + Table m_appTableC; + Table m_resultTableC; + + void doTask(Consumer &consumer) + { + string table_name = consumer.m_consumer->getTableName(); + string key_expected = table_name + keySuffix; + + EXPECT_TRUE(table_name == appTableNameC); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + ASSERT_STREQ(key.c_str(), key_expected.c_str()); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + m_resultTableC.set(kfvKey(t), kfvFieldsValues(t)); + } + else if (op == DEL_COMMAND) + { + m_resultTableC.del(kfvKey(t)); + } + it = consumer.m_toSync.erase(it); + continue; + } + } +}; + +// Thread for both publisher and producer +static void publisherProducerWorkerSet() +{ + // To trigger keyspace notification + DBConnector cfgDb(TEST_CONFIG_DB, "localhost", 6379, 0); + Table tableA1(&cfgDb, cfgTableNameA1, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableA2(&cfgDb, cfgTableNameA2, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableB(&cfgDb, cfgTableNameB, CONFIGDB_TABLE_NAME_SEPARATOR); + + // TO trigger producer state notifiation + DBConnector appDb(TEST_APP_DB, "localhost", 6379, 0); + ProducerStateTable tableC(&appDb, appTableNameC); + + vector fields; + FieldValueTuple t("field", "value"); + fields.push_back(t); + + tableA1.set(keyA1, fields); + tableA2.set(keyA2, fields); + tableB.set(keyB, fields); + tableC.set(keyC, fields); +} + +static void publisherProducerWorkerDel() +{ + DBConnector cfgDb(TEST_CONFIG_DB, "localhost", 6379, 0); + Table tableA1(&cfgDb, cfgTableNameA1, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableA2(&cfgDb, cfgTableNameA2, CONFIGDB_TABLE_NAME_SEPARATOR); + Table tableB(&cfgDb, cfgTableNameB, CONFIGDB_TABLE_NAME_SEPARATOR); + + DBConnector appDb(TEST_APP_DB, "localhost", 6379, 0); + ProducerStateTable tableC(&appDb, appTableNameC); + + tableA1.del(keyA1); + tableA2.del(keyA2); + tableB.del(keyB); + tableC.del(keyC); +} + +// worker thread for both keyspace subscriber and state consumer +static void subscriberConsumerWorker(std::vector cfgOrchList) +{ + swss::Select s; + for (OrchBase *o : cfgOrchList) + { + s.addSelectables(o->getSelectables()); + } + + while (true) + { + Selectable *sel; + int fd, ret; + + ret = s.select(&sel, &fd, SELECT_TIMEOUT); + if (ret == Select::TIMEOUT) + { + static int maxWait = 10; + maxWait--; + if (maxWait < 0) + { + // This unit testing should finish in 10 seconds! + break;; + } + continue; + } + EXPECT_EQ(ret, Select::OBJECT); + + for (OrchBase *o : cfgOrchList) + { + TableConsumable *c = (TableConsumable *)sel; + if (o->hasSelectable(c)) + { + o->execute(c->getTableName()); + } + } + } +} + +TEST(OrchBase, test) +{ + thread *publisherThread; + thread *subscriberThread; + vector values; + + vector agent_a_tables = { + cfgTableNameA1, + cfgTableNameA2, + }; + vector agent_b_tables = { + cfgTableNameB, + }; + + vector agent_c_tables = { + appTableNameC, + }; + + cout << "Starting OrchBase testing" << endl; + clearDB(TEST_CONFIG_DB); + clearDB(TEST_APP_DB); + clearDB(TEST_RESULT_DB); + + string result; + string kea_cmd = "redis-cli config set notify-keyspace-events KEA"; + int ret = exec(kea_cmd, result); + EXPECT_TRUE(ret == 0); + + // Connection to configDB will work as keysapce SubscriberStateTable client in OrchBase. + // Note that TEST_CONFIG_DB must be defined as CONFIG_DB + DBConnector cfgDb(TEST_CONFIG_DB, "localhost", 6379, 0); + + // Connection to other DB will work as ConsumerStateTable client in OrchBase. + DBConnector appDb(TEST_APP_DB, "localhost", 6379, 0); + + // connection to result DB is just for checking the processing result. + DBConnector resultDb(TEST_RESULT_DB, "localhost", 6379, 0); + + CfgAgentA agent_a(&cfgDb, &resultDb, agent_a_tables); + CfgAgentB agent_b(&cfgDb, &resultDb, agent_b_tables); + AppAgentC agent_c(&appDb, &resultDb, agent_c_tables); + + std::vector cfgOrchList = {&agent_a, &agent_b, &agent_c}; + + cout << "- Step 1. Provision TEST_CONFIG_DB and TEST_APP_DB" << endl; + + subscriberThread = new thread(subscriberConsumerWorker, cfgOrchList); + + publisherThread = new thread(publisherProducerWorkerSet); + publisherThread->join(); + delete publisherThread; + + sleep(1); + + cout << "- Step 2. Verify TEST_RESULT_DB content" << endl; + Table resultTableA1(&resultDb, cfgTableNameA1); + Table resultTableA2(&resultDb, cfgTableNameA2); + Table resultTableB(&resultDb, cfgTableNameB); + Table resultTableC(&resultDb, appTableNameC); + ASSERT_EQ(resultTableA1.get(keyA1, values), true); + EXPECT_EQ(resultTableA2.get(keyA2, values), true); + EXPECT_EQ(resultTableB.get(keyB, values), true); + EXPECT_EQ(resultTableC.get(keyC, values), true); + + cout << "- Step 3. Flush TEST_RESULT_DB" << endl; + clearDB(TEST_RESULT_DB); + EXPECT_EQ(resultTableA1.get(keyA1, values), false); + EXPECT_EQ(resultTableA2.get(keyA2, values), false); + EXPECT_EQ(resultTableB.get(keyB, values), false); + EXPECT_EQ(resultTableC.get(keyC, values), false); + + cout << "- Step 4. Sync from TEST_CONFIG_DB and TEST_APP_DB" << endl; + agent_a.syncCfgDB(); + agent_b.syncCfgDB(); + agent_c.syncAppDB(); + + cout << "- Step 5. Verify TEST_RESULT_DB content" << endl; + EXPECT_EQ(resultTableA1.get(keyA1, values), true); + EXPECT_EQ(resultTableA2.get(keyA2, values), true); + EXPECT_EQ(resultTableB.get(keyB, values), true); + EXPECT_EQ(resultTableC.get(keyC, values), true); + + cout << "- Step 6. Clean TEST_CONFIG_DB and TEST_APP_DB" << endl; + publisherThread = new thread(publisherProducerWorkerDel); + publisherThread->join(); + delete publisherThread; + sleep(1); + cout << "- Step 7. Verify TEST_RESULT_DB content is empty" << endl; + EXPECT_EQ(resultTableA1.get(keyA1, values), false); + EXPECT_EQ(resultTableA2.get(keyA2, values), false); + EXPECT_EQ(resultTableB.get(keyB, values), false); + EXPECT_EQ(resultTableC.get(keyC, values), false); + + subscriberThread->join(); + delete subscriberThread; + cout << "Done." << endl; +}