diff --git a/common/consumer_state_table_pops.lua b/common/consumer_state_table_pops.lua index 910ea5590..d63fe2b37 100644 --- a/common/consumer_state_table_pops.lua +++ b/common/consumer_state_table_pops.lua @@ -1,9 +1,19 @@ +redis.replicate_commands() local ret = {} +local tablename = KEYS[2] +local stateprefix = ARGV[2] local keys = redis.call('SPOP', KEYS[1], ARGV[1]) local n = table.getn(keys) for i = 1, n do local key = keys[i] - local values = redis.call('HGETALL', KEYS[2] .. key) - table.insert(ret, {key, values}) + local fieldvalues = redis.call('HGETALL', stateprefix..tablename..key) + table.insert(ret, {key, fieldvalues}) + for i = 1, #fieldvalues, 2 do + redis.call('HSET', tablename..key, fieldvalues[i], fieldvalues[i + 1]) + end + redis.call('DEL', stateprefix..tablename..key) + if #fieldvalues == 0 then + redis.call('DEL', tablename..key) + end end return ret diff --git a/common/consumerstatetable.cpp b/common/consumerstatetable.cpp index 2b93421d7..e11a82ece 100644 --- a/common/consumerstatetable.cpp +++ b/common/consumerstatetable.cpp @@ -38,11 +38,12 @@ void ConsumerStateTable::pops(std::deque &vkco, const st RedisCommand command; command.format( - "EVALSHA %s 2 %s %s: %d ''", + "EVALSHA %s 2 %s %s: %d %s", sha.c_str(), getKeySetName().c_str(), getTableName().c_str(), - POP_BATCH_SIZE); + POP_BATCH_SIZE, + getStateHashPrefix().c_str()); RedisReply r(m_db, command); auto ctx0 = r.getContext(); @@ -64,7 +65,6 @@ void ConsumerStateTable::pops(std::deque &vkco, const st assert(values.empty()); auto& ctx = ctx0->element[ie]; - assert(ctx->elements == 2); assert(ctx->element[0]->type == REDIS_REPLY_STRING); std::string key = ctx->element[0]->str; kfvKey(kco) = key; diff --git a/common/producerstatetable.cpp b/common/producerstatetable.cpp index 4ef7a09a3..bdf30fdc1 100644 --- a/common/producerstatetable.cpp +++ b/common/producerstatetable.cpp @@ -65,7 +65,7 @@ void ProducerStateTable::set(const string &key, const vector &v args.emplace_back(getChannelName()); args.emplace_back(getKeySetName()); - args.insert(args.end(), values.size(), getKeyName(key)); + args.insert(args.end(), values.size(), getStateHashPrefix() + getKeyName(key)); args.emplace_back("G"); args.emplace_back(key); @@ -98,7 +98,7 @@ void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND args.emplace_back("3"); args.emplace_back(getChannelName()); args.emplace_back(getKeySetName()); - args.emplace_back(getKeyName(key)); + args.emplace_back(getStateHashPrefix() + getKeyName(key)); args.emplace_back("G"); args.emplace_back(key); args.emplace_back("''"); diff --git a/common/table.h b/common/table.h index 4f153523c..556be7894 100644 --- a/common/table.h +++ b/common/table.h @@ -192,6 +192,7 @@ class TableName_KeySet { } std::string getKeySetName() const { return m_key; } + std::string getStateHashPrefix() const { return "_"; } }; } diff --git a/tests/redis_state_ut.cpp b/tests/redis_state_ut.cpp index 4a3956e56..21e4725c5 100644 --- a/tests/redis_state_ut.cpp +++ b/tests/redis_state_ut.cpp @@ -11,6 +11,7 @@ #include "common/table.h" #include "common/producerstatetable.h" #include "common/consumerstatetable.h" +#include "common/redisclient.h" using namespace std; using namespace swss; @@ -103,6 +104,7 @@ static void consumerWorker(int index) { string tableName = "UT_REDIS_THREAD_" + to_string(index); DBConnector db(TEST_DB, "localhost", 6379, 0); + RedisClient redisClient(&db); ConsumerStateTable c(&db, tableName); Select cs; Selectable *selectcs; @@ -119,9 +121,17 @@ static void consumerWorker(int index) { numberOfKeysSet++; validateFields(kfvKey(kco), kfvFieldsValues(kco)); + + for (auto fv : kfvFieldsValues(kco)) + { + string val = *redisClient.hget(tableName + ":" + kfvKey(kco), fvField(fv)); + EXPECT_EQ(val, fvValue(fv)); + } } else if (kfvOp(kco) == "DEL") { numberOfKeyDeleted++; + auto keys = redisClient.keys(tableName + ":" + kfvKey(kco)); + EXPECT_EQ(0UL, keys.size()); } if ((i++ % 100) == 0) @@ -215,6 +225,13 @@ TEST(ConsumerStateTable, double_set) int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } + + /* State Queue should be empty */ + RedisCommand keys; + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + auto qlen = r.getContext()->elements; + EXPECT_EQ(qlen, 0U); } TEST(ConsumerStateTable, set_del) @@ -267,6 +284,13 @@ TEST(ConsumerStateTable, set_del) int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } + + /* State Queue should be empty */ + RedisCommand keys; + keys.format("KEYS %s*", (p.getStateHashPrefix() + tableName).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + auto qlen = r.getContext()->elements; + EXPECT_EQ(qlen, 0U); } TEST(ConsumerStateTable, set_del_set) @@ -341,6 +365,13 @@ TEST(ConsumerStateTable, set_del_set) int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } + + /* State Queue should be empty */ + RedisCommand keys; + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + auto qlen = r.getContext()->elements; + EXPECT_EQ(qlen, 0U); } TEST(ConsumerStateTable, singlethread) @@ -415,6 +446,13 @@ TEST(ConsumerStateTable, singlethread) cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl; + /* State Queue should be empty */ + RedisCommand keys; + keys.format("KEYS %s*", (c.getStateHashPrefix() + tableName).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + auto qlen = r.getContext()->elements; + EXPECT_EQ(qlen, 0U); + cout << endl << "Done." << endl; } @@ -442,6 +480,7 @@ TEST(ConsumerStateTable, test) consumerThreads[i]->join(); delete consumerThreads[i]; } + cout << endl << "Done." << endl; } @@ -449,6 +488,7 @@ TEST(ConsumerStateTable, multitable) { DBConnector db(TEST_DB, "localhost", 6379, 0); ConsumerStateTable *consumers[NUMBER_OF_THREADS]; + vector tablenames(NUMBER_OF_THREADS); thread *producerThreads[NUMBER_OF_THREADS]; KeyOpFieldsValuesTuple kco; Select cs; @@ -463,8 +503,8 @@ TEST(ConsumerStateTable, multitable) /* Starting the consumer before the producer */ for (i = 0; i < NUMBER_OF_THREADS; i++) { - consumers[i] = new ConsumerStateTable(&db, string("UT_REDIS_THREAD_") + - to_string(i)); + tablenames[i] = string("UT_REDIS_THREAD_") + to_string(i); + consumers[i] = new ConsumerStateTable(&db, tablenames[i]); producerThreads[i] = new thread(producerWorker, i); } @@ -504,6 +544,13 @@ TEST(ConsumerStateTable, multitable) delete producerThreads[i]; } + + /* State Queue should be empty */ + RedisCommand keys; + keys.format("KEYS %s*", (consumers[0]->getStateHashPrefix() + tablenames[0]).c_str()); + RedisReply r(&db, keys, REDIS_REPLY_ARRAY); + auto qlen = r.getContext()->elements; + EXPECT_EQ(qlen, 0U); + cout << endl << "Done." << endl; } -