-
Notifications
You must be signed in to change notification settings - Fork 933
Make long waiting consumer and producer API wakeable #2126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes critical usability issues where Consumer.poll() and Consumer.consume() would block indefinitely and not respond to Ctrl+C (KeyboardInterrupt) signals. The solution implements a "wakeable poll" pattern that chunks long timeouts into 200ms intervals and periodically checks for pending signals between chunks, allowing proper signal handling and graceful interruption.
Key Changes:
- Implemented helper functions
calculate_chunk_timeout()andcheck_signals_between_chunks()in C code to enable interruptible polling - Modified
Consumer.poll()andConsumer.consume()to use chunked polling with periodic signal checks - Added comprehensive test coverage for utility functions, interruptibility, edge cases, and message handling
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/test_Consumer.py | Added utility helper send_sigint_after_delay() and 7 new test functions covering chunk timeout calculation, signal detection, utility function interaction, poll/consume interruptibility, and edge cases |
| src/confluent_kafka/src/Consumer.c | Implemented wakeable poll pattern with helper functions and refactored Consumer_poll() and Consumer_consume() to use chunked polling with signal checking |
| CHANGELOG.md | Documented the fix for blocking poll/consume operations |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * | ||
| * Instead of a single blocking call to rd_kafka_consumer_poll() with the | ||
| * full timeout, this function: | ||
| * 1. Splits the timeout into 200ms chunks |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The function documentation would benefit from documenting the CHUNK_TIMEOUT_MS constant value (200ms) in the description to match the implementation details mentioned in comment lines.
| * 1. Splits the timeout into 200ms chunks | |
| * 1. Splits the timeout into 200ms chunks (CHUNK_TIMEOUT_MS = 200ms) |
| return NULL; | ||
| } | ||
|
|
||
| /* Create Python list from messages */ |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra whitespace before closing comment marker.
| /* Create Python list from messages */ | |
| /* Create Python list from messages */ |
MSeal
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor comment, tested out things locally with a 30s timeout with this vs master... much better experience of always being able to interrupt.
tests/test_Consumer.py
Outdated
| time.sleep(delay_seconds) | ||
| try: | ||
| os.kill(os.getpid(), signal.SIGINT) | ||
| except Exception: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably catch the KeyboardInterrupt here instead so you don't mask other errors being raised instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
os.kill() raises OSError, not KeyboardInterrupt, i have removed try catch block as any exception raised in this function should not be masked
|
dcc1639 to
6180f01
Compare
6180f01 to
5a86d02
Compare
|
MSeal
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor changes requested but up to you if you feel they are worth it. Test blocks were a bit dense/difficult to read after a long weekend losing mental context. I'm not sure how to make them easier to maintain but the next person coming along who has to fix those for future change will need to reread them a few times I think.
| /* CallState_end detected signal and cleaned up */ | ||
| return 1; /* Signal detected */ | ||
| } | ||
| return 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We simply always return 1 -- was this intentional? Seems like we'd either have a return 0 in one call path or cleanup the code to just call CallState_end and then return no matter what
| } | ||
|
|
||
| /* Re-release GIL for next iteration */ | ||
| cs->thread_state = PyEval_SaveThread(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe put this in an else block so you have clear control paths for calling PyEval_SaveThread appropriately if things get refactored. I've found those explicit conditional blocks help a lot down the line for clarity of intent after additional changes
| /**************************************************************************** | ||
| * | ||
| * | ||
| * Consumer Methods |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're going to have a giant comment block it maybe should be more descriptive. Though I'm not sure it's highly valuable here to do this anyway
| /* Final GIL restore and signal check */ | ||
| if (!CallState_end(self, &cs)) { | ||
| if (rkm) | ||
| rd_kafka_message_destroy(rkm); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I always found C code less risky to refactor errors with single line if's include wrapping brackets, but I don't believe we follow that convention here as of now. If we're doing that convention within a function let's be consistent to the rest of the calls in the same scope imo
| /** | ||
| * @brief Consume a batch of messages from the subscribed topics. | ||
| * | ||
| * Instead of a single blocking call to rd_kafka_consume_batch_queue() with the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing to change but commenting that just enough code differences that it's probably best left separate even though it looks similar enough to share more code.


Summary
This PR fixes a critical usability issue where
Consumer.poll(),Consumer.consume(),Producer.poll(), andProducer.flush()would block indefinitely and not respond to Ctrl+C (KeyboardInterrupt) signals.Fixes: #209 and #807
Problem
When calling blocking operations with infinite or very long timeouts:
Consumer.poll()/Consumer.consume()- would block indefinitely waiting for messagesProducer.poll()/Producer.flush()- would block indefinitely waiting for delivery callbacks or queue flushingBecause these operations release the Python Global Interpreter Lock (GIL) during blocking calls to the underlying librdkafka C library, Python's signal handling mechanism couldn't detect Ctrl+C signals, making it impossible to gracefully interrupt these operations.
Solution
The fix implements a "wakeable poll" pattern that:
Chunks long timeouts: Instead of making a single blocking call with the full timeout, the implementation breaks it into smaller 200ms chunks
Periodic signal checking: Between chunks, the code re-acquires the Python GIL and calls
PyErr_CheckSignals()to detect pending KeyboardInterrupt signalsImmediate interruption: If a signal is detected, the operation returns immediately, allowing the KeyboardInterrupt to propagate to the Python code
Impact
This fix significantly improves the developer and operational experience for applications using Python client.
Before this fix:
flush()operations couldn't be interrupted, causing issues during shutdownAfter this fix:
Testing
Consumer Tests (
tests/test_Consumer.py)Utility function tests:
test_calculate_chunk_timeout_utility_function(): Tests chunk timeout calculation logictest_check_signals_between_chunks_utility_function(): Tests signal detection between chunkstest_wakeable_poll_utility_functions_interaction(): Tests interaction between both utilitiesPoll interruptibility tests:
test_wakeable_poll_interruptibility_and_messages(): Tests poll() can be interrupted and still handles messages correctlytest_wakeable_poll_edge_cases(): Tests edge cases (zero timeout, closed consumer, short timeouts)Consume interruptibility tests:
test_wakeable_consume_interruptibility_and_messages(): Tests consume() can be interrupted and still handles messages correctlytest_wakeable_consume_edge_cases(): Tests edge cases (zero timeout, invalid parameters, short timeouts)Producer Tests (
tests/test_Producer.py)Utility function tests:
test_wakeable_poll_utility_functions_interaction(): Tests chunk calculation and signal checking work togetherPoll interruptibility tests:
test_wakeable_poll_interruptibility_and_messages(): Tests poll() can be interrupted and still handles delivery callbacks correctlytest_wakeable_poll_edge_cases(): Tests edge cases (zero timeout, closed producer, short timeouts)Flush interruptibility tests:
test_wakeable_flush_interruptibility_and_messages(): Tests flush() can be interrupted and still delivers messages correctlytest_wakeable_flush_edge_cases(): Tests edge cases (zero timeout, closed producer, short timeouts, empty queue)Shared Utility Tests (
tests/test_wakeable_utilities.py)Integration Tests
tests/integration/consumer/test_consumer_wakeable_poll_consume.py: Verifies wakeable pattern doesn't interfere with normal message consumptiontests/integration/producer/test_producer_wakeable_poll_flush.py: Verifies wakeable pattern doesn't interfere with normal message production and delivery callbacksAll tests use a helper function
TestUtils.send_sigint_after_delay()to simulate Ctrl+C in automated tests.Performance Impact
Manual Testing
Replicable code
test_wakeable_consume_interrupt.py
test_wakeable_poll_interrupt.py
test_wakeable_producer_flush_interrupt.py
test_wakeable_producer_poll_interrupt.py
Example Run