Skip to main content

ove/
queue.rs

// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
// This file is part of oveRTOS.

//! Type-safe FIFO queue for oveRTOS.
//!
//! [`Queue<T, N>`] is a fixed-capacity, thread-safe FIFO that transfers items of
//! type `T` (which must be `Copy`) between threads or between ISR and thread
//! contexts. `N` is the maximum number of items the queue can hold.

13use core::cell::UnsafeCell;
14use core::fmt;
15use core::marker::PhantomData;
16use core::mem;
17use core::mem::MaybeUninit;
18
19use crate::bindings;
20use crate::error::{Error, Result};
21
// SAFETY (module-wide contract for the `unsafe { bindings::ove_*(...) }` FFI
// calls below): any handle passed to the C API is non-null and refers to a
// live RTOS object — wrapper constructors establish validity via
// `Error::from_code`, and `Drop` (or an explicit `deinit`) is the only place
// a handle is released. Pointer and slice arguments reference caller-owned
// memory valid for the duration of the call; the C side copies whatever it
// retains and does not alias them past return (verified against the
// signatures in `include/ove/*.h`). Blocks that deviate — `transmute`, raw
// pointer casts from user data, slice reconstruction via `from_raw_parts`,
// or storing a callback across the FFI boundary — carry their own
// `// SAFETY:` comment.
33
34/// Caller-owned storage + item buffer for a [`Queue`] in zero-heap mode.
35/// Declare in a `static` and pass `&STORAGE` to [`Queue::create`]; in heap
36/// mode it is ignored.  See [`crate::MutexStorage`] for the pattern; this one
37/// additionally embeds the `[T; N]` item buffer the queue needs.
38#[allow(dead_code)]
39pub struct QueueStorage<T: Copy, const N: usize> {
40    storage: UnsafeCell<MaybeUninit<bindings::ove_queue_storage_t>>,
41    buffer: UnsafeCell<MaybeUninit<[T; N]>>,
42}
43
44impl<T: Copy, const N: usize> QueueStorage<T, N> {
45    /// Zero-initialised storage with an uninitialised item buffer.  `const`
46    /// so it can initialise a `static`.
47    #[inline]
48    pub const fn new() -> Self {
49        Self {
50            storage: UnsafeCell::new(MaybeUninit::zeroed()),
51            buffer: UnsafeCell::new(MaybeUninit::uninit()),
52        }
53    }
54}
55
56impl<T: Copy, const N: usize> Default for QueueStorage<T, N> {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62// SAFETY: see crate::MutexStorage.  The buffer is only ever addressed as a
63// raw pointer handed to C; `T: Copy` carries no interior mutability.
64unsafe impl<T: Copy, const N: usize> Sync for QueueStorage<T, N> {}
65
66/// Type-safe FIFO queue with compile-time capacity.
67///
68/// `T` must be a plain-old-data type (Copy) because items are transferred
69/// through the C API via raw byte copies. `N` is the queue capacity.
70pub struct Queue<T: Copy, const N: usize> {
71    handle: bindings::ove_queue_t,
72    _marker: PhantomData<T>,
73}
74
75impl<T: Copy, const N: usize> Queue<T, N> {
76    /// Create a queue via heap allocation (only in heap mode).
77    #[cfg(not(zero_heap))]
78    pub fn new() -> Result<Self> {
79        let mut handle: bindings::ove_queue_t = core::ptr::null_mut();
80        let rc = unsafe { bindings::ove_queue_create(&mut handle, mem::size_of::<T>(), N as u32) };
81        Error::from_code(rc)?;
82        Ok(Self {
83            handle,
84            _marker: PhantomData,
85        })
86    }
87
88    /// Create from caller-provided static storage and buffer.
89    ///
90    /// # Safety
91    /// - `storage` must outlive the `Queue` and not be shared.
92    /// - `buffer` must point to at least `N * size_of::<T>()` bytes
93    ///   and outlive the `Queue`.
94    #[cfg(zero_heap)]
95    pub unsafe fn from_static(
96        storage: *mut bindings::ove_queue_storage_t,
97        buffer: *mut core::ffi::c_void,
98    ) -> Result<Self> {
99        let mut handle: bindings::ove_queue_t = core::ptr::null_mut();
100        let rc = unsafe {
101            bindings::ove_queue_init(&mut handle, storage, buffer, mem::size_of::<T>(), N as u32)
102        };
103        Error::from_code(rc)?;
104        Ok(Self {
105            handle,
106            _marker: PhantomData,
107        })
108    }
109
110    /// Mode-agnostic constructor (see [`crate::Mutex::create`]).  Heap mode
111    /// ignores `storage`; zero-heap mode backs the queue with its storage and
112    /// embedded item buffer.
113    pub fn create(storage: &'static QueueStorage<T, N>) -> Result<Self> {
114        #[cfg(not(zero_heap))]
115        {
116            let _ = storage;
117            Self::new()
118        }
119        #[cfg(zero_heap)]
120        {
121            let sptr = UnsafeCell::raw_get(&storage.storage).cast();
122            let bptr = UnsafeCell::raw_get(&storage.buffer).cast::<core::ffi::c_void>();
123            unsafe { Self::from_static(sptr, bptr) }
124        }
125    }
126
127    /// Send an item, blocking indefinitely if the queue is full.
128    /// `std::sync::mpsc::SyncSender::send` analog.
129    ///
130    /// `item` is taken by `&T` rather than by value because the
131    /// substrate `memcpy`s the bytes — `T: Copy` is enforced at the
132    /// `Queue<T, N>` type level, so `&T` vs `T` is a wash semantically
133    /// and `&T` avoids one stack-side `memcpy` for large `T`.
134    #[inline]
135    pub fn send(&self, item: &T) -> Result<()> {
136        let rc = unsafe {
137            bindings::ove_queue_send(self.handle, item as *const T as *const _, u64::MAX)
138        };
139        Error::from_code(rc)
140    }
141
142    /// Non-blocking send.  `std::sync::mpsc::SyncSender::try_send` analog.
143    ///
144    /// # Errors
145    /// Returns [`Error::QueueFull`] if the queue is full.
146    #[inline]
147    pub fn try_send(&self, item: &T) -> Result<()> {
148        let rc = unsafe { bindings::ove_queue_send(self.handle, item as *const T as *const _, 0) };
149        Error::from_code(rc)
150    }
151
152    /// Send, waiting up to `d`.
153    ///
154    /// # Errors
155    /// Returns [`Error::QueueFull`] / [`Error::Timeout`] if the queue
156    /// stays full through `d`.
157    #[inline]
158    pub fn try_send_for(&self, item: &T, d: core::time::Duration) -> Result<()> {
159        let rc = unsafe {
160            bindings::ove_queue_send(
161                self.handle,
162                item as *const T as *const _,
163                crate::time::dur_to_ns(d),
164            )
165        };
166        Error::from_code(rc)
167    }
168
169    /// Send by the given deadline.  Use
170    /// [`Instant::FOREVER`](crate::time::Instant::FOREVER) for an
171    /// indefinite wait.
172    #[inline]
173    pub fn try_send_until(&self, item: &T, deadline: crate::time::Instant) -> Result<()> {
174        let timeout = crate::time::deadline_to_timeout_ns(deadline);
175        let rc =
176            unsafe { bindings::ove_queue_send(self.handle, item as *const T as *const _, timeout) };
177        Error::from_code(rc)
178    }
179
180    /// Receive an item, blocking indefinitely.  `std::sync::mpsc::Receiver::recv`
181    /// analog.
182    #[inline]
183    pub fn recv(&self) -> Result<T> {
184        let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
185        let rc = unsafe {
186            bindings::ove_queue_receive(self.handle, item.as_mut_ptr() as *mut _, u64::MAX)
187        };
188        Error::from_code(rc)?;
189        Ok(unsafe { item.assume_init() })
190    }
191
192    /// Non-blocking receive.  `std::sync::mpsc::Receiver::try_recv` analog.
193    ///
194    /// # Errors
195    /// Returns [`Error::QueueEmpty`] if the queue is empty.
196    #[inline]
197    pub fn try_recv(&self) -> Result<T> {
198        let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
199        let rc =
200            unsafe { bindings::ove_queue_receive(self.handle, item.as_mut_ptr() as *mut _, 0) };
201        Error::from_code(rc)?;
202        Ok(unsafe { item.assume_init() })
203    }
204
205    /// Receive, waiting up to `d`.
206    ///
207    /// # Errors
208    /// Returns [`Error::QueueEmpty`] / [`Error::Timeout`] if no item is
209    /// available within `d`.
210    #[inline]
211    pub fn try_recv_for(&self, d: core::time::Duration) -> Result<T> {
212        let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
213        let rc = unsafe {
214            bindings::ove_queue_receive(
215                self.handle,
216                item.as_mut_ptr() as *mut _,
217                crate::time::dur_to_ns(d),
218            )
219        };
220        Error::from_code(rc)?;
221        Ok(unsafe { item.assume_init() })
222    }
223
224    /// Receive by the given deadline.
225    #[inline]
226    pub fn try_recv_until(&self, deadline: crate::time::Instant) -> Result<T> {
227        let timeout = crate::time::deadline_to_timeout_ns(deadline);
228        let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
229        let rc = unsafe {
230            bindings::ove_queue_receive(self.handle, item.as_mut_ptr() as *mut _, timeout)
231        };
232        Error::from_code(rc)?;
233        Ok(unsafe { item.assume_init() })
234    }
235
236    /// Send an item from an ISR context (non-blocking, returns immediately if full).
237    ///
238    /// # Errors
239    /// Returns [`Error::QueueFull`] if the queue has no space.
240    #[inline]
241    pub fn send_from_isr(&self, item: &T) -> Result<()> {
242        let rc =
243            unsafe { bindings::ove_queue_send_from_isr(self.handle, item as *const T as *const _) };
244        Error::from_code(rc)
245    }
246
247    /// Receive an item from an ISR context (non-blocking, returns immediately if empty).
248    ///
249    /// # Errors
250    /// Returns [`Error::QueueEmpty`] if the queue is empty.
251    #[inline]
252    pub fn receive_from_isr(&self) -> Result<T> {
253        let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
254        let rc = unsafe {
255            bindings::ove_queue_receive_from_isr(self.handle, item.as_mut_ptr() as *mut _)
256        };
257        Error::from_code(rc)?;
258        Ok(unsafe { item.assume_init() })
259    }
260
261    /// Register a notify callback fired after every successful send.
262    /// Wraps the C-level `ove_queue_set_notify`. Reach for
263    /// [`AsyncQueue`](crate::async_runtime::AsyncQueue) instead of using
264    /// this directly — it hides the lifetime + ISR-safety constraints
265    /// behind a safe API.
266    ///
267    /// # Safety
268    /// Same contract as [`crate::Stream::set_notify`]: `user_data` must
269    /// outlive the registration, and `cb` must be ISR-safe.
270    #[cfg(has_async)]
271    #[inline]
272    pub unsafe fn set_notify(
273        &self,
274        cb: Option<unsafe extern "C" fn(*mut core::ffi::c_void)>,
275        user_data: *mut core::ffi::c_void,
276    ) -> Result<()> {
277        let rc = unsafe { bindings::ove_queue_set_notify(self.handle, cb, user_data) };
278        Error::from_code(rc)
279    }
280}
281
282impl<T: Copy, const N: usize> fmt::Debug for Queue<T, N> {
283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284        f.debug_struct("Queue")
285            .field("handle", &format_args!("{:p}", self.handle))
286            .finish()
287    }
288}
289
290impl<T: Copy, const N: usize> Drop for Queue<T, N> {
291    fn drop(&mut self) {
292        if self.handle.is_null() {
293            return;
294        }
295        #[cfg(not(zero_heap))]
296        unsafe {
297            bindings::ove_queue_destroy(self.handle);
298        }
299        #[cfg(zero_heap)]
300        unsafe {
301            bindings::ove_queue_deinit(self.handle);
302        }
303    }
304}
305
306// SAFETY: `Queue<T, N>` wraps an `ove_queue_t` FIFO.  The substrate
307// serialises producer/consumer access via internal locks; the `T: Copy +
308// Send` bound ensures items can be safely transferred across threads.
309unsafe impl<T: Copy + Send, const N: usize> Send for Queue<T, N> {}
310unsafe impl<T: Copy + Send, const N: usize> Sync for Queue<T, N> {}