Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@

jobs:
unit-tests:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
strategy:
fail-fast: false

steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.8'
- name: Unit tests
run: |
python3 setup.py install
pip install pytest
pip install mock
python3 -m pytest test

integration-tests:

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
runs-on: ubuntu-latest
permissions:
id-token: write # This is required for requesting the JWT
Expand Down
22 changes: 19 additions & 3 deletions AWSIoTPythonSDK/core/protocol/mqtt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS
from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUBACK_ERROR
from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError
Expand Down Expand Up @@ -58,6 +59,12 @@
from queue import Queue


class SubackPacket(object):
def __init__(self):
self.event = Event()
self.data = None


class MqttCore(object):

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -298,12 +305,15 @@ def subscribe(self, topic, qos, message_callback=None):
if ClientStatus.STABLE != self._client_status.get_status():
self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None))
else:
event = Event()
rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback)
if not event.wait(self._operation_timeout_sec):
suback = SubackPacket()
rc, mid = self._subscribe_async(topic, qos, self._create_blocking_suback_callback(suback), message_callback)
if not suback.event.wait(self._operation_timeout_sec):
self._internal_async_client.remove_event_callback(mid)
self._logger.error("Subscribe timed out")
raise subscribeTimeoutException()
if suback.data and suback.data[0] == MQTT_ERR_SUBACK_ERROR:
self._logger.error(f"Subscribe error: {suback.data}")
raise subscribeError(suback.data)
ret = True
return ret

Expand Down Expand Up @@ -361,6 +371,12 @@ def ack_callback(mid, data=None):
event.set()
return ack_callback

def _create_blocking_suback_callback(self, ack: SubackPacket):
def ack_callback(mid, data=None):
ack.data = data
ack.event.set()
return ack_callback

def _handle_offline_request(self, type, data):
self._logger.info("Offline request detected!")
offline_request = QueueableRequest(type, data)
Expand Down
3 changes: 3 additions & 0 deletions AWSIoTPythonSDK/core/protocol/paho/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@
MSG_QUEUEING_DROP_OLDEST = 0
MSG_QUEUEING_DROP_NEWEST = 1

# Packet Error Codes
MQTT_ERR_SUBACK_ERROR = 0x80

if sys.version_info[0] < 3:
sockpair_data = "0"
else:
Expand Down