Skip to main content

ove/
net_mqtt.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Blocking MQTT 3.1.1 client with safe callback.
8//!
9//! [`Client`] wraps the oveRTOS MQTT handle with automatic cleanup and a
10//! safe Rust `fn(&str, &[u8])` message callback.  The trampoline pattern
11//! (same as [`crate::timer::Timer`]) converts the C callback into a safe
12//! Rust function call.
13//!
14//! Works in both heap and zero-heap modes.
15//!
16//! ## Async alternative
17//!
18//! For async MQTT on top of [`crate::async_net`] use the
19//! [`rust-mqtt`](https://crates.io/crates/rust-mqtt) crate from crates.io.
20//! Supports MQTT 3.1.1 and 5.0, QoS 0/1/2, no_std mode. See
21//! [`crate::async_net`]'s module docs for the full pairing recipe.
22
23use crate::bindings;
24use crate::error::{Error, Result};
25
26// SAFETY (module-wide contract for the `unsafe { bindings::ove_*(...) }` FFI
27// calls below): any handle passed to the C API is non-null and refers to a
28// live RTOS object — wrapper constructors establish validity via
29// `Error::from_code`, and `Drop` (or an explicit `deinit`) is the only place
30// a handle is released. Pointer and slice arguments reference caller-owned
31// memory valid for the duration of the call; the C side copies whatever it
32// retains and does not alias them past return (verified against the
33// signatures in `include/ove/*.h`). Blocks that deviate — `transmute`, raw
34// pointer casts from user data, slice reconstruction via `from_raw_parts`,
35// or storing a callback across the FFI boundary — carry their own
36// `// SAFETY:` comment.
37
38// ---------------------------------------------------------------------------
39// QoS
40// ---------------------------------------------------------------------------
41
/// MQTT Quality of Service (QoS) level requested for a publish or a
/// subscription.
///
/// The explicit discriminants are the numeric values handed to the C API
/// when a variant is cast with `as`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Qos {
    /// QoS 0 -- at-most-once delivery (fire-and-forget).
    AtMostOnce = 0,
    /// QoS 1 -- at-least-once delivery (acknowledged).
    AtLeastOnce = 1,
}
50
51// ---------------------------------------------------------------------------
52// Config
53// ---------------------------------------------------------------------------
54
/// Parameters for a single MQTT broker connection.
///
/// The string-like fields (`host`, `client_id`, `username`, `password`)
/// are handed to the C API as bare pointers without a length, so every
/// slice must carry its own terminating NUL byte (for example
/// `b"broker.local\0"`).
pub struct Config<'a> {
    /// Hostname or IP address of the broker (NUL-terminated).
    pub host: &'a [u8],
    /// TCP port of the broker (typically 1883, or 8883 for TLS).
    pub port: u16,
    /// Identifier this client presents to the broker (NUL-terminated).
    pub client_id: &'a [u8],
    /// Authentication user name (NUL-terminated), or `None` to omit it.
    pub username: Option<&'a [u8]>,
    /// Authentication password (NUL-terminated), or `None` to omit it.
    pub password: Option<&'a [u8]>,
    /// Keep-alive interval, in seconds.
    pub keep_alive_s: u16,
    /// `true` to run the connection over TLS.
    pub use_tls: bool,
}
75
76// ---------------------------------------------------------------------------
77// Callback trampoline
78// ---------------------------------------------------------------------------
79
/// Message callback -- topic as `&str` (UTF-8 per MQTT spec), payload as `&[u8]`.
pub type MessageFn = fn(&str, &[u8]);

/// Rebuilds a byte slice from a C pointer/length pair.
///
/// A null pointer or a zero length yields the empty slice, because
/// `core::slice::from_raw_parts` requires a non-null pointer even when the
/// length is zero and C callers commonly pass `NULL, 0` for "no data".
///
/// # Safety
/// When `ptr` is non-null and `len` is non-zero, `ptr` must be valid for
/// reads of `len` bytes for the whole of `'a`.
unsafe fn mqtt_raw_bytes<'a>(ptr: *const u8, len: usize) -> &'a [u8] {
    if ptr.is_null() || len == 0 {
        &[]
    } else {
        // SAFETY: non-null and readable for `len` bytes per this function's
        // contract; `u8` has no alignment requirement.
        unsafe { core::slice::from_raw_parts(ptr, len) }
    }
}

/// Internal trampoline that converts the C callback into a safe Rust call.
///
/// The user's `fn(&str, &[u8])` is stored as the `user_data` pointer
/// (same pattern as [`crate::timer::Timer`]).
///
/// Defensive behaviour:
/// * a null `user_data` is ignored (no callback to run);
/// * a null or zero-length `topic`/`payload` is delivered as an empty slice;
/// * a message whose topic is not valid UTF-8 is dropped, since a `&str`
///   must never hold invalid UTF-8 and the bytes originate from the network.
unsafe extern "C" fn mqtt_trampoline(
    topic: *const core::ffi::c_char,
    topic_len: usize,
    payload: *const core::ffi::c_void,
    payload_len: usize,
    user_data: *mut core::ffi::c_void,
) {
    // A Rust `fn` pointer must never be null, so check before transmuting.
    if user_data.is_null() {
        return;
    }
    // SAFETY: `user_data` was stored by `Client::connect` from a
    // `MessageFn` pointer (a `fn(&str, &[u8])`) and was just checked to be
    // non-null. Supported targets have pointer-sized function pointers, so
    // round-tripping through `*mut c_void` is sound.
    let cb: MessageFn = unsafe { core::mem::transmute(user_data) };

    // SAFETY: non-null `topic`/`payload` are valid for
    // `topic_len`/`payload_len` bytes as guaranteed by the MQTT client for
    // the duration of this callback; the null/empty case is handled by the
    // helper.
    let topic_bytes = unsafe { mqtt_raw_bytes(topic as *const u8, topic_len) };
    let payload_bytes = unsafe { mqtt_raw_bytes(payload as *const u8, payload_len) };

    // MQTT requires UTF-8 topics, but the bytes come from a remote peer, so
    // validate instead of trusting them; an invalid topic drops the message.
    if let Ok(topic_str) = core::str::from_utf8(topic_bytes) {
        cb(topic_str, payload_bytes);
    }
}
107
108// ---------------------------------------------------------------------------
109// Client
110// ---------------------------------------------------------------------------
111
112/// Backing storage for an MQTT client in zero-heap mode.
113///
114/// ```ignore
115/// let mut storage = ClientStorage::new();
116/// let mut mqtt = Client::create(&mut storage)?;
117/// ```
118// FFI handle storage; the field is only addressed via raw pointers
119// passed to C, so clippy's `field is never read` doesn't apply.
120#[allow(dead_code)]
121pub struct ClientStorage(bindings::ove_mqtt_client_storage_t);
122
123impl Default for ClientStorage {
124    fn default() -> Self {
125        Self::new()
126    }
127}
128
129impl ClientStorage {
130    /// Zero-initialised backing storage for a [`Client`] in zero-heap
131    /// mode.  Place in a `static` and pass to [`Client::from_static`].
132    pub fn new() -> Self {
133        Self(unsafe { core::mem::zeroed() })
134    }
135}
136
137/// MQTT 3.1.1 client.
138///
139/// Wraps `ove_mqtt_client_t` with automatic cleanup on drop.
140pub struct Client {
141    handle: bindings::ove_mqtt_client_t,
142}
143
144impl Client {
145    /// Create a new MQTT client via heap allocation (only in heap mode).
146    #[cfg(not(zero_heap))]
147    pub fn new() -> Result<Self> {
148        let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
149        let rc = unsafe { bindings::ove_mqtt_client_create(&mut handle) };
150        Error::from_code(rc)?;
151        Ok(Self { handle })
152    }
153
154    /// Create from caller-provided static storage.
155    ///
156    /// # Safety
157    /// Caller must ensure `storage` outlives the `Client` and is not
158    /// shared with another primitive.
159    #[cfg(zero_heap)]
160    pub unsafe fn from_static(storage: *mut bindings::ove_mqtt_client_storage_t) -> Result<Self> {
161        let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
162        let rc = unsafe { bindings::ove_mqtt_client_init(&mut handle, storage) };
163        Error::from_code(rc)?;
164        Ok(Self { handle })
165    }
166
167    /// Create a client that works in both heap and zero-heap modes.
168    pub fn create(storage: &mut ClientStorage) -> Result<Self> {
169        #[cfg(not(zero_heap))]
170        {
171            let _ = storage;
172            Self::new()
173        }
174        #[cfg(zero_heap)]
175        {
176            unsafe { Self::from_static(&mut storage.0) }
177        }
178    }
179
180    /// Connect to an MQTT broker.
181    ///
182    /// `on_message` is called for each incoming publish (topic + payload).
183    ///
184    /// # Errors
185    /// Returns an error if the connection or MQTT handshake fails.
186    pub fn connect(&mut self, cfg: &Config, on_message: MessageFn) -> Result<()> {
187        let user_data = on_message as *mut core::ffi::c_void;
188
189        let mut c: bindings::ove_mqtt_config_t = unsafe { core::mem::zeroed() };
190        c.host = cfg.host.as_ptr() as *const _;
191        c.port = cfg.port;
192        c.client_id = cfg.client_id.as_ptr() as *const _;
193        c.keep_alive_s = cfg.keep_alive_s;
194        c.use_tls = cfg.use_tls as i32;
195        c.on_message = Some(mqtt_trampoline);
196        c.user_data = user_data;
197
198        if let Some(u) = cfg.username {
199            c.username = u.as_ptr() as *const _;
200        }
201        if let Some(p) = cfg.password {
202            c.password = p.as_ptr() as *const _;
203        }
204
205        let rc = unsafe { bindings::ove_mqtt_connect(self.handle, &c) };
206        Error::from_code(rc)
207    }
208
209    /// Disconnect from the MQTT broker.
210    pub fn disconnect(&mut self) {
211        unsafe { bindings::ove_mqtt_disconnect(self.handle) }
212    }
213
214    /// Publish a message on a topic.
215    ///
216    /// `topic` must be a null-terminated byte string.
217    ///
218    /// # Errors
219    /// Returns an error if the publish fails.
220    pub fn publish(&self, topic: &[u8], payload: &[u8], qos: Qos) -> Result<()> {
221        let rc = unsafe {
222            bindings::ove_mqtt_publish(
223                self.handle,
224                topic.as_ptr() as *const _,
225                payload.as_ptr() as *const _,
226                payload.len(),
227                qos as bindings::ove_mqtt_qos_t,
228            )
229        };
230        Error::from_code(rc)
231    }
232
233    /// Subscribe to a topic filter.
234    ///
235    /// `topic` must be a null-terminated byte string.
236    ///
237    /// # Errors
238    /// Returns an error if the subscribe fails.
239    pub fn subscribe(&self, topic: &[u8], qos: Qos) -> Result<()> {
240        let rc = unsafe {
241            bindings::ove_mqtt_subscribe(
242                self.handle,
243                topic.as_ptr() as *const _,
244                qos as bindings::ove_mqtt_qos_t,
245            )
246        };
247        Error::from_code(rc)
248    }
249
250    /// Unsubscribe from a topic filter.
251    ///
252    /// `topic` must be a null-terminated byte string.
253    ///
254    /// # Errors
255    /// Returns an error if the unsubscribe fails.
256    pub fn unsubscribe(&self, topic: &[u8]) -> Result<()> {
257        let rc = unsafe { bindings::ove_mqtt_unsubscribe(self.handle, topic.as_ptr() as *const _) };
258        Error::from_code(rc)
259    }
260
261    /// Process incoming packets and send keep-alive pings.
262    ///
263    /// Must be called periodically (in a loop or from a timer callback).
264    ///
265    /// # Errors
266    /// Returns an error if the poll encounters a protocol or transport error.
267    pub fn poll(&self, timeout: core::time::Duration) -> Result<()> {
268        let rc = unsafe { bindings::ove_mqtt_loop(self.handle, crate::time::dur_to_ns(timeout)) };
269        Error::from_code(rc)
270    }
271}
272
273crate::ove_handle_impl!(
274    Client,
275    ove_mqtt_client_destroy,
276    ove_mqtt_client_deinit,
277    send_only
278);