@@ -51,20 +51,20 @@ Message RoundRobinPollStrategy::poll() {
5151
5252Message RoundRobinPollStrategy::poll (milliseconds timeout) {
5353 // Always give priority to group and global events
54- Message message = get_consumer_queue ().queue_ .consume (milliseconds (0 ));
54+ Message message = get_consumer_queue ().queue .consume (milliseconds (0 ));
5555 if (message) {
5656 return message;
5757 }
5858 size_t num_queues = get_partition_queues ().size ();
5959 while (num_queues--) {
6060 // consume the next partition (non-blocking)
61- message = get_next_queue ().queue_ .consume (milliseconds (0 ));
61+ message = get_next_queue ().queue .consume (milliseconds (0 ));
6262 if (message) {
6363 return message;
6464 }
6565 }
6666 // We still don't have a valid message so we block on the event queue
67- return get_consumer_queue ().queue_ .consume (timeout);
67+ return get_consumer_queue ().queue .consume (timeout);
6868}
6969
7070MessageList RoundRobinPollStrategy::poll_batch (size_t max_batch_size) {
@@ -76,25 +76,24 @@ MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, millisecon
7676 ssize_t count = max_batch_size;
7777
7878 // batch from the group event queue first (non-blocking)
79- consume_batch (get_consumer_queue ().queue_ , messages, count, milliseconds (0 ));
79+ consume_batch (get_consumer_queue ().queue , messages, count, milliseconds (0 ));
8080 size_t num_queues = get_partition_queues ().size ();
8181 while ((count > 0 ) && (num_queues--)) {
8282 // batch from the next partition (non-blocking)
83- consume_batch (get_next_queue ().queue_ , messages, count, milliseconds (0 ));
83+ consume_batch (get_next_queue ().queue , messages, count, milliseconds (0 ));
8484 }
8585 // we still have space left in the buffer
8686 if (count > 0 ) {
8787 // wait on the event queue until timeout
88- consume_batch (get_consumer_queue ().queue_ , messages, count, timeout);
88+ consume_batch (get_consumer_queue ().queue , messages, count, timeout);
8989 }
9090 return messages;
9191}
9292
9393void RoundRobinPollStrategy::consume_batch (Queue& queue,
9494 MessageList& messages,
9595 ssize_t & count,
96- milliseconds timeout)
97- {
96+ milliseconds timeout) {
9897 MessageList queue_messages = queue.consume_batch (count, timeout);
9998 if (queue_messages.empty ()) {
10099 return ;
@@ -111,11 +110,11 @@ void RoundRobinPollStrategy::consume_batch(Queue& queue,
111110void RoundRobinPollStrategy::restore_forwarding () {
112111 // forward all partition queues
113112 for (const auto & toppar : get_partition_queues ()) {
114- toppar.second .queue_ .forward_to_queue (get_consumer_queue ().queue_ );
113+ toppar.second .queue .forward_to_queue (get_consumer_queue ().queue );
115114 }
116115}
117116
118- QueueData& RoundRobinPollStrategy::get_next_queue (void * opaque ) {
117+ QueueData& RoundRobinPollStrategy::get_next_queue () {
119118 if (get_partition_queues ().empty ()) {
120119 throw QueueException (RD_KAFKA_RESP_ERR__STATE);
121120 }
0 commit comments