@@ -90,7 +90,10 @@ void RecordResponse(const std::string &response_channel, const std::string &key,
9090
9191} // namespace
9292
93- ResponsePublisher::ResponsePublisher () : m_db(" APPL_STATE_DB" , 0 )
93+ ResponsePublisher::ResponsePublisher (bool buffered) :
94+ m_db(std::make_unique<swss::DBConnector>(" APPL_STATE_DB" , 0 )),
95+ m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())),
96+ m_buffered(buffered)
9497{
9598}
9699
@@ -107,17 +110,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
107110 }
108111
109112 std::string response_channel = " APPL_DB_" + table + " _RESPONSE_CHANNEL" ;
110- if (m_notifiers.find (table) == m_notifiers.end ())
111- {
112- m_notifiers[table] = std::make_unique<swss::NotificationProducer>(&m_db, response_channel);
113- }
113+ swss::NotificationProducer notificationProducer{m_pipe.get (), response_channel, m_buffered};
114114
115115 auto intent_attrs_copy = intent_attrs;
116116 // Add error message as the first field-value-pair.
117117 swss::FieldValueTuple err_str (" err_str" , PrependedComponent (status) + status.message ());
118118 intent_attrs_copy.insert (intent_attrs_copy.begin (), err_str);
119119 // Sends the response to the notification channel.
120- m_notifiers[table]-> send (status.codeStr (), key, intent_attrs_copy);
120+ notificationProducer. send (status.codeStr (), key, intent_attrs_copy);
121121 RecordResponse (response_channel, key, intent_attrs_copy, status.codeStr ());
122122}
123123
@@ -140,17 +140,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
140140void ResponsePublisher::writeToDB (const std::string &table, const std::string &key,
141141 const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
142142{
143- if (m_tables.find (table) == m_tables.end ())
144- {
145- m_tables[table] = std::make_unique<swss::Table>(&m_db, table);
146- }
143+ swss::Table applStateTable{m_pipe.get (), table, m_buffered};
147144
148145 auto attrs = values;
149146 if (op == SET_COMMAND)
150147 {
151148 if (replace)
152149 {
153- m_tables[table]-> del (key);
150+ applStateTable. del (key);
154151 }
155152 if (!values.size ())
156153 {
@@ -160,9 +157,9 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
160157 // Write to DB only if the key does not exist or non-NULL attributes are
161158 // being written to the entry.
162159 std::vector<swss::FieldValueTuple> fv;
163- if (!m_tables[table]-> get (key, fv))
160+ if (!applStateTable. get (key, fv))
164161 {
165- m_tables[table]-> set (key, attrs);
162+ applStateTable. set (key, attrs);
166163 RecordDBWrite (table, key, attrs, op);
167164 return ;
168165 }
@@ -179,13 +176,23 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
179176 }
180177 if (attrs.size ())
181178 {
182- m_tables[table]-> set (key, attrs);
179+ applStateTable. set (key, attrs);
183180 RecordDBWrite (table, key, attrs, op);
184181 }
185182 }
186183 else if (op == DEL_COMMAND)
187184 {
188- m_tables[table]-> del (key);
185+ applStateTable. del (key);
189186 RecordDBWrite (table, key, {}, op);
190187 }
191188}
189+
190+ void ResponsePublisher::flush ()
191+ {
192+ m_pipe->flush ();
193+ }
194+
195+ void ResponsePublisher::setBuffered (bool buffered)
196+ {
197+ m_buffered = buffered;
198+ }
0 commit comments