ove/net_mqtt.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Blocking MQTT 3.1.1 client with safe callback.
8//!
9//! [`Client`] wraps the oveRTOS MQTT handle with automatic cleanup and a
10//! safe Rust `fn(&str, &[u8])` message callback. The trampoline pattern
11//! (same as [`crate::timer::Timer`]) converts the C callback into a safe
12//! Rust function call.
13//!
14//! Works in both heap and zero-heap modes.
15//!
16//! ## Async alternative
17//!
18//! For async MQTT on top of [`crate::async_net`] use the
19//! [`rust-mqtt`](https://crates.io/crates/rust-mqtt) crate from crates.io.
20//! Supports MQTT 3.1.1 and 5.0, QoS 0/1/2, no_std mode. See
21//! [`crate::async_net`]'s module docs for the full pairing recipe.
22
23use crate::bindings;
24use crate::error::{Error, Result};
25
26// SAFETY (module-wide contract for the `unsafe { bindings::ove_*(...) }` FFI
27// calls below): any handle passed to the C API is non-null and refers to a
28// live RTOS object — wrapper constructors establish validity via
29// `Error::from_code`, and `Drop` (or an explicit `deinit`) is the only place
30// a handle is released. Pointer and slice arguments reference caller-owned
31// memory valid for the duration of the call; the C side copies whatever it
32// retains and does not alias them past return (verified against the
33// signatures in `include/ove/*.h`). Blocks that deviate — `transmute`, raw
34// pointer casts from user data, slice reconstruction via `from_raw_parts`,
35// or storing a callback across the FFI boundary — carry their own
36// `// SAFETY:` comment.
37
38// ---------------------------------------------------------------------------
39// QoS
40// ---------------------------------------------------------------------------
41
/// Delivery guarantee requested for a publish or a subscription.
///
/// The explicit discriminants are the numeric values handed to the C API
/// when a variant is cast to `bindings::ove_mqtt_qos_t`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Qos {
    /// QoS 0: delivered at most once, never acknowledged (fire-and-forget).
    AtMostOnce = 0,
    /// QoS 1: delivered at least once, acknowledged by the receiver.
    AtLeastOnce = 1,
}
50
51// ---------------------------------------------------------------------------
52// Config
53// ---------------------------------------------------------------------------
54
/// Parameters for establishing a broker connection.
///
/// Every string-like field (`host`, `client_id`, `username`, `password`)
/// is a byte slice that must end with a NUL byte (for example
/// `b"broker.local\0"`), because the slices are handed to C as plain
/// string pointers.
pub struct Config<'a> {
    /// Hostname or IP address of the broker (null-terminated).
    pub host: &'a [u8],
    /// TCP port of the broker (typically 1883, or 8883 for TLS).
    pub port: u16,
    /// Identifier this client presents to the broker (null-terminated).
    pub client_id: &'a [u8],
    /// Optional user name for authentication (null-terminated); `None`
    /// leaves the field unset.
    pub username: Option<&'a [u8]>,
    /// Optional password for authentication (null-terminated); `None`
    /// leaves the field unset.
    pub password: Option<&'a [u8]>,
    /// Keep-alive interval, in seconds.
    pub keep_alive_s: u16,
    /// `true` to run the connection over TLS.
    pub use_tls: bool,
}
75
76// ---------------------------------------------------------------------------
77// Callback trampoline
78// ---------------------------------------------------------------------------
79
/// Message callback -- topic as `&str` (UTF-8 per MQTT spec), payload as `&[u8]`.
pub type MessageFn = fn(&str, &[u8]);

/// Internal trampoline that converts the C callback into a safe Rust call.
///
/// The user's `fn(&str, &[u8])` is stored as the `user_data` pointer
/// (same pattern as [`crate::timer::Timer`]).
///
/// Everything arriving here originates outside Rust (the C client and,
/// ultimately, the broker), so nothing is trusted blindly:
///
/// * a null `user_data` means "no callback" and the message is ignored;
/// * a null pointer or a zero length is treated as an empty buffer, since
///   an empty MQTT payload may be reported as `(NULL, 0)`;
/// * a topic that is not valid UTF-8 is dropped instead of being exposed
///   as a `&str`.
///
/// # Safety
/// When non-null and paired with a non-zero length, `topic` and `payload`
/// must be readable for `topic_len` / `payload_len` bytes for the duration
/// of the call. `user_data` must be null or a [`MessageFn`] that was cast
/// to a pointer.
unsafe extern "C" fn mqtt_trampoline(
    topic: *const core::ffi::c_char,
    topic_len: usize,
    payload: *const core::ffi::c_void,
    payload_len: usize,
    user_data: *mut core::ffi::c_void,
) {
    // SAFETY: `user_data` was stored by `Client::connect` from a
    // `MessageFn` pointer. Supported targets have pointer-sized function
    // pointers, and `Option<fn>` is guaranteed to share that layout with
    // null mapping to `None`, so a null `user_data` decodes to `None`
    // rather than to an invalid (null) function pointer.
    let callback = unsafe {
        core::mem::transmute::<*mut core::ffi::c_void, Option<MessageFn>>(user_data)
    };
    let cb = match callback {
        Some(cb) => cb,
        None => return,
    };

    // `slice::from_raw_parts` requires a non-null pointer even when the
    // length is zero, so empty buffers are mapped to `&[]` up front.
    let topic_bytes: &[u8] = if topic.is_null() || topic_len == 0 {
        &[]
    } else {
        // SAFETY: non-null and valid for `topic_len` bytes for the duration
        // of this callback, as guaranteed by the MQTT client.
        unsafe { core::slice::from_raw_parts(topic.cast::<u8>(), topic_len) }
    };
    let payload_bytes: &[u8] = if payload.is_null() || payload_len == 0 {
        &[]
    } else {
        // SAFETY: non-null and valid for `payload_len` bytes for the
        // duration of this callback, as guaranteed by the MQTT client.
        unsafe { core::slice::from_raw_parts(payload.cast::<u8>(), payload_len) }
    };

    // MQTT topics are specified as UTF-8, but these bytes came off the
    // network; constructing a `&str` from malformed data would be undefined
    // behaviour, so validate and drop the message if the check fails.
    if let Ok(topic_str) = core::str::from_utf8(topic_bytes) {
        cb(topic_str, payload_bytes);
    }
}
107
108// ---------------------------------------------------------------------------
109// Client
110// ---------------------------------------------------------------------------
111
112/// Backing storage for an MQTT client in zero-heap mode.
113///
114/// ```ignore
115/// let mut storage = ClientStorage::new();
116/// let mut mqtt = Client::create(&mut storage)?;
117/// ```
118// FFI handle storage; the field is only addressed via raw pointers
119// passed to C, so clippy's `field is never read` doesn't apply.
120#[allow(dead_code)]
121pub struct ClientStorage(bindings::ove_mqtt_client_storage_t);
122
123impl Default for ClientStorage {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129impl ClientStorage {
130 /// Zero-initialised backing storage for a [`Client`] in zero-heap
131 /// mode. Place in a `static` and pass to [`Client::from_static`].
132 pub fn new() -> Self {
133 Self(unsafe { core::mem::zeroed() })
134 }
135}
136
137/// MQTT 3.1.1 client.
138///
139/// Wraps `ove_mqtt_client_t` with automatic cleanup on drop.
140pub struct Client {
141 handle: bindings::ove_mqtt_client_t,
142}
143
144impl Client {
145 /// Create a new MQTT client via heap allocation (only in heap mode).
146 #[cfg(not(zero_heap))]
147 pub fn new() -> Result<Self> {
148 let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
149 let rc = unsafe { bindings::ove_mqtt_client_create(&mut handle) };
150 Error::from_code(rc)?;
151 Ok(Self { handle })
152 }
153
154 /// Create from caller-provided static storage.
155 ///
156 /// # Safety
157 /// Caller must ensure `storage` outlives the `Client` and is not
158 /// shared with another primitive.
159 #[cfg(zero_heap)]
160 pub unsafe fn from_static(storage: *mut bindings::ove_mqtt_client_storage_t) -> Result<Self> {
161 let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
162 let rc = unsafe { bindings::ove_mqtt_client_init(&mut handle, storage) };
163 Error::from_code(rc)?;
164 Ok(Self { handle })
165 }
166
167 /// Create a client that works in both heap and zero-heap modes.
168 pub fn create(storage: &mut ClientStorage) -> Result<Self> {
169 #[cfg(not(zero_heap))]
170 {
171 let _ = storage;
172 Self::new()
173 }
174 #[cfg(zero_heap)]
175 {
176 unsafe { Self::from_static(&mut storage.0) }
177 }
178 }
179
180 /// Connect to an MQTT broker.
181 ///
182 /// `on_message` is called for each incoming publish (topic + payload).
183 ///
184 /// # Errors
185 /// Returns an error if the connection or MQTT handshake fails.
186 pub fn connect(&mut self, cfg: &Config, on_message: MessageFn) -> Result<()> {
187 let user_data = on_message as *mut core::ffi::c_void;
188
189 let mut c: bindings::ove_mqtt_config_t = unsafe { core::mem::zeroed() };
190 c.host = cfg.host.as_ptr() as *const _;
191 c.port = cfg.port;
192 c.client_id = cfg.client_id.as_ptr() as *const _;
193 c.keep_alive_s = cfg.keep_alive_s;
194 c.use_tls = cfg.use_tls as i32;
195 c.on_message = Some(mqtt_trampoline);
196 c.user_data = user_data;
197
198 if let Some(u) = cfg.username {
199 c.username = u.as_ptr() as *const _;
200 }
201 if let Some(p) = cfg.password {
202 c.password = p.as_ptr() as *const _;
203 }
204
205 let rc = unsafe { bindings::ove_mqtt_connect(self.handle, &c) };
206 Error::from_code(rc)
207 }
208
209 /// Disconnect from the MQTT broker.
210 pub fn disconnect(&mut self) {
211 unsafe { bindings::ove_mqtt_disconnect(self.handle) }
212 }
213
214 /// Publish a message on a topic.
215 ///
216 /// `topic` must be a null-terminated byte string.
217 ///
218 /// # Errors
219 /// Returns an error if the publish fails.
220 pub fn publish(&self, topic: &[u8], payload: &[u8], qos: Qos) -> Result<()> {
221 let rc = unsafe {
222 bindings::ove_mqtt_publish(
223 self.handle,
224 topic.as_ptr() as *const _,
225 payload.as_ptr() as *const _,
226 payload.len(),
227 qos as bindings::ove_mqtt_qos_t,
228 )
229 };
230 Error::from_code(rc)
231 }
232
233 /// Subscribe to a topic filter.
234 ///
235 /// `topic` must be a null-terminated byte string.
236 ///
237 /// # Errors
238 /// Returns an error if the subscribe fails.
239 pub fn subscribe(&self, topic: &[u8], qos: Qos) -> Result<()> {
240 let rc = unsafe {
241 bindings::ove_mqtt_subscribe(
242 self.handle,
243 topic.as_ptr() as *const _,
244 qos as bindings::ove_mqtt_qos_t,
245 )
246 };
247 Error::from_code(rc)
248 }
249
250 /// Unsubscribe from a topic filter.
251 ///
252 /// `topic` must be a null-terminated byte string.
253 ///
254 /// # Errors
255 /// Returns an error if the unsubscribe fails.
256 pub fn unsubscribe(&self, topic: &[u8]) -> Result<()> {
257 let rc = unsafe { bindings::ove_mqtt_unsubscribe(self.handle, topic.as_ptr() as *const _) };
258 Error::from_code(rc)
259 }
260
261 /// Process incoming packets and send keep-alive pings.
262 ///
263 /// Must be called periodically (in a loop or from a timer callback).
264 ///
265 /// # Errors
266 /// Returns an error if the poll encounters a protocol or transport error.
267 pub fn poll(&self, timeout: core::time::Duration) -> Result<()> {
268 let rc = unsafe { bindings::ove_mqtt_loop(self.handle, crate::time::dur_to_ns(timeout)) };
269 Error::from_code(rc)
270 }
271}
272
273crate::ove_handle_impl!(
274 Client,
275 ove_mqtt_client_destroy,
276 ove_mqtt_client_deinit,
277 send_only
278);