Skip to content

Commit 3ef9655

Browse files
committed
redispipeline publish at flush
1 parent aba0f66 commit 3ef9655

4 files changed

Lines changed: 58 additions & 22 deletions

File tree

common/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ common_libswsscommon_la_SOURCES = \
6464
common/countertable.cpp \
6565
common/redisutility.cpp \
6666
common/restart_waiter.cpp \
67-
common/profileprovider.cpp \
67+
common/profileprovider.cpp \
6868
common/zmqclient.cpp \
6969
common/zmqserver.cpp \
7070
common/asyncdbupdater.cpp \

common/producerstatetable.cpp

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <sstream>
44
#include <utility>
55
#include <algorithm>
6+
#include "redispipeline.h"
67
#include "redisreply.h"
78
#include "table.h"
89
#include "redisapi.h"
@@ -13,40 +14,37 @@ using namespace std;
1314

1415
namespace swss {
1516

16-
ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
17-
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false)
17+
ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName, bool flushPub)
18+
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, flushPub)
1819
{
1920
m_pipeowned = true;
2021
}
2122

22-
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
23+
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
2324
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
2425
, TableName_KeySet(tableName)
26+
, m_flushPub(flushPub)
2527
, m_buffered(buffered)
2628
, m_pipeowned(false)
2729
, m_tempViewActive(false)
2830
, m_pipe(pipeline)
2931
{
32+
if (m_flushPub) {
33+
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));
34+
}
35+
3036
// num in luaSet and luaDel means number of elements that were added to the key set,
3137
// not including all the elements already present into the set.
3238
string luaSet =
3339
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
3440
"for i = 0, #KEYS - 3 do\n"
3541
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
3642
"end\n"
37-
" if added > 0 then \n"
38-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
39-
"end\n";
40-
m_shaSet = m_pipe->loadRedisScript(luaSet);
4143

4244
string luaDel =
4345
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
4446
"redis.call('SADD', KEYS[4], ARGV[2])\n"
4547
"redis.call('DEL', KEYS[3])\n"
46-
"if added > 0 then \n"
47-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
48-
"end\n";
49-
m_shaDel = m_pipe->loadRedisScript(luaDel);
5048

5149
string luaBatchedSet =
5250
"local added = 0\n"
@@ -60,10 +58,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
6058
" end\n"
6159
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
6260
"end\n"
63-
"if added > 0 then \n"
64-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
65-
"end\n";
66-
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
6761

6862
string luaBatchedDel =
6963
"local added = 0\n"
@@ -72,10 +66,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
7266
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
7367
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
7468
"end\n"
75-
"if added > 0 then \n"
76-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
77-
"end\n";
78-
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
7969

8070
string luaClear =
8171
"redis.call('DEL', KEYS[1])\n"
@@ -84,6 +74,21 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
8474
" redis.call('DEL', k)\n"
8575
"end\n"
8676
"redis.call('DEL', KEYS[3])\n";
77+
78+
if (!m_flushPub) {
79+
string luaPub =
80+
"if added > 0 then \n"
81+
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
82+
"end\n";
83+
luaSet += luaPub;
84+
luaDel += luaPub;
85+
luaBatchedSet += luaPub;
86+
luaBatchedDel += luaPub;
87+
}
88+
m_shaSet = m_pipe->loadRedisScript(luaSet);
89+
m_shaDel = m_pipe->loadRedisScript(luaDel);
90+
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
91+
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
8792
m_shaClear = m_pipe->loadRedisScript(luaClear);
8893

8994
string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");

common/producerstatetable.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ namespace swss {
1010
class ProducerStateTable : public TableBase, public TableName_KeySet
1111
{
1212
public:
13-
ProducerStateTable(DBConnector *db, const std::string &tableName);
14-
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
13+
ProducerStateTable(DBConnector *db, const std::string &tableName, bool flushPub = false);
14+
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false, bool flushPub = false);
1515
virtual ~ProducerStateTable();
1616

1717
void setBuffered(bool buffered);

common/redispipeline.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <string>
44
#include <queue>
55
#include <functional>
6+
#include <chrono>
7+
#include <iostream>
68
#include "redisreply.h"
79
#include "rediscommand.h"
810
#include "dbconnector.h"
@@ -22,9 +24,11 @@ class RedisPipeline {
2224
RedisPipeline(const DBConnector *db, size_t sz = 128)
2325
: COMMAND_MAX(sz)
2426
, m_remaining(0)
27+
, m_shaPub("")
2528
{
2629
m_db = db->newConnector(NEWCONNECTOR_TIMEOUT);
2730
initializeOwnerTid();
31+
lastHeartBeat = std::chrono::steady_clock::now();
2832
}
2933

3034
~RedisPipeline() {
@@ -113,11 +117,19 @@ class RedisPipeline {
113117

114118
void flush()
115119
{
120+
lastHeartBeat = std::chrono::steady_clock::now();
121+
122+
if (m_remaining == 0) {
123+
return;
124+
}
125+
116126
while(m_remaining)
117127
{
118128
// Construct an object to use its dtor, so that resource is released
119129
RedisReply r(pop());
120130
}
131+
132+
publish();
121133
}
122134

123135
size_t size()
@@ -145,12 +157,31 @@ class RedisPipeline {
145157
m_ownerTid = gettid();
146158
}
147159

160+
void addChannel(std::string channel)
161+
{
162+
m_luaPub +=
163+
"redis.call('PUBLISH', '" + channel + "', 'G');";
164+
165+
m_shaPub = loadRedisScript(m_luaPub);
166+
}
167+
168+
int getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent=std::chrono::steady_clock::now())
169+
{
170+
return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count());
171+
}
172+
148173
private:
149174
DBConnector *m_db;
150175
std::queue<int> m_expectedTypes;
151176
size_t m_remaining;
152177
long int m_ownerTid;
153178

179+
std::string m_luaPub;
180+
std::string m_shaPub;
181+
std::chrono::time_point<std::chrono::steady_clock> lastHeartBeat;
182+
183+
void publish();
184+
154185
void mayflush()
155186
{
156187
if (m_remaining >= COMMAND_MAX)

0 commit comments

Comments
 (0)