@@ -52,6 +52,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
5252 self .on_connect = None
5353 self .on_disconnect = None
5454 self .on_message = None
55+ self .on_subscribe = None
5556 # Initialize MQTT client.
5657 self ._client = mqtt .Client ()
5758 if secure :
@@ -66,6 +67,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
6667 self ._client .on_message = self ._mqtt_message
6768 self ._connected = False
6869
70+
6971 def _mqtt_connect (self , client , userdata , flags , rc ):
7072 logger .debug ('Client on_connect called.' )
7173 # Check if the result code is success (0) or some error (non-zero) and
@@ -75,7 +77,7 @@ def _mqtt_connect(self, client, userdata, flags, rc):
7577 self ._connected = True
7678 print ('Connected to Adafruit IO!' )
7779 else :
78- # handle RC errors within `errors.py`'s MQTTError class
80+ # handle RC errors within MQTTError class
7981 raise MQTTError (rc )
8082 # Call the on_connect callback if available.
8183 if self .on_connect is not None :
@@ -88,6 +90,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
8890 # log the RC as an error. Continue on to call any disconnect handler
8991 # so clients can potentially recover gracefully.
9092 if rc != 0 :
93+ print ("Unexpected disconnection." )
9194 raise MQTTError (rc )
9295 print ('Disconnected from Adafruit IO!' )
9396 # Call the on_disconnect callback if available.
@@ -99,13 +102,17 @@ def _mqtt_message(self, client, userdata, msg):
99102 # Parse out the feed id and call on_message callback.
100103 # Assumes topic looks like "username/feeds/id"
101104 parsed_topic = msg .topic .split ('/' )
102- if self .on_message is not None and self . _username == parsed_topic [ 0 ] :
105+ if self .on_message is not None :
103106 feed = parsed_topic [2 ]
104107 payload = '' if msg .payload is None else msg .payload .decode ('utf-8' )
105108 elif self .on_message is not None and parsed_topic [0 ] == 'time' :
106109 feed = parsed_topic [0 ]
107110 payload = msg .payload .decode ('utf-8' )
108111 self .on_message (self , feed , payload )
112+
113+ def _mqtt_subscribe (client , userdata , mid , granted_qos ):
114+ """Called when broker responds to a subscribe request."""
115+
109116
110117 def connect (self , ** kwargs ):
111118 """Connect to the Adafruit.IO service. Must be called before any loop
@@ -162,16 +169,24 @@ def loop(self, timeout_sec=1.0):
162169 """
163170 self ._client .loop (timeout = timeout_sec )
164171
165- def subscribe (self , feed_id ):
172+ def subscribe (self , feed_id , feed_user = None ):
166173 """Subscribe to changes on the specified feed. When the feed is updated
167174 the on_message function will be called with the feed_id and new value.
175+
176+ Params:
177+ - feed_id: The id of the feed to update.
178+ - feed_user (optional): The user id of the feed. Used for feed sharing.
168179 """
169- self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
180+ if feed_user is not None :
181+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (feed_user , feed_id ))
182+ else :
183+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
184+ return res , mid
170185
171186 def subscribe_time (self , time ):
172187 """Subscribe to changes on the Adafruit IO time feeds. When the feed is
173188 updated, the on_message function will be called and publish a new value:
174- time =
189+ time feeds:
175190 millis: milliseconds
176191 seconds: seconds
177192 iso: ISO-8601 (https://en.wikipedia.org/wiki/ISO_8601)
@@ -181,15 +196,27 @@ def subscribe_time(self, time):
181196 elif time == 'iso' :
182197 self ._client .subscribe ('time/ISO-8601' )
183198 else :
184- print ( 'ERROR: Invalid time type specified ' )
199+ raise TypeError ( ' Invalid Time Feed Specified. ' )
185200 return
201+
202+ def unsubscribe (self , feed_id ):
203+ """Unsubscribes from a specified MQTT feed.
204+ Note: this does not prevent publishing to a feed, it will unsubscribe
205+ from receiving messages via on_message.
206+ """
207+ (res , mid ) = self ._client .unsubscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
186208
187- def publish (self , feed_id , value ):
209+ def publish (self , feed_id , value = None , feed_user = None ):
188210 """Publish a value to a specified feed.
189211
190- Required parameters :
212+ Params :
191213 - feed_id: The id of the feed to update.
214+ - feed_user (optional): The user id of the feed. Used for feed sharing.
192215 - value: The new value to publish to the feed.
193216 """
194- self ._client .publish ('{0}/feeds/{1}' .format (self ._username , feed_id ),
195- payload = value )
217+ if feed_user is not None :
218+ (res , self ._pub_mid ) = self ._client .publish ('{0}/feeds/{1}' .format (feed_user , feed_id ),
219+ payload = value )
220+ else :
221+ (res , self ._pub_mid ) = self ._client .publish ('{0}/feeds/{1}' .format (self ._username , feed_id ),
222+ payload = value )
0 commit comments