Skip to content

Conversation

@SaniyaKalamkar
Copy link

Fixes numaproj/numaflow#3061. Creates MQTT UDSource that subscribes to an MQTT broker and emits messages into Numaflow pipelines.

Added mqtt_udsource.py, main MQTT UDS implementation.
Added requirements.txt for Python dependencies
Added Dockerfile for container image

Signed-off-by: “Saniya Kalamkar  <[email protected].>
Signed-off-by: Saniya Kalamkar <[email protected]>
@vigith
Copy link
Member

vigith commented Nov 16, 2025

Could you please add a pipeline.yaml for running this?

Signed-off-by: Saniya Kalamkar <[email protected]>
Copy link
Member

@vigith vigith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you.. how did you deploy the mqtt in k8s? was it a deployment? if yes, do you mind putting that manifest too?

Signed-off-by: Saniya Kalamkar <[email protected]>
Signed-off-by: Saniya Kalamkar <[email protected]>
@SaniyaKalamkar
Copy link
Author

Yeah, deployed using Mosquitto MQTT broker. I created a ISB service, created docker container and loaded into k8s, deployed Mosquitto, deployed pipeline, then sent test messages and checked logs. Let me know if you need any more details.

@vigith
Copy link
Member

vigith commented Nov 17, 2025

Can you add a README.md file to explain this? also, move the .yaml files to a sub-dir called manifest.

@SaniyaKalamkar
Copy link
Author

Yeah of course, sounds good.

@vigith
Copy link
Member

vigith commented Nov 17, 2025

Lastly, please document how to send some data to the MQTT brokers so users can experiment by sending test messages.

Comment on lines +102 to +103
except asyncio.QueueEmpty:
payload = f"dummy-{idx}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create dummy payloads which is not right. You do not have to return anything if there is no data. In the README, just add how you can send some data to MQTT broker via command-line.

{"timestamp":"2025-11-17T00:23:16.613066Z","level":"INFO","message":"Payload - dummy-669498 Keys -  EventTime - 1763338996579 Headers - x-txn-id: 9cac40fb-4c59-4df1-ac23-d77128c2105d,  ID - log-sink-669499-0-0","target":"numaflow_core::sinker::sink::log"}
{"timestamp":"2025-11-17T00:23:16.613068Z","level":"INFO","message":"Payload - dummy-669499 Keys -  EventTime - 1763338996579 Headers - x-txn-id: 6a429484-9fb6-4342-ad72-d598499f413f,  ID - log-sink-669500-0-0","target":"numaflow_core::sinker::sink::log"}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement a Python User-Defined-Source for MQTT

3 participants