ove/
net_mqtt.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! MQTT 3.1.1 client with safe callback.
8//!
9//! [`Client`] wraps the oveRTOS MQTT handle with automatic cleanup and a
10//! safe Rust `fn(&str, &[u8])` message callback.  The trampoline pattern
11//! (same as [`crate::timer::Timer`]) converts the C callback into a safe
12//! Rust function call.
13//!
14//! Works in both heap and zero-heap modes.
15
16use core::fmt;
17
18use crate::bindings;
19use crate::error::{Error, Result};
20
21// ---------------------------------------------------------------------------
22// QoS
23// ---------------------------------------------------------------------------
24
/// MQTT Quality of Service level.
///
/// The explicit discriminants are the numeric values handed to the C layer:
/// `Client::publish` and `Client::subscribe` cast a `Qos` directly to
/// `bindings::ove_mqtt_qos_t`. Only levels 0 and 1 are representable here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Qos {
    /// At most once delivery (fire-and-forget).
    AtMostOnce = 0,
    /// At least once delivery (acknowledged).
    AtLeastOnce = 1,
}
33
34// ---------------------------------------------------------------------------
35// Config
36// ---------------------------------------------------------------------------
37
/// MQTT connection configuration.
///
/// String fields (`host`, `client_id`, `username`, `password`) must be
/// null-terminated byte slices.
///
/// `Client::connect` forwards only the start pointer of each slice to the C
/// layer (the slice length is not passed along), which is why the terminator
/// has to be part of the slice, e.g. `b"broker.local\0"`.
pub struct Config<'a> {
    /// Broker hostname or IP (null-terminated).
    pub host: &'a [u8],
    /// Broker port (typically 1883 or 8883 for TLS).
    pub port: u16,
    /// Client identifier (null-terminated).
    pub client_id: &'a [u8],
    /// Username for authentication (null-terminated, or `None`).
    ///
    /// When `None`, the corresponding C field is left at its zeroed value.
    pub username: Option<&'a [u8]>,
    /// Password for authentication (null-terminated, or `None`).
    ///
    /// When `None`, the corresponding C field is left at its zeroed value.
    pub password: Option<&'a [u8]>,
    /// Keep-alive interval in seconds.
    pub keep_alive_s: u16,
    /// Whether to use TLS for the connection (passed to C as an `i32` flag).
    pub use_tls: bool,
}
58
59// ---------------------------------------------------------------------------
60// Callback trampoline
61// ---------------------------------------------------------------------------
62
/// Message callback -- topic as `&str` (UTF-8 per MQTT spec), payload as `&[u8]`.
///
/// This is a plain `fn` pointer rather than a closure type, so it carries no
/// captured state; that is what allows `Client::connect` to pass it through
/// the C API as a single `user_data` pointer.
pub type MessageFn = fn(&str, &[u8]);
65
66/// Internal trampoline that converts the C callback into a safe Rust call.
67///
68/// The user's `fn(&str, &[u8])` is stored as the `user_data` pointer
69/// (same pattern as [`crate::timer::Timer`]).
70unsafe extern "C" fn mqtt_trampoline(
71    topic: *const core::ffi::c_char,
72    topic_len: usize,
73    payload: *const core::ffi::c_void,
74    payload_len: usize,
75    user_data: *mut core::ffi::c_void,
76) {
77    let cb: MessageFn = unsafe { core::mem::transmute(user_data) };
78    let t = unsafe { core::slice::from_raw_parts(topic as *const u8, topic_len) };
79    let p = unsafe { core::slice::from_raw_parts(payload as *const u8, payload_len) };
80    // SAFETY: MQTT topic strings are UTF-8 per specification.
81    let topic_str = unsafe { core::str::from_utf8_unchecked(t) };
82    cb(topic_str, p);
83}
84
85// ---------------------------------------------------------------------------
86// Client
87// ---------------------------------------------------------------------------
88
89/// Backing storage for an MQTT client in zero-heap mode.
90///
91/// ```ignore
92/// let mut storage = ClientStorage::new();
93/// let mut mqtt = Client::create(&mut storage)?;
94/// ```
95pub struct ClientStorage(bindings::ove_mqtt_client_storage_t);
96
97impl ClientStorage {
98    pub fn new() -> Self {
99        Self(unsafe { core::mem::zeroed() })
100    }
101}
102
/// MQTT 3.1.1 client.
///
/// Wraps `ove_mqtt_client_t` with automatic cleanup on drop. The `Drop`
/// impl skips the release call when the handle is null.
pub struct Client {
    /// Raw client handle filled in by the C layer at construction time.
    handle: bindings::ove_mqtt_client_t,
}
109
110impl Client {
111    /// Create a new MQTT client via heap allocation (only in heap mode).
112    #[cfg(not(zero_heap))]
113    pub fn new() -> Result<Self> {
114        let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
115        let rc = unsafe { bindings::ove_mqtt_client_create(&mut handle) };
116        Error::from_code(rc)?;
117        Ok(Self { handle })
118    }
119
120    /// Create from caller-provided static storage.
121    ///
122    /// # Safety
123    /// Caller must ensure `storage` outlives the `Client` and is not
124    /// shared with another primitive.
125    #[cfg(zero_heap)]
126    pub unsafe fn from_static(
127        storage: *mut bindings::ove_mqtt_client_storage_t,
128    ) -> Result<Self> {
129        let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
130        let rc = unsafe { bindings::ove_mqtt_client_init(&mut handle, storage) };
131        Error::from_code(rc)?;
132        Ok(Self { handle })
133    }
134
135    /// Create a client that works in both heap and zero-heap modes.
136    pub fn create(storage: &mut ClientStorage) -> Result<Self> {
137        #[cfg(not(zero_heap))]
138        {
139            let _ = storage;
140            Self::new()
141        }
142        #[cfg(zero_heap)]
143        {
144            unsafe { Self::from_static(&mut storage.0) }
145        }
146    }
147
148    /// Connect to an MQTT broker.
149    ///
150    /// `on_message` is called for each incoming publish (topic + payload).
151    ///
152    /// # Errors
153    /// Returns an error if the connection or MQTT handshake fails.
154    pub fn connect(&mut self, cfg: &Config, on_message: MessageFn) -> Result<()> {
155        let user_data = on_message as *mut core::ffi::c_void;
156
157        let mut c: bindings::ove_mqtt_config_t = unsafe { core::mem::zeroed() };
158        c.host = cfg.host.as_ptr() as *const _;
159        c.port = cfg.port;
160        c.client_id = cfg.client_id.as_ptr() as *const _;
161        c.keep_alive_s = cfg.keep_alive_s;
162        c.use_tls = cfg.use_tls as i32;
163        c.on_message = Some(mqtt_trampoline);
164        c.user_data = user_data;
165
166        if let Some(u) = cfg.username {
167            c.username = u.as_ptr() as *const _;
168        }
169        if let Some(p) = cfg.password {
170            c.password = p.as_ptr() as *const _;
171        }
172
173        let rc = unsafe { bindings::ove_mqtt_connect(self.handle, &c) };
174        Error::from_code(rc)
175    }
176
177    /// Disconnect from the MQTT broker.
178    pub fn disconnect(&mut self) {
179        unsafe { bindings::ove_mqtt_disconnect(self.handle) }
180    }
181
182    /// Publish a message on a topic.
183    ///
184    /// `topic` must be a null-terminated byte string.
185    ///
186    /// # Errors
187    /// Returns an error if the publish fails.
188    pub fn publish(&self, topic: &[u8], payload: &[u8], qos: Qos) -> Result<()> {
189        let rc = unsafe {
190            bindings::ove_mqtt_publish(
191                self.handle,
192                topic.as_ptr() as *const _,
193                payload.as_ptr() as *const _,
194                payload.len(),
195                qos as bindings::ove_mqtt_qos_t,
196            )
197        };
198        Error::from_code(rc)
199    }
200
201    /// Subscribe to a topic filter.
202    ///
203    /// `topic` must be a null-terminated byte string.
204    ///
205    /// # Errors
206    /// Returns an error if the subscribe fails.
207    pub fn subscribe(&self, topic: &[u8], qos: Qos) -> Result<()> {
208        let rc = unsafe {
209            bindings::ove_mqtt_subscribe(
210                self.handle,
211                topic.as_ptr() as *const _,
212                qos as bindings::ove_mqtt_qos_t,
213            )
214        };
215        Error::from_code(rc)
216    }
217
218    /// Unsubscribe from a topic filter.
219    ///
220    /// `topic` must be a null-terminated byte string.
221    ///
222    /// # Errors
223    /// Returns an error if the unsubscribe fails.
224    pub fn unsubscribe(&self, topic: &[u8]) -> Result<()> {
225        let rc = unsafe {
226            bindings::ove_mqtt_unsubscribe(self.handle, topic.as_ptr() as *const _)
227        };
228        Error::from_code(rc)
229    }
230
231    /// Process incoming packets and send keep-alive pings.
232    ///
233    /// Must be called periodically (in a loop or from a timer callback).
234    ///
235    /// # Errors
236    /// Returns an error if the poll encounters a protocol or transport error.
237    pub fn poll(&self, timeout_ms: u32) -> Result<()> {
238        let rc = unsafe { bindings::ove_mqtt_loop(self.handle, timeout_ms) };
239        Error::from_code(rc)
240    }
241}
242
243impl fmt::Debug for Client {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        f.debug_struct("Client")
246            .field("handle", &format_args!("{:p}", self.handle))
247            .finish()
248    }
249}
250
251impl Drop for Client {
252    fn drop(&mut self) {
253        if self.handle.is_null() { return; }
254        #[cfg(not(zero_heap))]
255        unsafe { bindings::ove_mqtt_client_destroy(self.handle) }
256        #[cfg(zero_heap)]
257        unsafe { bindings::ove_mqtt_client_deinit(self.handle) }
258    }
259}
260
// SAFETY: Wraps an ove handle. Publish/subscribe/poll are thread-safe RTOS
// calls. Create/destroy are single-threaded (lifecycle guarantee).
// NOTE(review): the thread-safety of the underlying C calls is asserted here
// but cannot be checked from this file -- confirm against the C
// implementation. This grants `Send` only.
unsafe impl Send for Client {}