Skip to content

Commit 559d85f

Browse files
committed
release 0.4.0
1 parent 14d13ad commit 559d85f

File tree

5 files changed

+98
-104
lines changed

5 files changed

+98
-104
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.4.0] - 2024-09-26
9+
10+
- every MQTT client is a component now
11+
- can spawn a `SubscribeTopic` component to subscribe to a topic and observe the messages
12+
813
## [0.3.3] - 2024-09-24
914

1015
- Update MQTT example and library handling: Refactor error handling, subscribe method, and event types for clarity and

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "bevy_mqtt"
33
description = "A simple MQTT client for Bevy ECS"
4-
version = "0.3.3"
4+
version = "0.4.0"
55
edition = "2021"
66
readme = "README.md"
77
repository = "https://github.com/foxzool/bevy_mqtt"
@@ -21,7 +21,6 @@ bevy_app = { version = "0.14.2" }
2121
bevy_ecs = { version = "0.14.2" }
2222
bevy_log = { version = "0.14.2" }
2323
bevy_hierarchy = { version = "0.14.2" }
24-
bevy_derive = { version = "0.14.2" }
2524

2625
rumqttc = { version = "0.24.0" }
2726
flume = { version = "0.11.0" }

README.md

Lines changed: 76 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,71 +15,85 @@ first run as mqtt broker like [mosquitto](https://mosquitto.org/)
1515
then run the example
1616

1717
```rust
18-
use std::time::SystemTime;
19-
2018
use bevy::{prelude::*, time::common_conditions::on_timer};
2119
use bevy_log::LogPlugin;
2220
use bevy_mqtt::{
23-
rumqttc::{MqttOptions, QoS},
24-
MqttClient, MqttClientError, MqttClientState, MqttConnectError, MqttEvent, MqttPlugin,
25-
MqttPublishOutgoing, MqttSetting, SubscribeTopic, TopicMessage,
21+
rumqttc::QoS, MqttClient, MqttClientConnected, MqttClientError, MqttConnectError, MqttEvent,
22+
MqttPlugin, MqttSetting, SubscribeTopic, TopicMessage,
2623
};
27-
use bevy_state::prelude::OnEnter;
28-
use bincode::ErrorKind;
29-
use serde::{Deserialize, Serialize};
30-
31-
#[derive(Serialize, Deserialize, Debug)]
32-
struct Message {
33-
i: usize,
34-
time: SystemTime,
35-
}
36-
37-
impl From<&Message> for Vec<u8> {
38-
fn from(value: &Message) -> Self {
39-
bincode::serialize(value).unwrap()
40-
}
41-
}
42-
43-
impl From<Message> for Vec<u8> {
44-
fn from(value: Message) -> Self {
45-
bincode::serialize(&value).unwrap()
46-
}
47-
}
48-
49-
impl TryFrom<&[u8]> for Message {
50-
type Error = Box<ErrorKind>;
51-
52-
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
53-
bincode::deserialize(value)
54-
}
55-
}
24+
use rumqttc::{MqttOptions, Transport};
25+
use std::time::Duration;
5626

5727
fn main() {
5828
App::new()
59-
.insert_resource(MqttSetting {
60-
mqtt_options: MqttOptions::new("mqtt-serde", "127.0.0.1", 1883),
61-
cap: 10,
62-
})
6329
.add_plugins((MinimalPlugins, MqttPlugin, LogPlugin::default()))
64-
.add_systems(Update, (handle_message, handle_error))
65-
.add_systems(
66-
OnEnter(MqttClientState::Connected),
67-
(sub_topic_direct, sub_topic_by_component),
68-
)
30+
.add_systems(Startup, setup_clients)
31+
.add_systems(Update, (sub_topic, handle_message, handle_error))
6932
.add_systems(
7033
Update,
7134
publish_message.run_if(on_timer(std::time::Duration::from_secs(1))),
7235
)
7336
.run();
7437
}
7538

39+
fn setup_clients(mut commands: Commands) {
40+
commands.spawn(MqttSetting {
41+
mqtt_options: MqttOptions::new("bevy-mqtt-client", "127.0.0.1", 1883),
42+
cap: 10,
43+
});
44+
45+
let mut mqtt_options = MqttOptions::new("mqtt-ws-client", "ws://127.0.0.1:8080", 8080);
46+
mqtt_options.set_transport(Transport::Ws);
47+
// mqtt_options.set_credentials("username", "password");
48+
mqtt_options.set_keep_alive(Duration::from_secs(5));
49+
50+
commands.spawn((
51+
MqttSetting {
52+
mqtt_options,
53+
cap: 10,
54+
},
55+
WebsocketMqttClient,
56+
));
57+
}
58+
59+
#[derive(Component)]
60+
struct WebsocketMqttClient;
61+
62+
/// this is a system that subscribes to a topic and handle the incoming messages
63+
fn sub_topic(
64+
mqtt_client: Query<(Entity, &MqttClient, &MqttSetting), Added<MqttClientConnected>>,
65+
mut commands: Commands,
66+
) {
67+
for (entity, client, setting) in mqtt_client.iter() {
68+
client
69+
.subscribe("hello".to_string(), QoS::AtMostOnce)
70+
.unwrap();
71+
72+
let setting = setting.clone();
73+
let child_id = commands
74+
.spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
75+
.observe(move |topic_message: Trigger<TopicMessage>| {
76+
println!(
77+
"{:?}: Topic: '+/mqtt' received : {:?}",
78+
setting.mqtt_options.broker_address().clone(),
79+
topic_message.event().payload
80+
);
81+
})
82+
.id();
83+
commands.entity(entity).add_child(child_id);
84+
}
85+
}
86+
87+
/// this is global handler for all incoming messages
7688
fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
7789
for event in mqtt_event.read() {
78-
match &event.0 {
90+
match &event.event {
7991
rumqttc::Event::Incoming(income) => match income {
8092
rumqttc::Incoming::Publish(publish) => {
81-
let message: Message = bincode::deserialize(&publish.payload).unwrap();
82-
println!("Received Publish: {:?}", message);
93+
println!(
94+
"Topic Component: {} Received: {:?}",
95+
publish.topic, publish.payload
96+
);
8397
}
8498
_ => {
8599
println!("Incoming: {:?}", income);
@@ -103,56 +117,25 @@ fn handle_error(
103117
}
104118
}
105119

106-
/// there are two ways to subscribe to a topic
107-
/// 1. Directly subscribe to a topic
108-
/// 2. Subscribe to a topic by component
109-
fn sub_topic_direct(client: Res<MqttClient>) {
110-
client
111-
.try_subscribe("hello/mqtt", QoS::AtMostOnce)
112-
.expect("subscribe failed");
113-
}
114-
115-
fn sub_topic_by_component(mut commands: Commands) {
116-
commands
117-
.spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
118-
.observe(|topic_message: Trigger<TopicMessage>| {
119-
println!(
120-
"topic: {} received : {:?}",
121-
topic_message.event().topic,
122-
topic_message.event().payload
123-
);
124-
});
125-
}
126-
127-
fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
128-
let mut list = vec![];
129-
for i in 0..3 {
130-
let message = Message {
131-
i,
132-
time: SystemTime::now(),
133-
};
134-
135-
list.push(MqttPublishOutgoing {
136-
topic: "hello/mqtt".to_string(),
137-
qos: QoS::AtLeastOnce,
138-
retain: false,
139-
payload: message.into(),
140-
});
141-
list.push(MqttPublishOutgoing {
142-
topic: "bevy/mqtt".to_string(),
143-
qos: QoS::AtLeastOnce,
144-
retain: false,
145-
payload: Message {
146-
i: 999,
147-
time: SystemTime::now(),
148-
}
149-
.into(),
150-
});
120+
fn publish_message(mqtt_client: Query<&MqttClient, With<MqttClientConnected>>) {
121+
for client in mqtt_client.iter() {
122+
client
123+
.publish(
124+
"hello".to_string(),
125+
QoS::AtMostOnce,
126+
false,
127+
"mqtt".as_bytes(),
128+
)
129+
.unwrap();
130+
for i in 0..3 {
131+
client
132+
.publish(format!("{}/mqtt", i), QoS::AtMostOnce, false, b"hello")
133+
.unwrap();
134+
}
151135
}
152-
153-
pub_events.send_batch(list);
154136
}
155137

138+
156139
```
157140

158141
## Supported Versions

examples/pub_and_sub.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,21 @@ struct WebsocketMqttClient;
4444

4545
/// this is a system that subscribes to a topic and handle the incoming messages
4646
fn sub_topic(
47-
mqtt_client: Query<(Entity, &MqttClient), Added<MqttClientConnected>>,
47+
mqtt_client: Query<(Entity, &MqttClient, &MqttSetting), Added<MqttClientConnected>>,
4848
mut commands: Commands,
4949
) {
50-
for (entity, client) in mqtt_client.iter() {
50+
for (entity, client, setting) in mqtt_client.iter() {
5151
client
5252
.subscribe("hello".to_string(), QoS::AtMostOnce)
5353
.unwrap();
5454

55+
let setting = setting.clone();
5556
let child_id = commands
5657
.spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
57-
.observe(|topic_message: Trigger<TopicMessage>| {
58+
.observe(move |topic_message: Trigger<TopicMessage>| {
5859
println!(
59-
"Topic: '+/mqtt' received : {:?}",
60+
"{:?}: Topic: '+/mqtt' received : {:?}",
61+
setting.mqtt_options.broker_address().clone(),
6062
topic_message.event().payload
6163
);
6264
})
@@ -68,7 +70,7 @@ fn sub_topic(
6870
/// this is global handler for all incoming messages
6971
fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
7072
for event in mqtt_event.read() {
71-
match &event.0 {
73+
match &event.event {
7274
rumqttc::Event::Incoming(income) => match income {
7375
rumqttc::Incoming::Publish(publish) => {
7476
println!(

src/lib.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! A Bevy plugin for MQTT
22
33
use bevy_app::{App, Plugin, Update};
4-
use bevy_derive::{Deref, DerefMut};
54
use bevy_ecs::prelude::*;
65
use bevy_hierarchy::Parent;
76
use bevy_log::{debug, trace};
@@ -73,8 +72,11 @@ impl DerefMut for MqttClient {
7372
pub struct MqttClientConnected;
7473

7574
/// A wrapper around rumqttc::Event
76-
#[derive(Debug, Clone, PartialEq, Eq, Deref, DerefMut, Event)]
77-
pub struct MqttEvent(pub rumqttc::Event);
75+
#[derive(Debug, Clone, PartialEq, Eq, Event)]
76+
pub struct MqttEvent {
77+
pub entity: Entity,
78+
pub event: rumqttc::Event,
79+
}
7880

7981
/// A wrapper around rumqttc::ConnectionError
8082
#[derive(Debug, Event)]
@@ -144,7 +146,10 @@ fn handle_mqtt_events(
144146
}
145147
rumqttc::Event::Incoming(_) | rumqttc::Event::Outgoing(_) => {}
146148
}
147-
mqtt_events.send(MqttEvent(event));
149+
mqtt_events.send(MqttEvent {
150+
entity,
151+
event: event.clone(),
152+
});
148153
}
149154

150155
while let Ok(error) = client.from_async_error.try_recv() {

0 commit comments

Comments
 (0)