Skip to main content

ove/
stream.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Byte-stream buffer for oveRTOS.
8//!
9//! [`Stream<N>`] is a lock-free ring buffer for passing arbitrary byte sequences
10//! between threads or between ISR and thread contexts. Unlike [`crate::Queue`], which
11//! transfers discrete items, a `Stream` treats data as a continuous byte flow with
12//! a configurable receive-trigger threshold.
13
14use core::cell::UnsafeCell;
15use core::mem::MaybeUninit;
16
17use crate::bindings;
18use crate::error::{Error, Result};
19
20// SAFETY (module-wide contract for the `unsafe { bindings::ove_*(...) }` FFI
21// calls below): any handle passed to the C API is non-null and refers to a
22// live RTOS object — wrapper constructors establish validity via
23// `Error::from_code`, and `Drop` (or an explicit `deinit`) is the only place
24// a handle is released. Pointer and slice arguments reference caller-owned
25// memory valid for the duration of the call; the C side copies whatever it
26// retains and does not alias them past return (verified against the
27// signatures in `include/ove/*.h`). Blocks that deviate — `transmute`, raw
28// pointer casts from user data, slice reconstruction via `from_raw_parts`,
29// or storing a callback across the FFI boundary — carry their own
30// `// SAFETY:` comment.
31
32/// Caller-owned storage + byte buffer for a [`Stream`] in zero-heap mode.
33/// Declare in a `static` and pass `&STORAGE` to [`Stream::create`]; in heap
34/// mode it is ignored.  See [`crate::MutexStorage`]; this one additionally
35/// embeds the `[u8; N]` ring buffer the stream needs.
36#[allow(dead_code)]
37pub struct StreamStorage<const N: usize> {
38    storage: UnsafeCell<MaybeUninit<bindings::ove_stream_storage_t>>,
39    buffer: UnsafeCell<MaybeUninit<[u8; N]>>,
40}
41
42impl<const N: usize> StreamStorage<N> {
43    /// Zero-initialised storage with an uninitialised byte buffer.  `const`
44    /// so it can initialise a `static`.
45    #[inline]
46    pub const fn new() -> Self {
47        Self {
48            storage: UnsafeCell::new(MaybeUninit::zeroed()),
49            buffer: UnsafeCell::new(MaybeUninit::uninit()),
50        }
51    }
52}
53
54impl<const N: usize> Default for StreamStorage<N> {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60// SAFETY: see crate::MutexStorage.
61unsafe impl<const N: usize> Sync for StreamStorage<N> {}
62
63/// Byte-oriented stream buffer with compile-time buffer size.
64///
65/// Wraps `ove_stream_t` for passing variable-length byte data between
66/// threads or between ISR and thread contexts. `N` is the buffer size in bytes.
67pub struct Stream<const N: usize> {
68    handle: bindings::ove_stream_t,
69}
70
71impl<const N: usize> Stream<N> {
72    /// Create a stream via heap allocation (only in heap mode).
73    #[cfg(not(zero_heap))]
74    pub fn new(trigger: usize) -> Result<Self> {
75        let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
76        let rc = unsafe { bindings::ove_stream_create(&mut handle, N, trigger) };
77        Error::from_code(rc)?;
78        Ok(Self { handle })
79    }
80
81    /// Create from caller-provided static storage and buffer.
82    ///
83    /// # Safety
84    /// - `storage` must outlive the `Stream` and not be shared.
85    /// - `buffer` must point to at least `N` bytes and outlive the `Stream`.
86    #[cfg(zero_heap)]
87    pub unsafe fn from_static(
88        storage: *mut bindings::ove_stream_storage_t,
89        buffer: *mut core::ffi::c_void,
90        trigger: usize,
91    ) -> Result<Self> {
92        let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
93        let rc = unsafe { bindings::ove_stream_init(&mut handle, storage, buffer, N, trigger) };
94        Error::from_code(rc)?;
95        Ok(Self { handle })
96    }
97
98    /// Mode-agnostic constructor (see [`crate::Mutex::create`]).  Heap mode
99    /// ignores `storage`; zero-heap mode backs the stream with its storage and
100    /// embedded byte buffer.
101    pub fn create(storage: &'static StreamStorage<N>, trigger: usize) -> Result<Self> {
102        #[cfg(not(zero_heap))]
103        {
104            let _ = storage;
105            Self::new(trigger)
106        }
107        #[cfg(zero_heap)]
108        {
109            let sptr = UnsafeCell::raw_get(&storage.storage).cast();
110            let bptr = UnsafeCell::raw_get(&storage.buffer).cast::<core::ffi::c_void>();
111            unsafe { Self::from_static(sptr, bptr, trigger) }
112        }
113    }
114
115    /// Send bytes, blocking indefinitely if the buffer is full.
116    /// Returns the number of bytes actually sent (may be < `data.len()`).
117    #[inline]
118    pub fn send(&self, data: &[u8]) -> Result<usize> {
119        self.send_with_timeout(data, u64::MAX)
120    }
121
122    /// Non-blocking send.  Returns the number of bytes actually sent.
123    ///
124    /// # Errors
125    /// Returns [`Error::WouldBlock`] if the buffer is full and no
126    /// bytes could be written.
127    #[inline]
128    pub fn try_send(&self, data: &[u8]) -> Result<usize> {
129        self.send_with_timeout(data, 0)
130    }
131
132    /// Send up to `d`.  Returns the number of bytes actually sent.
133    #[inline]
134    pub fn try_send_for(&self, data: &[u8], d: core::time::Duration) -> Result<usize> {
135        self.send_with_timeout(data, crate::time::dur_to_ns(d))
136    }
137
138    /// Send by the given deadline.  Returns the number of bytes
139    /// actually sent.
140    #[inline]
141    pub fn try_send_until(&self, data: &[u8], deadline: crate::time::Instant) -> Result<usize> {
142        self.send_with_timeout(data, crate::time::deadline_to_timeout_ns(deadline))
143    }
144
145    #[inline]
146    fn send_with_timeout(&self, data: &[u8], timeout_ns: u64) -> Result<usize> {
147        let mut bytes_sent: usize = 0;
148        let rc = unsafe {
149            bindings::ove_stream_send(
150                self.handle,
151                data.as_ptr() as *const _,
152                data.len(),
153                timeout_ns,
154                &mut bytes_sent,
155            )
156        };
157        Error::from_code(rc)?;
158        Ok(bytes_sent)
159    }
160
161    /// Receive bytes, blocking indefinitely.  Returns the number of
162    /// bytes actually read (may be < `buf.len()` — blocks until at
163    /// least the trigger byte count is available).
164    #[inline]
165    pub fn recv(&self, buf: &mut [u8]) -> Result<usize> {
166        self.recv_with_timeout(buf, u64::MAX)
167    }
168
169    /// Non-blocking receive.  Returns the number of bytes read.
170    ///
171    /// # Errors
172    /// Returns [`Error::WouldBlock`] if no bytes are available.
173    #[inline]
174    pub fn try_recv(&self, buf: &mut [u8]) -> Result<usize> {
175        self.recv_with_timeout(buf, 0)
176    }
177
178    /// Receive up to `d`.
179    #[inline]
180    pub fn try_recv_for(&self, buf: &mut [u8], d: core::time::Duration) -> Result<usize> {
181        self.recv_with_timeout(buf, crate::time::dur_to_ns(d))
182    }
183
184    /// Receive by the given deadline.
185    #[inline]
186    pub fn try_recv_until(&self, buf: &mut [u8], deadline: crate::time::Instant) -> Result<usize> {
187        self.recv_with_timeout(buf, crate::time::deadline_to_timeout_ns(deadline))
188    }
189
190    #[inline]
191    fn recv_with_timeout(&self, buf: &mut [u8], timeout_ns: u64) -> Result<usize> {
192        let mut bytes_received: usize = 0;
193        let rc = unsafe {
194            bindings::ove_stream_receive(
195                self.handle,
196                buf.as_mut_ptr() as *mut _,
197                buf.len(),
198                timeout_ns,
199                &mut bytes_received,
200            )
201        };
202        Error::from_code(rc)?;
203        Ok(bytes_received)
204    }
205
206    /// Send bytes from an ISR context (non-blocking, returns immediately).
207    ///
208    /// Returns the number of bytes sent; may be less than `data.len()` if the buffer fills.
209    ///
210    /// # Errors
211    /// Returns an error if the stream is full.
212    #[inline]
213    pub fn send_from_isr(&self, data: &[u8]) -> Result<usize> {
214        let mut bytes_sent: usize = 0;
215        let rc = unsafe {
216            bindings::ove_stream_send_from_isr(
217                self.handle,
218                data.as_ptr() as *const _,
219                data.len(),
220                &mut bytes_sent,
221            )
222        };
223        Error::from_code(rc)?;
224        Ok(bytes_sent)
225    }
226
227    /// Receive bytes from an ISR context (non-blocking, returns immediately).
228    ///
229    /// Returns the number of bytes received; may be zero if no data is available.
230    ///
231    /// # Errors
232    /// Returns an error if the stream is empty.
233    #[inline]
234    pub fn receive_from_isr(&self, buf: &mut [u8]) -> Result<usize> {
235        let mut bytes_received: usize = 0;
236        let rc = unsafe {
237            bindings::ove_stream_receive_from_isr(
238                self.handle,
239                buf.as_mut_ptr() as *mut _,
240                buf.len(),
241                &mut bytes_received,
242            )
243        };
244        Error::from_code(rc)?;
245        Ok(bytes_received)
246    }
247
248    /// Reset the stream, discarding all currently buffered data.
249    ///
250    /// # Errors
251    /// Returns an error if the underlying RTOS call fails.
252    #[inline]
253    pub fn reset(&self) -> Result<()> {
254        let rc = unsafe { bindings::ove_stream_reset(self.handle) };
255        Error::from_code(rc)
256    }
257
258    /// Return the number of bytes currently available for reading.
259    #[inline]
260    pub fn bytes_available(&self) -> usize {
261        unsafe { bindings::ove_stream_bytes_available(self.handle) }
262    }
263
264    /// Register a notify callback fired after every successful send.
265    /// Wraps the C-level `ove_stream_set_notify`.
266    ///
267    /// # Safety
268    /// - `user_data` must remain valid for as long as the callback may
269    ///   fire — i.e. until either the stream is destroyed or
270    ///   `set_notify(None, ...)` clears the registration.
271    /// - `cb` may be invoked from any context the producer uses,
272    ///   including ISR. Its body must therefore be non-blocking and
273    ///   ISR-safe.
274    ///
275    /// Higher-level users should reach for the async wrappers in
276    /// `ove::async_runtime` instead of using this directly — they hide
277    /// the lifetime and ISR-safety constraints behind a safe API.
278    #[cfg(has_async)]
279    #[inline]
280    pub unsafe fn set_notify(
281        &self,
282        cb: Option<unsafe extern "C" fn(*mut core::ffi::c_void)>,
283        user_data: *mut core::ffi::c_void,
284    ) -> Result<()> {
285        let rc = unsafe { bindings::ove_stream_set_notify(self.handle, cb, user_data) };
286        Error::from_code(rc)
287    }
288}
289
290crate::ove_handle_impl!(Stream<const N: usize>, ove_stream_destroy, ove_stream_deinit);