@@ -66,14 +66,14 @@ vector<Selectable *> Orch::getSelectables()
6666 return selectables;
6767}
6868
69- void Consumer::addToSync (std::deque<KeyOpFieldsValuesTuple> &entries)
69+ size_t Consumer::addToSync (std::deque<KeyOpFieldsValuesTuple> &entries)
7070{
7171 SWSS_LOG_ENTER ();
7272
7373 /* Nothing popped */
7474 if (entries.empty ())
7575 {
76- return ;
76+ return 0 ;
7777 }
7878
7979 for (auto & entry: entries)
@@ -120,10 +120,11 @@ void Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
120120 m_toSync[key] = KeyOpFieldsValuesTuple (key, op, existing_values);
121121 }
122122 }
123+ return entries.size ();
123124}
124125
125126// TODO: Table should be const
126- void Consumer::refillToSync (Table* table)
127+ size_t Consumer::refillToSync (Table* table)
127128{
128129 std::deque<KeyOpFieldsValuesTuple> entries;
129130 vector<string> keys;
@@ -142,15 +143,28 @@ void Consumer::refillToSync(Table* table)
142143 entries.push_back (kco);
143144 }
144145
145- addToSync (entries);
146+ return addToSync (entries);
146147}
147148
148- void Consumer::refillToSync ()
149+ size_t Consumer::refillToSync ()
149150{
150- auto db = getConsumerTable ()->getDbConnector ();
151- string tableName = getConsumerTable ()->getTableName ();
152- auto table = Table (db, tableName);
153- refillToSync (&table);
151+ ConsumerTableBase *consumerTable = getConsumerTable ();
152+
153+ auto subTable = dynamic_cast <SubscriberStateTable *>(consumerTable);
154+ if (subTable != NULL )
155+ {
156+ std::deque<KeyOpFieldsValuesTuple> entries;
157+ subTable->pops (entries);
158+ return addToSync (entries);
159+ }
160+ else
161+ {
162+ // consumerTable is either ConsumerStateTable or ConsumerTable
163+ auto db = consumerTable->getDbConnector ();
164+ string tableName = consumerTable->getTableName ();
165+ auto table = Table (db, tableName);
166+ return refillToSync (&table);
167+ }
154168}
155169
156170void Consumer::execute ()
@@ -171,31 +185,50 @@ void Consumer::drain()
171185 m_orch->doTask (*this );
172186}
173187
174- bool Orch::addExistingData (const string& tableName)
188+ size_t Orch::addExistingData (const string& tableName)
175189{
176- Consumer* consumer = dynamic_cast <Consumer *>(getExecutor (tableName));
190+ auto consumer = dynamic_cast <Consumer *>(getExecutor (tableName));
177191 if (consumer == NULL )
178192 {
179193 SWSS_LOG_ERROR (" No consumer %s in Orch" , tableName.c_str ());
180- return false ;
194+ return 0 ;
181195 }
182196
183- consumer->refillToSync ();
184- return true ;
197+ return consumer->refillToSync ();
185198}
186199
187200// TODO: Table should be const
188- bool Orch::addExistingData (Table *table)
201+ size_t Orch::addExistingData (Table *table)
189202{
190203 string tableName = table->getTableName ();
191204 Consumer* consumer = dynamic_cast <Consumer *>(getExecutor (tableName));
192205 if (consumer == NULL )
193206 {
194207 SWSS_LOG_ERROR (" No consumer %s in Orch" , tableName.c_str ());
195- return false ;
208+ return 0 ;
209+ }
210+
211+ return consumer->refillToSync (table);
212+ }
213+
214+ bool Orch::bake ()
215+ {
216+ SWSS_LOG_ENTER ();
217+
218+ for (auto &it : m_consumerMap)
219+ {
220+ string executorName = it.first ;
221+ auto executor = it.second ;
222+ auto consumer = dynamic_cast <Consumer *>(executor.get ());
223+ if (consumer == NULL )
224+ {
225+ continue ;
226+ }
227+
228+ size_t refilled = consumer->refillToSync ();
229+ SWSS_LOG_NOTICE (" Add warm input: %s, %zd" , executorName.c_str (), refilled);
196230 }
197231
198- consumer->refillToSync (table);
199232 return true ;
200233}
201234
0 commit comments