-
-
Notifications
You must be signed in to change notification settings - Fork 980
Description
Hi team 👋 I've been a long-time supported of Celery and now find myself having to integrate with SQS. So what better package to use than Kombu! Thank you for the dedication over the years and for the excellent implementations.
In #2226 support was added for transport_options.fetch_message_attributes to receive all MessageAttributes from a ReceiveMessage request.
However, this is only applied in the async flow of AsyncSQSConnection.
In kombu.transport.SQS.Channel._get or kombu.transport.SQS.Channel._get_bulk where self.sqs.receive_message is used the self.sqs object is a boto3.Client and those fetch_message_attributes are not passed.
The result is that we can send MessageAttributes / MessageAttributesNames with publish (by doing product.publish(...message_attributes={...})) but we cannot read back.
How best to implement this I'm not sure but I at least wanted to put it on your radar. I'm happy to take on the work if you point me in the right direction.
For now I'm monkeypatching _get and _get_bulk with
resp = self.sqs(queue=queue).receive_message(
QueueUrl=q_url, MaxNumberOfMessages=max_count,
MessageAttributeNames=self.fetch_message_attributes,
WaitTimeSeconds=self.wait_time_seconds)
as that returns all MessageAttributes I want to read.