ove/queue.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Type-safe FIFO queue for oveRTOS.
8//!
9//! [`Queue<T, N>`] is a fixed-capacity, thread-safe FIFO that transfers items of
10//! type `T` (which must be `Copy`) between threads or between ISR and thread
11//! contexts. `N` is the maximum number of items the queue can hold.
12
13use core::cell::UnsafeCell;
14use core::fmt;
15use core::marker::PhantomData;
16use core::mem;
17use core::mem::MaybeUninit;
18
19use crate::bindings;
20use crate::error::{Error, Result};
21
22// SAFETY (module-wide contract for the `unsafe { bindings::ove_*(...) }` FFI
23// calls below): any handle passed to the C API is non-null and refers to a
24// live RTOS object — wrapper constructors establish validity via
25// `Error::from_code`, and `Drop` (or an explicit `deinit`) is the only place
26// a handle is released. Pointer and slice arguments reference caller-owned
27// memory valid for the duration of the call; the C side copies whatever it
28// retains and does not alias them past return (verified against the
29// signatures in `include/ove/*.h`). Blocks that deviate — `transmute`, raw
30// pointer casts from user data, slice reconstruction via `from_raw_parts`,
31// or storing a callback across the FFI boundary — carry their own
32// `// SAFETY:` comment.
33
34/// Caller-owned storage + item buffer for a [`Queue`] in zero-heap mode.
35/// Declare in a `static` and pass `&STORAGE` to [`Queue::create`]; in heap
36/// mode it is ignored. See [`crate::MutexStorage`] for the pattern; this one
37/// additionally embeds the `[T; N]` item buffer the queue needs.
38#[allow(dead_code)]
39pub struct QueueStorage<T: Copy, const N: usize> {
40 storage: UnsafeCell<MaybeUninit<bindings::ove_queue_storage_t>>,
41 buffer: UnsafeCell<MaybeUninit<[T; N]>>,
42}
43
44impl<T: Copy, const N: usize> QueueStorage<T, N> {
45 /// Zero-initialised storage with an uninitialised item buffer. `const`
46 /// so it can initialise a `static`.
47 #[inline]
48 pub const fn new() -> Self {
49 Self {
50 storage: UnsafeCell::new(MaybeUninit::zeroed()),
51 buffer: UnsafeCell::new(MaybeUninit::uninit()),
52 }
53 }
54}
55
56impl<T: Copy, const N: usize> Default for QueueStorage<T, N> {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62// SAFETY: see crate::MutexStorage. The buffer is only ever addressed as a
63// raw pointer handed to C; `T: Copy` carries no interior mutability.
64unsafe impl<T: Copy, const N: usize> Sync for QueueStorage<T, N> {}
65
66/// Type-safe FIFO queue with compile-time capacity.
67///
68/// `T` must be a plain-old-data type (Copy) because items are transferred
69/// through the C API via raw byte copies. `N` is the queue capacity.
70pub struct Queue<T: Copy, const N: usize> {
71 handle: bindings::ove_queue_t,
72 _marker: PhantomData<T>,
73}
74
75impl<T: Copy, const N: usize> Queue<T, N> {
76 /// Create a queue via heap allocation (only in heap mode).
77 #[cfg(not(zero_heap))]
78 pub fn new() -> Result<Self> {
79 let mut handle: bindings::ove_queue_t = core::ptr::null_mut();
80 let rc = unsafe { bindings::ove_queue_create(&mut handle, mem::size_of::<T>(), N as u32) };
81 Error::from_code(rc)?;
82 Ok(Self {
83 handle,
84 _marker: PhantomData,
85 })
86 }
87
88 /// Create from caller-provided static storage and buffer.
89 ///
90 /// # Safety
91 /// - `storage` must outlive the `Queue` and not be shared.
92 /// - `buffer` must point to at least `N * size_of::<T>()` bytes
93 /// and outlive the `Queue`.
94 #[cfg(zero_heap)]
95 pub unsafe fn from_static(
96 storage: *mut bindings::ove_queue_storage_t,
97 buffer: *mut core::ffi::c_void,
98 ) -> Result<Self> {
99 let mut handle: bindings::ove_queue_t = core::ptr::null_mut();
100 let rc = unsafe {
101 bindings::ove_queue_init(&mut handle, storage, buffer, mem::size_of::<T>(), N as u32)
102 };
103 Error::from_code(rc)?;
104 Ok(Self {
105 handle,
106 _marker: PhantomData,
107 })
108 }
109
110 /// Mode-agnostic constructor (see [`crate::Mutex::create`]). Heap mode
111 /// ignores `storage`; zero-heap mode backs the queue with its storage and
112 /// embedded item buffer.
113 pub fn create(storage: &'static QueueStorage<T, N>) -> Result<Self> {
114 #[cfg(not(zero_heap))]
115 {
116 let _ = storage;
117 Self::new()
118 }
119 #[cfg(zero_heap)]
120 {
121 let sptr = UnsafeCell::raw_get(&storage.storage).cast();
122 let bptr = UnsafeCell::raw_get(&storage.buffer).cast::<core::ffi::c_void>();
123 unsafe { Self::from_static(sptr, bptr) }
124 }
125 }
126
127 /// Send an item, blocking indefinitely if the queue is full.
128 /// `std::sync::mpsc::SyncSender::send` analog.
129 ///
130 /// `item` is taken by `&T` rather than by value because the
131 /// substrate `memcpy`s the bytes — `T: Copy` is enforced at the
132 /// `Queue<T, N>` type level, so `&T` vs `T` is a wash semantically
133 /// and `&T` avoids one stack-side `memcpy` for large `T`.
134 #[inline]
135 pub fn send(&self, item: &T) -> Result<()> {
136 let rc = unsafe {
137 bindings::ove_queue_send(self.handle, item as *const T as *const _, u64::MAX)
138 };
139 Error::from_code(rc)
140 }
141
142 /// Non-blocking send. `std::sync::mpsc::SyncSender::try_send` analog.
143 ///
144 /// # Errors
145 /// Returns [`Error::QueueFull`] if the queue is full.
146 #[inline]
147 pub fn try_send(&self, item: &T) -> Result<()> {
148 let rc = unsafe { bindings::ove_queue_send(self.handle, item as *const T as *const _, 0) };
149 Error::from_code(rc)
150 }
151
152 /// Send, waiting up to `d`.
153 ///
154 /// # Errors
155 /// Returns [`Error::QueueFull`] / [`Error::Timeout`] if the queue
156 /// stays full through `d`.
157 #[inline]
158 pub fn try_send_for(&self, item: &T, d: core::time::Duration) -> Result<()> {
159 let rc = unsafe {
160 bindings::ove_queue_send(
161 self.handle,
162 item as *const T as *const _,
163 crate::time::dur_to_ns(d),
164 )
165 };
166 Error::from_code(rc)
167 }
168
169 /// Send by the given deadline. Use
170 /// [`Instant::FOREVER`](crate::time::Instant::FOREVER) for an
171 /// indefinite wait.
172 #[inline]
173 pub fn try_send_until(&self, item: &T, deadline: crate::time::Instant) -> Result<()> {
174 let timeout = crate::time::deadline_to_timeout_ns(deadline);
175 let rc =
176 unsafe { bindings::ove_queue_send(self.handle, item as *const T as *const _, timeout) };
177 Error::from_code(rc)
178 }
179
180 /// Receive an item, blocking indefinitely. `std::sync::mpsc::Receiver::recv`
181 /// analog.
182 #[inline]
183 pub fn recv(&self) -> Result<T> {
184 let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
185 let rc = unsafe {
186 bindings::ove_queue_receive(self.handle, item.as_mut_ptr() as *mut _, u64::MAX)
187 };
188 Error::from_code(rc)?;
189 Ok(unsafe { item.assume_init() })
190 }
191
192 /// Non-blocking receive. `std::sync::mpsc::Receiver::try_recv` analog.
193 ///
194 /// # Errors
195 /// Returns [`Error::QueueEmpty`] if the queue is empty.
196 #[inline]
197 pub fn try_recv(&self) -> Result<T> {
198 let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
199 let rc =
200 unsafe { bindings::ove_queue_receive(self.handle, item.as_mut_ptr() as *mut _, 0) };
201 Error::from_code(rc)?;
202 Ok(unsafe { item.assume_init() })
203 }
204
205 /// Receive, waiting up to `d`.
206 ///
207 /// # Errors
208 /// Returns [`Error::QueueEmpty`] / [`Error::Timeout`] if no item is
209 /// available within `d`.
210 #[inline]
211 pub fn try_recv_for(&self, d: core::time::Duration) -> Result<T> {
212 let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
213 let rc = unsafe {
214 bindings::ove_queue_receive(
215 self.handle,
216 item.as_mut_ptr() as *mut _,
217 crate::time::dur_to_ns(d),
218 )
219 };
220 Error::from_code(rc)?;
221 Ok(unsafe { item.assume_init() })
222 }
223
224 /// Receive by the given deadline.
225 #[inline]
226 pub fn try_recv_until(&self, deadline: crate::time::Instant) -> Result<T> {
227 let timeout = crate::time::deadline_to_timeout_ns(deadline);
228 let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
229 let rc = unsafe {
230 bindings::ove_queue_receive(self.handle, item.as_mut_ptr() as *mut _, timeout)
231 };
232 Error::from_code(rc)?;
233 Ok(unsafe { item.assume_init() })
234 }
235
236 /// Send an item from an ISR context (non-blocking, returns immediately if full).
237 ///
238 /// # Errors
239 /// Returns [`Error::QueueFull`] if the queue has no space.
240 #[inline]
241 pub fn send_from_isr(&self, item: &T) -> Result<()> {
242 let rc =
243 unsafe { bindings::ove_queue_send_from_isr(self.handle, item as *const T as *const _) };
244 Error::from_code(rc)
245 }
246
247 /// Receive an item from an ISR context (non-blocking, returns immediately if empty).
248 ///
249 /// # Errors
250 /// Returns [`Error::QueueEmpty`] if the queue is empty.
251 #[inline]
252 pub fn receive_from_isr(&self) -> Result<T> {
253 let mut item: mem::MaybeUninit<T> = mem::MaybeUninit::uninit();
254 let rc = unsafe {
255 bindings::ove_queue_receive_from_isr(self.handle, item.as_mut_ptr() as *mut _)
256 };
257 Error::from_code(rc)?;
258 Ok(unsafe { item.assume_init() })
259 }
260
261 /// Register a notify callback fired after every successful send.
262 /// Wraps the C-level `ove_queue_set_notify`. Reach for
263 /// [`AsyncQueue`](crate::async_runtime::AsyncQueue) instead of using
264 /// this directly — it hides the lifetime + ISR-safety constraints
265 /// behind a safe API.
266 ///
267 /// # Safety
268 /// Same contract as [`crate::Stream::set_notify`]: `user_data` must
269 /// outlive the registration, and `cb` must be ISR-safe.
270 #[cfg(has_async)]
271 #[inline]
272 pub unsafe fn set_notify(
273 &self,
274 cb: Option<unsafe extern "C" fn(*mut core::ffi::c_void)>,
275 user_data: *mut core::ffi::c_void,
276 ) -> Result<()> {
277 let rc = unsafe { bindings::ove_queue_set_notify(self.handle, cb, user_data) };
278 Error::from_code(rc)
279 }
280}
281
282impl<T: Copy, const N: usize> fmt::Debug for Queue<T, N> {
283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284 f.debug_struct("Queue")
285 .field("handle", &format_args!("{:p}", self.handle))
286 .finish()
287 }
288}
289
290impl<T: Copy, const N: usize> Drop for Queue<T, N> {
291 fn drop(&mut self) {
292 if self.handle.is_null() {
293 return;
294 }
295 #[cfg(not(zero_heap))]
296 unsafe {
297 bindings::ove_queue_destroy(self.handle);
298 }
299 #[cfg(zero_heap)]
300 unsafe {
301 bindings::ove_queue_deinit(self.handle);
302 }
303 }
304}
305
306// SAFETY: `Queue<T, N>` wraps an `ove_queue_t` FIFO. The substrate
307// serialises producer/consumer access via internal locks; the `T: Copy +
308// Send` bound ensures items can be safely transferred across threads.
309unsafe impl<T: Copy + Send, const N: usize> Send for Queue<T, N> {}
310unsafe impl<T: Copy + Send, const N: usize> Sync for Queue<T, N> {}