ove/net_mqtt.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! MQTT 3.1.1 client with safe callback.
8//!
9//! [`Client`] wraps the oveRTOS MQTT handle with automatic cleanup and a
10//! safe Rust `fn(&str, &[u8])` message callback. The trampoline pattern
11//! (same as [`crate::timer::Timer`]) converts the C callback into a safe
12//! Rust function call.
13//!
14//! Works in both heap and zero-heap modes.
15
16use core::fmt;
17
18use crate::bindings;
19use crate::error::{Error, Result};
20
21// ---------------------------------------------------------------------------
22// QoS
23// ---------------------------------------------------------------------------
24
/// Delivery guarantee requested for an MQTT publish or subscription.
///
/// Each variant's discriminant is the numeric MQTT QoS level, so a variant
/// can be converted with a plain `as` cast.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Qos {
    /// QoS 0 -- at most once: fire-and-forget, no acknowledgement.
    AtMostOnce = 0,
    /// QoS 1 -- at least once: delivery is acknowledged.
    AtLeastOnce = 1,
}
33
34// ---------------------------------------------------------------------------
35// Config
36// ---------------------------------------------------------------------------
37
/// Parameters describing how to reach and authenticate with an MQTT broker.
///
/// Every textual field (`host`, `client_id`, `username`, `password`) is a
/// borrowed byte slice that must end with a NUL byte (for example
/// `b"broker.local\0"`), because the bytes are handed to the C API as C
/// strings.
pub struct Config<'a> {
    /// Hostname or IP address of the broker (null-terminated).
    pub host: &'a [u8],
    /// TCP port of the broker (typically 1883, or 8883 for TLS).
    pub port: u16,
    /// Identifier this client presents to the broker (null-terminated).
    pub client_id: &'a [u8],
    /// Optional user name for authentication (null-terminated when `Some`).
    pub username: Option<&'a [u8]>,
    /// Optional password for authentication (null-terminated when `Some`).
    pub password: Option<&'a [u8]>,
    /// Keep-alive interval, in seconds.
    pub keep_alive_s: u16,
    /// `true` to connect over TLS, `false` for a plain connection.
    pub use_tls: bool,
}
58
59// ---------------------------------------------------------------------------
60// Callback trampoline
61// ---------------------------------------------------------------------------
62
/// Message callback -- topic as `&str` (UTF-8 per MQTT spec), payload as `&[u8]`.
pub type MessageFn = fn(&str, &[u8]);

/// Internal trampoline that converts the C callback into a safe Rust call.
///
/// The user's `fn(&str, &[u8])` is stored as the `user_data` pointer
/// (same pattern as [`crate::timer::Timer`]).
///
/// Defensive behaviour at the FFI boundary:
/// * a NULL `user_data` is ignored (no callback is invoked);
/// * a NULL pointer or a zero length for `topic` / `payload` is presented to
///   the callback as an empty slice instead of being dereferenced;
/// * a topic that is not valid UTF-8 causes the message to be dropped, since
///   the bytes come from the network and cannot be trusted to follow the
///   MQTT specification.
///
/// # Safety
/// * `user_data` must be NULL or a [`MessageFn`] that was cast to a pointer.
/// * When non-NULL, `topic` must be readable for `topic_len` bytes and
///   `payload` for `payload_len` bytes for the duration of the call.
unsafe extern "C" fn mqtt_trampoline(
    topic: *const core::ffi::c_char,
    topic_len: usize,
    payload: *const core::ffi::c_void,
    payload_len: usize,
    user_data: *mut core::ffi::c_void,
) {
    // A NULL fn pointer is not a valid value in Rust; never materialise one.
    if user_data.is_null() {
        return;
    }
    // SAFETY: non-NULL `user_data` is the `MessageFn` that `Client::connect`
    // cast to `*mut c_void`; data and fn pointers have the same size on the
    // supported targets (the transmute would not compile otherwise).
    let cb: MessageFn =
        unsafe { core::mem::transmute::<*mut core::ffi::c_void, MessageFn>(user_data) };

    // `slice::from_raw_parts` requires a non-NULL pointer even for length 0,
    // so empty/NULL inputs are mapped to an empty slice without touching them.
    let topic_bytes: &[u8] = if topic.is_null() || topic_len == 0 {
        &[]
    } else {
        // SAFETY: non-NULL and readable for `topic_len` bytes per the contract.
        unsafe { core::slice::from_raw_parts(topic.cast::<u8>(), topic_len) }
    };
    let payload_bytes: &[u8] = if payload.is_null() || payload_len == 0 {
        &[]
    } else {
        // SAFETY: non-NULL and readable for `payload_len` bytes per the contract.
        unsafe { core::slice::from_raw_parts(payload.cast::<u8>(), payload_len) }
    };

    // Validate instead of assuming UTF-8: building a `&str` from malformed
    // bytes would be undefined behaviour inside the safe callback.
    if let Ok(topic_str) = core::str::from_utf8(topic_bytes) {
        cb(topic_str, payload_bytes);
    }
}
84
85// ---------------------------------------------------------------------------
86// Client
87// ---------------------------------------------------------------------------
88
89/// Backing storage for an MQTT client in zero-heap mode.
90///
91/// ```ignore
92/// let mut storage = ClientStorage::new();
93/// let mut mqtt = Client::create(&mut storage)?;
94/// ```
95pub struct ClientStorage(bindings::ove_mqtt_client_storage_t);
96
97impl ClientStorage {
98 pub fn new() -> Self {
99 Self(unsafe { core::mem::zeroed() })
100 }
101}
102
103/// MQTT 3.1.1 client.
104///
105/// Wraps `ove_mqtt_client_t` with automatic cleanup on drop.
106pub struct Client {
107 handle: bindings::ove_mqtt_client_t,
108}
109
110impl Client {
111 /// Create a new MQTT client via heap allocation (only in heap mode).
112 #[cfg(not(zero_heap))]
113 pub fn new() -> Result<Self> {
114 let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
115 let rc = unsafe { bindings::ove_mqtt_client_create(&mut handle) };
116 Error::from_code(rc)?;
117 Ok(Self { handle })
118 }
119
120 /// Create from caller-provided static storage.
121 ///
122 /// # Safety
123 /// Caller must ensure `storage` outlives the `Client` and is not
124 /// shared with another primitive.
125 #[cfg(zero_heap)]
126 pub unsafe fn from_static(
127 storage: *mut bindings::ove_mqtt_client_storage_t,
128 ) -> Result<Self> {
129 let mut handle: bindings::ove_mqtt_client_t = core::ptr::null_mut();
130 let rc = unsafe { bindings::ove_mqtt_client_init(&mut handle, storage) };
131 Error::from_code(rc)?;
132 Ok(Self { handle })
133 }
134
135 /// Create a client that works in both heap and zero-heap modes.
136 pub fn create(storage: &mut ClientStorage) -> Result<Self> {
137 #[cfg(not(zero_heap))]
138 {
139 let _ = storage;
140 Self::new()
141 }
142 #[cfg(zero_heap)]
143 {
144 unsafe { Self::from_static(&mut storage.0) }
145 }
146 }
147
148 /// Connect to an MQTT broker.
149 ///
150 /// `on_message` is called for each incoming publish (topic + payload).
151 ///
152 /// # Errors
153 /// Returns an error if the connection or MQTT handshake fails.
154 pub fn connect(&mut self, cfg: &Config, on_message: MessageFn) -> Result<()> {
155 let user_data = on_message as *mut core::ffi::c_void;
156
157 let mut c: bindings::ove_mqtt_config_t = unsafe { core::mem::zeroed() };
158 c.host = cfg.host.as_ptr() as *const _;
159 c.port = cfg.port;
160 c.client_id = cfg.client_id.as_ptr() as *const _;
161 c.keep_alive_s = cfg.keep_alive_s;
162 c.use_tls = cfg.use_tls as i32;
163 c.on_message = Some(mqtt_trampoline);
164 c.user_data = user_data;
165
166 if let Some(u) = cfg.username {
167 c.username = u.as_ptr() as *const _;
168 }
169 if let Some(p) = cfg.password {
170 c.password = p.as_ptr() as *const _;
171 }
172
173 let rc = unsafe { bindings::ove_mqtt_connect(self.handle, &c) };
174 Error::from_code(rc)
175 }
176
177 /// Disconnect from the MQTT broker.
178 pub fn disconnect(&mut self) {
179 unsafe { bindings::ove_mqtt_disconnect(self.handle) }
180 }
181
182 /// Publish a message on a topic.
183 ///
184 /// `topic` must be a null-terminated byte string.
185 ///
186 /// # Errors
187 /// Returns an error if the publish fails.
188 pub fn publish(&self, topic: &[u8], payload: &[u8], qos: Qos) -> Result<()> {
189 let rc = unsafe {
190 bindings::ove_mqtt_publish(
191 self.handle,
192 topic.as_ptr() as *const _,
193 payload.as_ptr() as *const _,
194 payload.len(),
195 qos as bindings::ove_mqtt_qos_t,
196 )
197 };
198 Error::from_code(rc)
199 }
200
201 /// Subscribe to a topic filter.
202 ///
203 /// `topic` must be a null-terminated byte string.
204 ///
205 /// # Errors
206 /// Returns an error if the subscribe fails.
207 pub fn subscribe(&self, topic: &[u8], qos: Qos) -> Result<()> {
208 let rc = unsafe {
209 bindings::ove_mqtt_subscribe(
210 self.handle,
211 topic.as_ptr() as *const _,
212 qos as bindings::ove_mqtt_qos_t,
213 )
214 };
215 Error::from_code(rc)
216 }
217
218 /// Unsubscribe from a topic filter.
219 ///
220 /// `topic` must be a null-terminated byte string.
221 ///
222 /// # Errors
223 /// Returns an error if the unsubscribe fails.
224 pub fn unsubscribe(&self, topic: &[u8]) -> Result<()> {
225 let rc = unsafe {
226 bindings::ove_mqtt_unsubscribe(self.handle, topic.as_ptr() as *const _)
227 };
228 Error::from_code(rc)
229 }
230
231 /// Process incoming packets and send keep-alive pings.
232 ///
233 /// Must be called periodically (in a loop or from a timer callback).
234 ///
235 /// # Errors
236 /// Returns an error if the poll encounters a protocol or transport error.
237 pub fn poll(&self, timeout_ms: u32) -> Result<()> {
238 let rc = unsafe { bindings::ove_mqtt_loop(self.handle, timeout_ms) };
239 Error::from_code(rc)
240 }
241}
242
243impl fmt::Debug for Client {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 f.debug_struct("Client")
246 .field("handle", &format_args!("{:p}", self.handle))
247 .finish()
248 }
249}
250
251impl Drop for Client {
252 fn drop(&mut self) {
253 if self.handle.is_null() { return; }
254 #[cfg(not(zero_heap))]
255 unsafe { bindings::ove_mqtt_client_destroy(self.handle) }
256 #[cfg(zero_heap)]
257 unsafe { bindings::ove_mqtt_client_deinit(self.handle) }
258 }
259}
260
261// SAFETY: Wraps a ove handle. Publish/subscribe/poll are thread-safe RTOS
262// calls. Create/destroy are single-threaded (lifecycle guarantee).
263unsafe impl Send for Client {}