ove/stream.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Byte-stream buffer for oveRTOS.
8//!
9//! [`Stream<N>`] is a lock-free ring buffer for passing arbitrary byte sequences
10//! between threads or between ISR and thread contexts. Unlike [`crate::Queue`], which
11//! transfers discrete items, a `Stream` treats data as a continuous byte flow with
12//! a configurable receive-trigger threshold.
13
14use core::cell::UnsafeCell;
15use core::mem::MaybeUninit;
16
17use crate::bindings;
18use crate::error::{Error, Result};
19
20// SAFETY (module-wide contract for the `unsafe { bindings::ove_*(...) }` FFI
21// calls below): any handle passed to the C API is non-null and refers to a
22// live RTOS object — wrapper constructors establish validity via
23// `Error::from_code`, and `Drop` (or an explicit `deinit`) is the only place
24// a handle is released. Pointer and slice arguments reference caller-owned
25// memory valid for the duration of the call; the C side copies whatever it
26// retains and does not alias them past return (verified against the
27// signatures in `include/ove/*.h`). Blocks that deviate — `transmute`, raw
28// pointer casts from user data, slice reconstruction via `from_raw_parts`,
29// or storing a callback across the FFI boundary — carry their own
30// `// SAFETY:` comment.
31
32/// Caller-owned storage + byte buffer for a [`Stream`] in zero-heap mode.
33/// Declare in a `static` and pass `&STORAGE` to [`Stream::create`]; in heap
34/// mode it is ignored. See [`crate::MutexStorage`]; this one additionally
35/// embeds the `[u8; N]` ring buffer the stream needs.
36#[allow(dead_code)]
37pub struct StreamStorage<const N: usize> {
38 storage: UnsafeCell<MaybeUninit<bindings::ove_stream_storage_t>>,
39 buffer: UnsafeCell<MaybeUninit<[u8; N]>>,
40}
41
42impl<const N: usize> StreamStorage<N> {
43 /// Zero-initialised storage with an uninitialised byte buffer. `const`
44 /// so it can initialise a `static`.
45 #[inline]
46 pub const fn new() -> Self {
47 Self {
48 storage: UnsafeCell::new(MaybeUninit::zeroed()),
49 buffer: UnsafeCell::new(MaybeUninit::uninit()),
50 }
51 }
52}
53
54impl<const N: usize> Default for StreamStorage<N> {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60// SAFETY: see crate::MutexStorage.
61unsafe impl<const N: usize> Sync for StreamStorage<N> {}
62
63/// Byte-oriented stream buffer with compile-time buffer size.
64///
65/// Wraps `ove_stream_t` for passing variable-length byte data between
66/// threads or between ISR and thread contexts. `N` is the buffer size in bytes.
67pub struct Stream<const N: usize> {
68 handle: bindings::ove_stream_t,
69}
70
71impl<const N: usize> Stream<N> {
72 /// Create a stream via heap allocation (only in heap mode).
73 #[cfg(not(zero_heap))]
74 pub fn new(trigger: usize) -> Result<Self> {
75 let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
76 let rc = unsafe { bindings::ove_stream_create(&mut handle, N, trigger) };
77 Error::from_code(rc)?;
78 Ok(Self { handle })
79 }
80
81 /// Create from caller-provided static storage and buffer.
82 ///
83 /// # Safety
84 /// - `storage` must outlive the `Stream` and not be shared.
85 /// - `buffer` must point to at least `N` bytes and outlive the `Stream`.
86 #[cfg(zero_heap)]
87 pub unsafe fn from_static(
88 storage: *mut bindings::ove_stream_storage_t,
89 buffer: *mut core::ffi::c_void,
90 trigger: usize,
91 ) -> Result<Self> {
92 let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
93 let rc = unsafe { bindings::ove_stream_init(&mut handle, storage, buffer, N, trigger) };
94 Error::from_code(rc)?;
95 Ok(Self { handle })
96 }
97
98 /// Mode-agnostic constructor (see [`crate::Mutex::create`]). Heap mode
99 /// ignores `storage`; zero-heap mode backs the stream with its storage and
100 /// embedded byte buffer.
101 pub fn create(storage: &'static StreamStorage<N>, trigger: usize) -> Result<Self> {
102 #[cfg(not(zero_heap))]
103 {
104 let _ = storage;
105 Self::new(trigger)
106 }
107 #[cfg(zero_heap)]
108 {
109 let sptr = UnsafeCell::raw_get(&storage.storage).cast();
110 let bptr = UnsafeCell::raw_get(&storage.buffer).cast::<core::ffi::c_void>();
111 unsafe { Self::from_static(sptr, bptr, trigger) }
112 }
113 }
114
115 /// Send bytes, blocking indefinitely if the buffer is full.
116 /// Returns the number of bytes actually sent (may be < `data.len()`).
117 #[inline]
118 pub fn send(&self, data: &[u8]) -> Result<usize> {
119 self.send_with_timeout(data, u64::MAX)
120 }
121
122 /// Non-blocking send. Returns the number of bytes actually sent.
123 ///
124 /// # Errors
125 /// Returns [`Error::WouldBlock`] if the buffer is full and no
126 /// bytes could be written.
127 #[inline]
128 pub fn try_send(&self, data: &[u8]) -> Result<usize> {
129 self.send_with_timeout(data, 0)
130 }
131
132 /// Send up to `d`. Returns the number of bytes actually sent.
133 #[inline]
134 pub fn try_send_for(&self, data: &[u8], d: core::time::Duration) -> Result<usize> {
135 self.send_with_timeout(data, crate::time::dur_to_ns(d))
136 }
137
138 /// Send by the given deadline. Returns the number of bytes
139 /// actually sent.
140 #[inline]
141 pub fn try_send_until(&self, data: &[u8], deadline: crate::time::Instant) -> Result<usize> {
142 self.send_with_timeout(data, crate::time::deadline_to_timeout_ns(deadline))
143 }
144
145 #[inline]
146 fn send_with_timeout(&self, data: &[u8], timeout_ns: u64) -> Result<usize> {
147 let mut bytes_sent: usize = 0;
148 let rc = unsafe {
149 bindings::ove_stream_send(
150 self.handle,
151 data.as_ptr() as *const _,
152 data.len(),
153 timeout_ns,
154 &mut bytes_sent,
155 )
156 };
157 Error::from_code(rc)?;
158 Ok(bytes_sent)
159 }
160
161 /// Receive bytes, blocking indefinitely. Returns the number of
162 /// bytes actually read (may be < `buf.len()` — blocks until at
163 /// least the trigger byte count is available).
164 #[inline]
165 pub fn recv(&self, buf: &mut [u8]) -> Result<usize> {
166 self.recv_with_timeout(buf, u64::MAX)
167 }
168
169 /// Non-blocking receive. Returns the number of bytes read.
170 ///
171 /// # Errors
172 /// Returns [`Error::WouldBlock`] if no bytes are available.
173 #[inline]
174 pub fn try_recv(&self, buf: &mut [u8]) -> Result<usize> {
175 self.recv_with_timeout(buf, 0)
176 }
177
178 /// Receive up to `d`.
179 #[inline]
180 pub fn try_recv_for(&self, buf: &mut [u8], d: core::time::Duration) -> Result<usize> {
181 self.recv_with_timeout(buf, crate::time::dur_to_ns(d))
182 }
183
184 /// Receive by the given deadline.
185 #[inline]
186 pub fn try_recv_until(&self, buf: &mut [u8], deadline: crate::time::Instant) -> Result<usize> {
187 self.recv_with_timeout(buf, crate::time::deadline_to_timeout_ns(deadline))
188 }
189
190 #[inline]
191 fn recv_with_timeout(&self, buf: &mut [u8], timeout_ns: u64) -> Result<usize> {
192 let mut bytes_received: usize = 0;
193 let rc = unsafe {
194 bindings::ove_stream_receive(
195 self.handle,
196 buf.as_mut_ptr() as *mut _,
197 buf.len(),
198 timeout_ns,
199 &mut bytes_received,
200 )
201 };
202 Error::from_code(rc)?;
203 Ok(bytes_received)
204 }
205
206 /// Send bytes from an ISR context (non-blocking, returns immediately).
207 ///
208 /// Returns the number of bytes sent; may be less than `data.len()` if the buffer fills.
209 ///
210 /// # Errors
211 /// Returns an error if the stream is full.
212 #[inline]
213 pub fn send_from_isr(&self, data: &[u8]) -> Result<usize> {
214 let mut bytes_sent: usize = 0;
215 let rc = unsafe {
216 bindings::ove_stream_send_from_isr(
217 self.handle,
218 data.as_ptr() as *const _,
219 data.len(),
220 &mut bytes_sent,
221 )
222 };
223 Error::from_code(rc)?;
224 Ok(bytes_sent)
225 }
226
227 /// Receive bytes from an ISR context (non-blocking, returns immediately).
228 ///
229 /// Returns the number of bytes received; may be zero if no data is available.
230 ///
231 /// # Errors
232 /// Returns an error if the stream is empty.
233 #[inline]
234 pub fn receive_from_isr(&self, buf: &mut [u8]) -> Result<usize> {
235 let mut bytes_received: usize = 0;
236 let rc = unsafe {
237 bindings::ove_stream_receive_from_isr(
238 self.handle,
239 buf.as_mut_ptr() as *mut _,
240 buf.len(),
241 &mut bytes_received,
242 )
243 };
244 Error::from_code(rc)?;
245 Ok(bytes_received)
246 }
247
248 /// Reset the stream, discarding all currently buffered data.
249 ///
250 /// # Errors
251 /// Returns an error if the underlying RTOS call fails.
252 #[inline]
253 pub fn reset(&self) -> Result<()> {
254 let rc = unsafe { bindings::ove_stream_reset(self.handle) };
255 Error::from_code(rc)
256 }
257
258 /// Return the number of bytes currently available for reading.
259 #[inline]
260 pub fn bytes_available(&self) -> usize {
261 unsafe { bindings::ove_stream_bytes_available(self.handle) }
262 }
263
264 /// Register a notify callback fired after every successful send.
265 /// Wraps the C-level `ove_stream_set_notify`.
266 ///
267 /// # Safety
268 /// - `user_data` must remain valid for as long as the callback may
269 /// fire — i.e. until either the stream is destroyed or
270 /// `set_notify(None, ...)` clears the registration.
271 /// - `cb` may be invoked from any context the producer uses,
272 /// including ISR. Its body must therefore be non-blocking and
273 /// ISR-safe.
274 ///
275 /// Higher-level users should reach for the async wrappers in
276 /// `ove::async_runtime` instead of using this directly — they hide
277 /// the lifetime and ISR-safety constraints behind a safe API.
278 #[cfg(has_async)]
279 #[inline]
280 pub unsafe fn set_notify(
281 &self,
282 cb: Option<unsafe extern "C" fn(*mut core::ffi::c_void)>,
283 user_data: *mut core::ffi::c_void,
284 ) -> Result<()> {
285 let rc = unsafe { bindings::ove_stream_set_notify(self.handle, cb, user_data) };
286 Error::from_code(rc)
287 }
288}
289
290crate::ove_handle_impl!(Stream<const N: usize>, ove_stream_destroy, ove_stream_deinit);