@@ -64,26 +64,52 @@ void RecordResponse(const std::string &response_channel, const std::string &key,
6464
6565} // namespace
6666
67- ResponsePublisher::ResponsePublisher (bool buffered)
68- : m_db(std::make_unique<swss::DBConnector>(" APPL_STATE_DB" , 0 )),
69- m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())), m_buffered(buffered)
67+ ResponsePublisher::ResponsePublisher (const std::string &dbName, bool buffered, bool db_write_thread)
68+ : m_db(std::make_unique<swss::DBConnector>(dbName, 0 )), m_buffered(buffered)
7069{
70+ if (m_buffered)
71+ {
72+ m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get ());
73+ m_pipe = std::make_unique<swss::RedisPipeline>(m_db.get ());
74+ }
75+ else
76+ {
77+ m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get (), 1 );
78+ m_pipe = std::make_unique<swss::RedisPipeline>(m_db.get (), 1 );
79+ }
80+ if (db_write_thread)
81+ {
82+ m_update_thread = std::unique_ptr<std::thread>(new std::thread (&ResponsePublisher::dbUpdateThread, this ));
83+ }
7184}
7285
73- void ResponsePublisher::publish (const std::string &table, const std::string &key,
74- const std::vector<swss::FieldValueTuple> &intent_attrs, const ReturnCode &status,
75- const std::vector<swss::FieldValueTuple> &state_attrs, bool replace)
86+ ResponsePublisher::~ResponsePublisher ()
7687{
77- // Write to the DB only if:
78- // 1) A write operation is being performed and state attributes are specified.
79- // 2) A successful delete operation.
80- if ((intent_attrs.size () && state_attrs.size ()) || (status.ok () && !intent_attrs.size ()))
88+ if (m_update_thread != nullptr )
8189 {
82- writeToDB (table, key, state_attrs, intent_attrs.size () ? SET_COMMAND : DEL_COMMAND, replace);
90+ {
91+ std::lock_guard<std::mutex> lock (m_lock);
92+ m_queue.push (entry{
93+ .table = " " ,
94+ .key = " " ,
95+ .values = std::vector<swss::FieldValueTuple>{},
96+ .op = " " ,
97+ .replace = false ,
98+ .flush = false ,
99+ .shutdown = true ,
100+ });
101+ }
102+ m_signal.notify_one ();
103+ m_update_thread->join ();
83104 }
105+ }
84106
107+ void ResponsePublisher::publish (const std::string &table, const std::string &key,
108+ const std::vector<swss::FieldValueTuple> &intent_attrs, const ReturnCode &status,
109+ const std::vector<swss::FieldValueTuple> &state_attrs, bool replace)
110+ {
85111 std::string response_channel = " APPL_DB_" + table + " _RESPONSE_CHANNEL" ;
86- swss::NotificationProducer notificationProducer{m_pipe .get (), response_channel, m_buffered};
112+ swss::NotificationProducer notificationProducer{m_ntf_pipe .get (), response_channel, m_buffered};
87113
88114 auto intent_attrs_copy = intent_attrs;
89115 // Add error message as the first field-value-pair.
@@ -92,6 +118,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
92118 // Sends the response to the notification channel.
93119 notificationProducer.send (status.codeStr (), key, intent_attrs_copy);
94120 RecordResponse (response_channel, key, intent_attrs_copy, status.codeStr ());
121+
122+ // Write to the DB only if:
123+ // 1) A write operation is being performed and state attributes are specified.
124+ // 2) A successful delete operation.
125+ if ((intent_attrs.size () && state_attrs.size ()) || (status.ok () && !intent_attrs.size ()))
126+ {
127+ writeToDB (table, key, state_attrs, intent_attrs.size () ? SET_COMMAND : DEL_COMMAND, replace);
128+ }
95129}
96130
97131void ResponsePublisher::publish (const std::string &table, const std::string &key,
@@ -112,6 +146,33 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
112146
113147void ResponsePublisher::writeToDB (const std::string &table, const std::string &key,
114148 const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
149+ {
150+ if (m_update_thread != nullptr )
151+ {
152+ {
153+ std::lock_guard<std::mutex> lock (m_lock);
154+ m_queue.push (entry{
155+ .table = table,
156+ .key = key,
157+ .values = values,
158+ .op = op,
159+ .replace = replace,
160+ .flush = false ,
161+ .shutdown = false ,
162+ });
163+ }
164+ m_signal.notify_one ();
165+ }
166+ else
167+ {
168+ writeToDBInternal (table, key, values, op, replace);
169+ }
170+ RecordDBWrite (table, key, values, op);
171+ }
172+
173+ void ResponsePublisher::writeToDBInternal (const std::string &table, const std::string &key,
174+ const std::vector<swss::FieldValueTuple> &values, const std::string &op,
175+ bool replace)
115176{
116177 swss::Table applStateTable{m_pipe.get (), table, m_buffered};
117178
@@ -133,7 +194,6 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
133194 if (!applStateTable.get (key, fv))
134195 {
135196 applStateTable.set (key, attrs);
136- RecordDBWrite (table, key, attrs, op);
137197 return ;
138198 }
139199 for (auto it = attrs.cbegin (); it != attrs.cend ();)
@@ -150,22 +210,70 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
150210 if (attrs.size ())
151211 {
152212 applStateTable.set (key, attrs);
153- RecordDBWrite (table, key, attrs, op);
154213 }
155214 }
156215 else if (op == DEL_COMMAND)
157216 {
158217 applStateTable.del (key);
159- RecordDBWrite (table, key, {}, op);
160218 }
161219}
162220
163221void ResponsePublisher::flush ()
164222{
165- m_pipe->flush ();
223+ m_ntf_pipe->flush ();
224+ if (m_update_thread != nullptr )
225+ {
226+ {
227+ std::lock_guard<std::mutex> lock (m_lock);
228+ m_queue.push (entry{
229+ .table = " " ,
230+ .key = " " ,
231+ .values = std::vector<swss::FieldValueTuple>{},
232+ .op = " " ,
233+ .replace = false ,
234+ .flush = true ,
235+ .shutdown = false ,
236+ });
237+ }
238+ m_signal.notify_one ();
239+ }
240+ else
241+ {
242+ m_pipe->flush ();
243+ }
166244}
167245
168246void ResponsePublisher::setBuffered (bool buffered)
169247{
170248 m_buffered = buffered;
171249}
250+
251+ void ResponsePublisher::dbUpdateThread ()
252+ {
253+ while (true )
254+ {
255+ entry e;
256+ {
257+ std::unique_lock<std::mutex> lock (m_lock);
258+ while (m_queue.empty ())
259+ {
260+ m_signal.wait (lock);
261+ }
262+
263+ e = m_queue.front ();
264+ m_queue.pop ();
265+ }
266+ if (e.shutdown )
267+ {
268+ break ;
269+ }
270+ if (e.flush )
271+ {
272+ m_pipe->flush ();
273+ }
274+ else
275+ {
276+ writeToDBInternal (e.table , e.key , e.values , e.op , e.replace );
277+ }
278+ }
279+ }
0 commit comments