@@ -10,6 +10,25 @@ namespace consumer_test
1010{
1111 using namespace std ;
1212
13+ class TestOrch : public Orch
14+ {
15+ public:
16+ TestOrch (swss::DBConnector *db, string tableName)
17+ :Orch(db, tableName),
18+ m_notification_count (0 )
19+ {
20+ }
21+
22+ void doTask (Consumer& consumer)
23+ {
24+ std::cout << " TestOrch::doTask " << consumer.m_toSync .size () << std::endl;
25+ m_notification_count += consumer.m_toSync .size ();
26+ consumer.m_toSync .clear ();
27+ }
28+
29+ long m_notification_count;
30+ };
31+
1332 struct ConsumerTest : public ::testing::Test
1433 {
1534 shared_ptr<swss::DBConnector> m_app_db;
@@ -322,4 +341,31 @@ namespace consumer_test
322341 validate_syncmap (consumer->m_toSync , 1 , key, exp_kofv);
323342
324343 }
344+
345+ TEST_F (ConsumerTest, ConsumerPops_notification_count)
346+ {
347+ int consumer_pops_batch_size = 10 ;
348+ TestOrch test_orch (m_config_db.get (), " CFG_TEST_TABLE" );
349+ Consumer test_consumer (
350+ new swss::ConsumerStateTable (m_config_db.get (), " CFG_TEST_TABLE" , consumer_pops_batch_size, 1 ), &test_orch, " CFG_TEST_TABLE" );
351+ swss::ProducerStateTable producer_table (m_config_db.get (), " CFG_TEST_TABLE" );
352+
353+ m_config_db->flushdb ();
354+ for (int notification_count = 0 ; notification_count< consumer_pops_batch_size*2 ; notification_count++)
355+ {
356+ std::vector<FieldValueTuple> fields;
357+ FieldValueTuple t (" test_field" , " test_value" );
358+ fields.push_back (t);
359+ producer_table.set (std::to_string (notification_count), fields);
360+
361+ cout << " ConsumerPops_notification_count:: add key: " << notification_count << endl;
362+ }
363+
364+ // consumer should pops consumer_pops_batch_size notifications
365+ test_consumer.execute ();
366+ ASSERT_EQ (test_orch.m_notification_count , consumer_pops_batch_size);
367+
368+ test_consumer.execute ();
369+ ASSERT_EQ (test_orch.m_notification_count , consumer_pops_batch_size*2 );
370+ }
325371}
0 commit comments