diff --git a/common/Makefile.am b/common/Makefile.am index d6ac711f3..f28eda096 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -5,6 +5,7 @@ lib_LTLIBRARIES = libswsscommon.la EXTRA_DIST = \ consumer_state_table_pops.lua \ consumer_table_pops.lua \ + producer_state_table_apply_view.lua \ table_dump.lua swssdir = $(datadir)/swss diff --git a/common/producer_state_table_apply_view.lua b/common/producer_state_table_apply_view.lua new file mode 100644 index 000000000..211a66d09 --- /dev/null +++ b/common/producer_state_table_apply_view.lua @@ -0,0 +1,39 @@ +--[[ +Sample args format: +KEYS: + SAMPLE_CHANNEL + SAMPLE_KEY_SET + SAMPLE_DEL_KEY_SET + _SAMPLE:key_0 + _SAMPLE:key_1 + ARGV: + G (String to be published to channel) + 2 (Count of objects to set) + key_0 + key_1 + 0 (Count of objects to del) + 2 (Count of A/V pair of object 0) + attribute_0 + value_0 + attribute_1 + value_1 + 1 (Count of A/V pair of object 1) + attribute_0 + value_0 +]] +local arg_start = 2 +for i = 1, ARGV[arg_start] do + redis.call('SADD', KEYS[2], ARGV[arg_start + i]) +end +arg_start = arg_start + ARGV[arg_start] + 1 +for i = 1, ARGV[arg_start] do + redis.call('SADD', KEYS[3], ARGV[arg_start + i]) +end +arg_start = arg_start + ARGV[arg_start] + 1 +for j = 4, #KEYS do + for i = 1, ARGV[arg_start] do + redis.call('HSET', KEYS[j], ARGV[arg_start + i * 2 - 1], ARGV[arg_start + i * 2]) + end + arg_start = arg_start + 2 * ARGV[arg_start] + 1 +end +redis.call('PUBLISH', KEYS[1], ARGV[1]) diff --git a/common/producerstatetable.cpp b/common/producerstatetable.cpp index ba313c555..fab16ba09 100644 --- a/common/producerstatetable.cpp +++ b/common/producerstatetable.cpp @@ -24,6 +24,7 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta , TableName_KeySet(tableName) , m_buffered(buffered) , m_pipeowned(false) + , m_tempViewActive(false) , m_pipe(pipeline) { string luaSet = @@ -49,6 +50,9 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta "end\n" "redis.call('DEL', KEYS[3])\n"; m_shaClear = m_pipe->loadRedisScript(luaClear); + + string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua"); + m_shaApplyView = m_pipe->loadRedisScript(luaApplyView); } ProducerStateTable::~ProducerStateTable() @@ -67,6 +71,16 @@ void ProducerStateTable::setBuffered(bool buffered) void ProducerStateTable::set(const string &key, const vector &values, const string &op /*= SET_COMMAND*/, const string &prefix) { + if (m_tempViewActive) + { + // Write to temp view instead of DB + for (const auto& iv: values) + { + m_tempViewState[key][fvField(iv)] = fvValue(iv); + } + return; + } + // Assembly redis command args into a string vector vector args; args.emplace_back("EVALSHA"); @@ -101,6 +115,13 @@ void ProducerStateTable::set(const string &key, const vector &v void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix) { + if (m_tempViewActive) + { + // Write to temp view instead of DB + m_tempViewState.erase(key); + return; + } + // Assembly redis command args into a string vector vector args; args.emplace_back("EVALSHA"); @@ -168,4 +189,167 @@ void ProducerStateTable::clear() m_pipe->flush(); } +void ProducerStateTable::create_temp_view() +{ + if (m_tempViewActive) + { + SWSS_LOG_WARN("create_temp_view() called for table %s when another temp view is under work, %zd objects in existing temp view will be discarded.", getTableName().c_str(), m_tempViewState.size()); + } + m_tempViewActive = true; + m_tempViewState.clear(); +} + +void ProducerStateTable::apply_temp_view() +{ + if (!m_tempViewActive) + { + SWSS_LOG_THROW("apply_temp_view() called for table %s, however no temp view was created.", getTableName().c_str()); + } + + // Drop all pending operation first + clear(); + + TableDump currentState; + { + Table mainTable(m_pipe, getTableName(), false); + mainTable.dump(currentState); + } + + // Print content of current view and temp view as debug log + SWSS_LOG_INFO("View switch of table %s required.", getTableName().c_str()); + SWSS_LOG_INFO("Objects in current view:"); + for (auto const & kfvPair : currentState) + { + SWSS_LOG_INFO(" %s: %zd fields;", kfvPair.first.c_str(), kfvPair.second.size()); + } + SWSS_LOG_INFO("Objects in target view:"); + for (auto const & kfvPair : m_tempViewState) + { + SWSS_LOG_INFO(" %s: %zd fields;", kfvPair.first.c_str(), kfvPair.second.size()); + } + + + std::vector keysToSet; + std::vector keysToDel; + + // Compare based on existing objects. + // Please note that this comparation is literal not contextual - + // e.g. {nexthop: 10.1.1.1, 10.1.1.2} and {nexthop: 10.1.1.2, 10.1.1.1} will be treated as different. + // Application will need to handle it, to make sure contextually identical field values also literally identical. + for (auto const & kfvPair : currentState) + { + const string& key = kfvPair.first; + const TableMap& fieldValueMap = kfvPair.second; + // DEL is needed if object does not exist in new state, or any field is not presented in new state + // SET is almost always needed, unless old state and new state exactly match each other + // (All old fields exists in new state, values match, and there is no additional field in new state) + if (m_tempViewState.find(key) == m_tempViewState.end()) // Key does not exist in new view + { + keysToDel.emplace_back(key); + keysToSet.emplace_back(key); + continue; + } + const TableMap& newFieldValueMap = m_tempViewState[key]; + bool needDel = false; + bool needSet = false; + for (auto const& fvPair : fieldValueMap) + { + const string& field = fvPair.first; + const string& value = fvPair.second; + if (newFieldValueMap.find(field) == newFieldValueMap.end()) // Field does not exist in new view + { + needDel = true; + needSet = true; + break; + } + if (newFieldValueMap.at(field) != value) // Field value changed + { + needSet = true; + } + } + if (newFieldValueMap.size() > fieldValueMap.size()) // New field added + { + needSet = true; + } + + if (needDel) + { + keysToDel.emplace_back(key); + } + if (needSet) + { + keysToSet.emplace_back(key); + } + else // If exactly match, no need to sync new state to StateHash in DB + { + m_tempViewState.erase(key); + } + } + // Objects that do not exist currently need to be created + for (auto const & kfvPair : m_tempViewState) + { + const string& key = kfvPair.first; + if (currentState.find(key) == currentState.end()) + { + keysToSet.emplace_back(key); + } + } + + // Assembly redis command args into a string vector + // See comment in producer_state_table_apply_view.lua for argument format + vector args; + args.emplace_back("EVALSHA"); + args.emplace_back(m_shaApplyView); + args.emplace_back(to_string(m_tempViewState.size() + 3)); + args.emplace_back(getChannelName()); + args.emplace_back(getKeySetName()); + args.emplace_back(getDelKeySetName()); + + vector argvs; + argvs.emplace_back("G"); + argvs.emplace_back(to_string(keysToSet.size())); + argvs.insert(argvs.end(), keysToSet.begin(), keysToSet.end()); + argvs.emplace_back(to_string(keysToDel.size())); + argvs.insert(argvs.end(), keysToDel.begin(), keysToDel.end()); + for (auto const & kfvPair : m_tempViewState) + { + const string& key = kfvPair.first; + const TableMap& fieldValueMap = kfvPair.second; + args.emplace_back(getStateHashPrefix() + getKeyName(key)); + argvs.emplace_back(to_string(fieldValueMap.size())); + for (auto const& fvPair : fieldValueMap) + { + const string& field = fvPair.first; + const string& value = fvPair.second; + argvs.emplace_back(field); + argvs.emplace_back(value); + } + } + args.insert(args.end(), argvs.begin(), argvs.end()); + + // Log arguments for debug + { + std::stringstream ss; + for (auto const & item : args) + { + ss << item << " "; + } + SWSS_LOG_DEBUG("apply_view.lua is called with following argument list: %s", ss.str().c_str()); + } + + // Transform data structure + vector args1; + transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } ); + + // Invoke redis command + RedisCommand command; + command.formatArgv((int)args1.size(), &args1[0], NULL); + m_pipe->push(command, REDIS_REPLY_NIL); + m_pipe->flush(); + + // Clear state, temp view operation is now finished + m_tempViewState.clear(); + m_tempViewActive = false; +} + } diff --git a/common/producerstatetable.h b/common/producerstatetable.h index ee84f8053..990dcdcb1 100644 --- a/common/producerstatetable.h +++ b/common/producerstatetable.h @@ -29,13 +29,20 @@ class ProducerStateTable : public TableBase, public TableName_KeySet int64_t count(); void clear(); + + void create_temp_view(); + + void apply_temp_view(); private: bool m_buffered; bool m_pipeowned; + bool m_tempViewActive; RedisPipeline *m_pipe; std::string m_shaSet; std::string m_shaDel; std::string m_shaClear; + std::string m_shaApplyView; + TableDump m_tempViewState; }; } diff --git a/common/table.cpp b/common/table.cpp index 3a9965cf1..3eefc60ed 100644 --- a/common/table.cpp +++ b/common/table.cpp @@ -179,11 +179,6 @@ void Table::dump(TableDump& tableDump) { SWSS_LOG_ENTER(); - // note that this function is not efficient - // it can take ~100ms for entire asic dump - // but it's not intended to be efficient - // since it will not be used many times - static std::string luaScript = loadLuaScript("table_dump.lua"); static std::string sha = m_pipe->loadRedisScript(luaScript); diff --git a/common/table_dump.lua b/common/table_dump.lua index c11cb587e..3c8c1bb53 100644 --- a/common/table_dump.lua +++ b/common/table_dump.lua @@ -1,17 +1,15 @@ -local keys = redis.call("keys", KEYS[1] .. ":*") +local keys = redis.call("KEYS", KEYS[1] .. ":*") local res = {} for i,k in pairs(keys) do - - local skeys = redis.call("HKEYS", k) local sres={} - for j,sk in pairs(skeys) do - sres[sk] = redis.call("HGET", k, sk) + local flat_map = redis.call('HGETALL', k) + for j = 1, #flat_map, 2 do + sres[flat_map[j]] = flat_map[j + 1] end res[k] = sres - end return cjson.encode(res) diff --git a/tests/redis_state_ut.cpp b/tests/redis_state_ut.cpp index 43e6a40a7..ea046f584 100644 --- a/tests/redis_state_ut.cpp +++ b/tests/redis_state_ut.cpp @@ -502,6 +502,450 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get) EXPECT_EQ(qlen, 0U); } +TEST(ConsumerStateTable, view_switch) +{ + clearDB(); + + // Prepare producer + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, "localhost", 6379, 0); + ProducerStateTable p(&db, tableName); + Table table(&db, tableName); + + int numOfKeys = 20; + int numOfKeysToDel = 10; + int maxNumOfFields = 2; + + p.create_temp_view(); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + // Set opeartions should go into temp view + EXPECT_EQ(p.count(), 0); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + p.apply_temp_view(); + EXPECT_EQ(p.count(), numOfKeys); + + // Test object deletion + clearDB(); + numOfKeys = 20; + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + table.set(key(i), fields); + } + p.create_temp_view(); + p.apply_temp_view(); + EXPECT_EQ(p.count(), numOfKeys); + RedisCommand cmdDelCount; + cmdDelCount.format("SCARD %s", p.getDelKeySetName().c_str()); + RedisReply r(&db, cmdDelCount, REDIS_REPLY_INTEGER); + EXPECT_EQ(r.getReply(), (long long int) numOfKeys); + + // When there is less field. objects need to be deleted and recreated + clearDB(); + int maxNumOfFieldsNew = 1; + p.create_temp_view(); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + table.set(key(i), fields); + fields.clear(); + for (int j = 0; j < maxNumOfFieldsNew; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + p.apply_temp_view(); + EXPECT_EQ(p.count(), numOfKeys); + RedisReply r2(&db, cmdDelCount, REDIS_REPLY_INTEGER); + EXPECT_EQ(r2.getReply(), (long long int) numOfKeys); + + // When there is more field. objects does not need to be deleted + clearDB(); + maxNumOfFieldsNew = 3; + p.create_temp_view(); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + table.set(key(i), fields); + fields.clear(); + for (int j = 0; j < maxNumOfFieldsNew; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + p.apply_temp_view(); + EXPECT_EQ(p.count(), numOfKeys); + RedisReply r3(&db, cmdDelCount, REDIS_REPLY_INTEGER); + EXPECT_EQ(r3.getReply(), (long long int) 0); +} + +TEST(ConsumerStateTable, view_switch_abnormal_sequence) +{ + clearDB(); + + // Prepare producer + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, "localhost", 6379, 0); + ProducerStateTable p(&db, tableName); + Table table(&db, tableName); + + int numOfKeys = 20; + int numOfKeysNew = 10; + int maxNumOfFields = 2; + + // Double create - only the content of second view should be applied + p.create_temp_view(); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + p.create_temp_view(); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + p.apply_temp_view(); + EXPECT_EQ(p.count(), numOfKeysNew); + + // Double apply - should throw exception + clearDB(); + p.create_temp_view(); + p.apply_temp_view(); + EXPECT_ANY_THROW({ + p.apply_temp_view(); + }); +} + +TEST(ConsumerStateTable, view_switch_with_consumer) +{ + clearDB(); + int numOfKeys = 20; + int numOfKeysNew = 5; + int maxNumOfFields = 2; + int maxNumOfFieldsNew = 1; + + // Prepare producer + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, "localhost", 6379, 0); + ProducerStateTable p(&db, tableName); + Table table(&db, tableName); + + // Set up previous view + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + + // Set up consumer to sync state + ConsumerStateTable c(&db, tableName); + Select cs; + Selectable *selectcs; + cs.addSelectable(&c); + int ret; + do + { + ret = cs.select(&selectcs, 1000); + if (ret == Select::OBJECT) + { + KeyOpFieldsValuesTuple kco; + c.pop(kco); + } + } + while (ret != Select::TIMEOUT); + + // State Queue should be empty + RedisCommand keys; + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r.getContext()->elements, (size_t) 0); + // Verify number of objects + keys.format("KEYS %s:*", tableName.c_str()); + RedisReply r2(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r2.getContext()->elements, (size_t) numOfKeys); + // Verify number of fields + RedisCommand hlen; + hlen.format("HLEN %s", (c.getKeyName(key(0))).c_str()); + RedisReply r3(&db, hlen, REDIS_REPLY_INTEGER); + EXPECT_EQ(r3.getReply(), (long long int) maxNumOfFields); + + // Apply temp view with different number of objects and different number of fields + p.create_temp_view(); + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFieldsNew; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + p.apply_temp_view(); + + do + { + ret = cs.select(&selectcs, 1000); + if (ret == Select::OBJECT) + { + KeyOpFieldsValuesTuple kco; + c.pop(kco); + } + } + while (ret != Select::TIMEOUT); + + // State Queue should be empty + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r4(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r4.getContext()->elements, (size_t) 0); + // Verify number of objects + keys.format("KEYS %s:*", tableName.c_str()); + RedisReply r5(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r5.getContext()->elements, (size_t) numOfKeysNew); + // Verify number of fields + hlen.format("HLEN %s", (c.getKeyName(key(0))).c_str()); + RedisReply r6(&db, hlen, REDIS_REPLY_INTEGER); + EXPECT_EQ(r6.getReply(), (long long int) maxNumOfFieldsNew); +} + +TEST(ConsumerStateTable, view_switch_delete_with_consumer) +{ + clearDB(); + int numOfKeys = 20; + int maxNumOfFields = 2; + + // Prepare producer + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, "localhost", 6379, 0); + ProducerStateTable p(&db, tableName); + Table table(&db, tableName); + + // Set up previous view + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + + // Set up consumer to sync state + ConsumerStateTable c(&db, tableName); + Select cs; + Selectable *selectcs; + cs.addSelectable(&c); + int ret; + do + { + ret = cs.select(&selectcs, 1000); + if (ret == Select::OBJECT) + { + KeyOpFieldsValuesTuple kco; + c.pop(kco); + } + } + while (ret != Select::TIMEOUT); + + // State Queue should be empty + RedisCommand keys; + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r.getContext()->elements, (size_t) 0); + // Verify number of objects + keys.format("KEYS %s:*", tableName.c_str()); + RedisReply r2(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r2.getContext()->elements, (size_t) numOfKeys); + // Verify number of fields + RedisCommand hlen; + hlen.format("HLEN %s", (c.getKeyName(key(0))).c_str()); + RedisReply r3(&db, hlen, REDIS_REPLY_INTEGER); + EXPECT_EQ(r3.getReply(), (long long int) maxNumOfFields); + + // Apply empty temp view + p.create_temp_view(); + p.apply_temp_view(); + + do + { + ret = cs.select(&selectcs, 1000); + if (ret == Select::OBJECT) + { + KeyOpFieldsValuesTuple kco; + c.pop(kco); + } + } + while (ret != Select::TIMEOUT); + + // State Queue should be empty + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r4(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r4.getContext()->elements, (size_t) 0); + // Table should be empty + keys.format("KEYS %s:*", tableName.c_str()); + RedisReply r5(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r5.getContext()->elements, (size_t) 0); +} + +TEST(ConsumerStateTable, view_switch_delete_with_consumer_2) +{ + clearDB(); + int numOfKeys = 20; + int maxNumOfFields = 2; + + // Prepare producer + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, "localhost", 6379, 0); + ProducerStateTable p(&db, tableName); + Table table(&db, tableName); + + // Set up consumer to sync state + ConsumerStateTable c(&db, tableName); + Select cs; + Selectable *selectcs; + cs.addSelectable(&c); + int ret; + + // Set up initial view again + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + do + { + ret = cs.select(&selectcs, 1000); + if (ret == Select::OBJECT) + { + KeyOpFieldsValuesTuple kco; + c.pop(kco); + } + } + while (ret != Select::TIMEOUT); + // State Queue should be empty + RedisCommand keys; + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r6(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r6.getContext()->elements, (size_t) 0); + + p.create_temp_view(); + // set and del in temp view + for (int i=0; i fields; + for (int j = 0; j < maxNumOfFields; ++j) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key(i), fields); + } + for (int i=0; ielements, (size_t) 0); + // Table should be empty + keys.format("KEYS %s:*", tableName.c_str()); + RedisReply r8(&db, keys, REDIS_REPLY_ARRAY); + EXPECT_EQ(r8.getContext()->elements, (size_t) 0); +} + TEST(ConsumerStateTable, singlethread) { clearDB();