ove/stream.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Byte-stream buffer for oveRTOS.
8//!
9//! [`Stream<N>`] is a lock-free ring buffer for passing arbitrary byte sequences
10//! between threads or between ISR and thread contexts. Unlike [`crate::Queue`], which
11//! transfers discrete items, a `Stream` treats data as a continuous byte flow with
12//! a configurable receive-trigger threshold.
13
14use crate::bindings;
15use crate::error::{Error, Result};
16
17/// Byte-oriented stream buffer with compile-time buffer size.
18///
19/// Wraps `ove_stream_t` for passing variable-length byte data between
20/// threads or between ISR and thread contexts. `N` is the buffer size in bytes.
21pub struct Stream<const N: usize> {
22 handle: bindings::ove_stream_t,
23}
24
25impl<const N: usize> Stream<N> {
26 /// Create a stream via heap allocation (only in heap mode).
27 #[cfg(not(zero_heap))]
28 pub fn new(trigger: usize) -> Result<Self> {
29 let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
30 let rc = unsafe { bindings::ove_stream_create(&mut handle, N, trigger) };
31 Error::from_code(rc)?;
32 Ok(Self { handle })
33 }
34
35 /// Create from caller-provided static storage and buffer.
36 ///
37 /// # Safety
38 /// - `storage` must outlive the `Stream` and not be shared.
39 /// - `buffer` must point to at least `N` bytes and outlive the `Stream`.
40 #[cfg(zero_heap)]
41 pub unsafe fn from_static(
42 storage: *mut bindings::ove_stream_storage_t,
43 buffer: *mut core::ffi::c_void,
44 trigger: usize,
45 ) -> Result<Self> {
46 let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
47 let rc = unsafe { bindings::ove_stream_init(&mut handle, storage, buffer, N, trigger) };
48 Error::from_code(rc)?;
49 Ok(Self { handle })
50 }
51
52 /// Send bytes into the stream, blocking up to `timeout_ms` if the buffer is full.
53 ///
54 /// Returns the number of bytes actually sent, which may be less than `data.len()`
55 /// if the stream fills before the timeout.
56 ///
57 /// # Errors
58 /// Returns [`Error::Timeout`] if no bytes could be sent within `timeout_ms`.
59 pub fn send(&self, data: &[u8], timeout_ms: u32) -> Result<usize> {
60 let mut bytes_sent: usize = 0;
61 let rc = unsafe {
62 bindings::ove_stream_send(
63 self.handle,
64 data.as_ptr() as *const _,
65 data.len(),
66 timeout_ms,
67 &mut bytes_sent,
68 )
69 };
70 Error::from_code(rc)?;
71 Ok(bytes_sent)
72 }
73
74 /// Receive bytes from the stream into `buf`, blocking up to `timeout_ms`.
75 ///
76 /// Returns the number of bytes actually received. Blocks until at least the
77 /// trigger byte count (set at creation time) is available, or `timeout_ms` expires.
78 ///
79 /// # Errors
80 /// Returns [`Error::Timeout`] if no bytes could be received within `timeout_ms`.
81 pub fn receive(&self, buf: &mut [u8], timeout_ms: u32) -> Result<usize> {
82 let mut bytes_received: usize = 0;
83 let rc = unsafe {
84 bindings::ove_stream_receive(
85 self.handle,
86 buf.as_mut_ptr() as *mut _,
87 buf.len(),
88 timeout_ms,
89 &mut bytes_received,
90 )
91 };
92 Error::from_code(rc)?;
93 Ok(bytes_received)
94 }
95
96 /// Send bytes from an ISR context (non-blocking, returns immediately).
97 ///
98 /// Returns the number of bytes sent; may be less than `data.len()` if the buffer fills.
99 ///
100 /// # Errors
101 /// Returns an error if the stream is full.
102 pub fn send_from_isr(&self, data: &[u8]) -> Result<usize> {
103 let mut bytes_sent: usize = 0;
104 let rc = unsafe {
105 bindings::ove_stream_send_from_isr(
106 self.handle,
107 data.as_ptr() as *const _,
108 data.len(),
109 &mut bytes_sent,
110 )
111 };
112 Error::from_code(rc)?;
113 Ok(bytes_sent)
114 }
115
116 /// Receive bytes from an ISR context (non-blocking, returns immediately).
117 ///
118 /// Returns the number of bytes received; may be zero if no data is available.
119 ///
120 /// # Errors
121 /// Returns an error if the stream is empty.
122 pub fn receive_from_isr(&self, buf: &mut [u8]) -> Result<usize> {
123 let mut bytes_received: usize = 0;
124 let rc = unsafe {
125 bindings::ove_stream_receive_from_isr(
126 self.handle,
127 buf.as_mut_ptr() as *mut _,
128 buf.len(),
129 &mut bytes_received,
130 )
131 };
132 Error::from_code(rc)?;
133 Ok(bytes_received)
134 }
135
136 /// Reset the stream, discarding all currently buffered data.
137 ///
138 /// # Errors
139 /// Returns an error if the underlying RTOS call fails.
140 pub fn reset(&self) -> Result<()> {
141 let rc = unsafe { bindings::ove_stream_reset(self.handle) };
142 Error::from_code(rc)
143 }
144
145 /// Return the number of bytes currently available for reading.
146 pub fn bytes_available(&self) -> usize {
147 unsafe { bindings::ove_stream_bytes_available(self.handle) }
148 }
149}
150
151impl<const N: usize> Drop for Stream<N> {
152 fn drop(&mut self) {
153 if self.handle.is_null() { return; }
154 #[cfg(not(zero_heap))]
155 unsafe { bindings::ove_stream_destroy(self.handle) }
156 #[cfg(zero_heap)]
157 unsafe { bindings::ove_stream_deinit(self.handle) }
158 }
159}
160
161// SAFETY: Stream wraps a ove_stream_t handle. Send/receive are
162// thread-safe RTOS calls. Create/destroy are single-threaded (lifecycle).
163unsafe impl<const N: usize> Send for Stream<N> {}
164unsafe impl<const N: usize> Sync for Stream<N> {}