Skip to content

Commit 1d27f30

Browse files
committed
redispipeline publish at flush
1 parent 24979b0 commit 1d27f30

4 files changed

Lines changed: 95 additions & 34 deletions

File tree

common/producerstatetable.cpp

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,39 +13,37 @@ using namespace std;
1313

1414
namespace swss {
1515

16-
ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
17-
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false)
16+
ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName, bool flushPub)
17+
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, flushPub)
1818
{
1919
m_pipeowned = true;
2020
}
2121

22-
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
22+
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
2323
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
2424
, TableName_KeySet(tableName)
25+
, m_flushPub(flushPub)
2526
, m_buffered(buffered)
2627
, m_pipeowned(false)
2728
, m_tempViewActive(false)
2829
, m_pipe(pipeline)
2930
{
31+
if (m_flushPub) {
32+
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));
33+
}
3034
// num in luaSet and luaDel means number of elements that were added to the key set,
3135
// not including all the elements already present into the set.
3236
string luaSet =
3337
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
3438
"for i = 0, #KEYS - 3 do\n"
3539
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
36-
"end\n"
37-
" if added > 0 then \n"
38-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
3940
"end\n";
4041
m_shaSet = m_pipe->loadRedisScript(luaSet);
4142

4243
string luaDel =
4344
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
4445
"redis.call('SADD', KEYS[4], ARGV[2])\n"
45-
"redis.call('DEL', KEYS[3])\n"
46-
"if added > 0 then \n"
47-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
48-
"end\n";
46+
"redis.call('DEL', KEYS[3])\n";
4947
m_shaDel = m_pipe->loadRedisScript(luaDel);
5048

5149
string luaBatchedSet =
@@ -59,9 +57,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
5957
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
6058
" end\n"
6159
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
62-
"end\n"
63-
"if added > 0 then \n"
64-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
6560
"end\n";
6661
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
6762

@@ -71,9 +66,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
7166
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
7267
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
7368
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
74-
"end\n"
75-
"if added > 0 then \n"
76-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
7769
"end\n";
7870
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
7971

@@ -88,6 +80,21 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
8880

8981
string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
9082
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
83+
84+
if (!m_flushPub) {
85+
string luaPub =
86+
"if added > 0 then \n"
87+
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
88+
"end\n";
89+
luaSet += luaPub;
90+
luaDel += luaPub;
91+
luaBatchedSet += luaPub;
92+
luaBatchedDel += luaPub;
93+
m_shaSet = m_pipe->loadRedisScript(luaSet);
94+
m_shaDel = m_pipe->loadRedisScript(luaDel);
95+
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
96+
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
97+
}
9198
}
9299

93100
ProducerStateTable::~ProducerStateTable()

common/producerstatetable.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ namespace swss {
1010
class ProducerStateTable : public TableBase, public TableName_KeySet
1111
{
1212
public:
13-
ProducerStateTable(DBConnector *db, const std::string &tableName);
14-
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
13+
ProducerStateTable(DBConnector *db, const std::string &tableName, bool flushPub = false);
14+
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false, bool flushPub = false);
1515
virtual ~ProducerStateTable();
1616

1717
void setBuffered(bool buffered);
@@ -51,6 +51,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
5151

5252
void apply_temp_view();
5353
private:
54+
bool m_flushPub;
5455
bool m_buffered;
5556
bool m_pipeowned;
5657
bool m_tempViewActive;

common/redispipeline.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
#include <string>
44
#include <queue>
5+
#include <unordered_set>
56
#include <functional>
7+
#include <chrono>
8+
#include <iostream>
69
#include "redisreply.h"
710
#include "rediscommand.h"
811
#include "dbconnector.h"
@@ -22,9 +25,11 @@ class RedisPipeline {
2225
RedisPipeline(const DBConnector *db, size_t sz = 128)
2326
: COMMAND_MAX(sz)
2427
, m_remaining(0)
28+
, m_shaPub("")
2529
{
2630
m_db = db->newConnector(NEWCONNECTOR_TIMEOUT);
2731
initializeOwnerTid();
32+
lastHeartBeat = std::chrono::steady_clock::now();
2833
}
2934

3035
~RedisPipeline() {
@@ -113,11 +118,19 @@ class RedisPipeline {
113118

114119
void flush()
115120
{
121+
lastHeartBeat = std::chrono::steady_clock::now();
122+
123+
if (m_remaining == 0) {
124+
return;
125+
}
126+
116127
while(m_remaining)
117128
{
118129
// Construct an object to use its dtor, so that resource is released
119130
RedisReply r(pop());
120131
}
132+
133+
publish();
121134
}
122135

123136
size_t size()
@@ -145,12 +158,43 @@ class RedisPipeline {
145158
m_ownerTid = gettid();
146159
}
147160

161+
void addChannel(std::string channel)
162+
{
163+
if (m_channels.find(channel) != m_channels.end())
164+
return;
165+
166+
m_channels.insert(channel);
167+
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');";
168+
m_shaPub = loadRedisScript(m_luaPub);
169+
}
170+
171+
int getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent=std::chrono::steady_clock::now())
172+
{
173+
return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count());
174+
}
175+
176+
void publish() {
177+
if (m_shaPub == "") {
178+
return;
179+
}
180+
RedisCommand cmd;
181+
cmd.format(
182+
"EVALSHA %s 0",
183+
m_shaPub.c_str());
184+
RedisReply r(m_db, cmd);
185+
}
186+
148187
private:
149188
DBConnector *m_db;
150189
std::queue<int> m_expectedTypes;
151190
size_t m_remaining;
152191
long int m_ownerTid;
153192

193+
std::string m_luaPub;
194+
std::string m_shaPub;
195+
std::chrono::time_point<std::chrono::steady_clock> lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked
196+
std::unordered_set<std::string> m_channels;
197+
154198
void mayflush()
155199
{
156200
if (m_remaining >= COMMAND_MAX)

tests/redis_piped_state_ut.cpp

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ static inline void validateFields(const string& key, const vector<FieldValueTupl
7474
}
7575
}
7676

77-
static void producerWorker(int index)
77+
static void producerWorker(int index, bool flushPub)
7878
{
7979
string tableName = "UT_REDIS_THREAD_" + to_string(index);
8080
DBConnector db(TEST_DB, 0, true);
8181
RedisPipeline pipeline(&db);
82-
ProducerStateTable p(&pipeline, tableName, true);
82+
ProducerStateTable p(&pipeline, tableName, true, flushPub);
8383

8484
for (int i = 0; i < NUMBER_OF_OPS; i++)
8585
{
@@ -112,24 +112,27 @@ static void consumerWorker(int index)
112112
int numberOfKeysSet = 0;
113113
int numberOfKeyDeleted = 0;
114114
int ret, i = 0;
115-
KeyOpFieldsValuesTuple kco;
115+
std::deque<KeyOpFieldsValuesTuple> entries;
116116

117117
cs.addSelectable(&c);
118118
while ((ret = cs.select(&selectcs)) == Select::OBJECT)
119119
{
120-
c.pop(kco);
121-
if (kfvOp(kco) == "SET")
122-
{
123-
numberOfKeysSet++;
124-
validateFields(kfvKey(kco), kfvFieldsValues(kco));
125-
} else if (kfvOp(kco) == "DEL")
120+
c.pops(entries);
121+
122+
for (auto& kco: entries)
126123
{
127-
numberOfKeyDeleted++;
124+
if (kfvOp(kco) == "SET")
125+
{
126+
numberOfKeysSet++;
127+
validateFields(kfvKey(kco), kfvFieldsValues(kco));
128+
} else if (kfvOp(kco) == "DEL")
129+
{
130+
numberOfKeyDeleted++;
131+
}
132+
133+
if ((i++ % 100) == 0)
134+
cout << "-" << flush;
128135
}
129-
130-
if ((i++ % 100) == 0)
131-
cout << "-" << flush;
132-
133136
if (numberOfKeyDeleted == NUMBER_OF_OPS)
134137
break;
135138
}
@@ -654,7 +657,10 @@ TEST(ConsumerStateTable, async_test)
654657
for (int i = 0; i < NUMBER_OF_THREADS; i++)
655658
{
656659
consumerThreads[i] = new thread(consumerWorker, i);
657-
producerThreads[i] = new thread(producerWorker, i);
660+
if (i < NUMBER_OF_THREADS/2)
661+
producerThreads[i] = new thread(producerWorker, i, false);
662+
else
663+
producerThreads[i] = new thread(producerWorker, i, true);
658664
}
659665

660666
cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl;
@@ -689,7 +695,10 @@ TEST(ConsumerStateTable, async_multitable)
689695
{
690696
consumers[i] = new ConsumerStateTable(&db, string("UT_REDIS_THREAD_") +
691697
to_string(i));
692-
producerThreads[i] = new thread(producerWorker, i);
698+
if (i < NUMBER_OF_THREADS/2)
699+
producerThreads[i] = new thread(producerWorker, i, false);
700+
else
701+
producerThreads[i] = new thread(producerWorker, i, true);
693702
}
694703

695704
for (i = 0; i < NUMBER_OF_THREADS; i++)

0 commit comments

Comments
 (0)