ove/
stream.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Byte-stream buffer for oveRTOS.
8//!
9//! [`Stream<N>`] is a lock-free ring buffer for passing arbitrary byte sequences
10//! between threads or between ISR and thread contexts. Unlike [`crate::Queue`], which
11//! transfers discrete items, a `Stream` treats data as a continuous byte flow with
12//! a configurable receive-trigger threshold.
13
14use crate::bindings;
15use crate::error::{Error, Result};
16
17/// Byte-oriented stream buffer with compile-time buffer size.
18///
19/// Wraps `ove_stream_t` for passing variable-length byte data between
20/// threads or between ISR and thread contexts. `N` is the buffer size in bytes.
21pub struct Stream<const N: usize> {
22    handle: bindings::ove_stream_t,
23}
24
25impl<const N: usize> Stream<N> {
26    /// Create a stream via heap allocation (only in heap mode).
27    #[cfg(not(zero_heap))]
28    pub fn new(trigger: usize) -> Result<Self> {
29        let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
30        let rc = unsafe { bindings::ove_stream_create(&mut handle, N, trigger) };
31        Error::from_code(rc)?;
32        Ok(Self { handle })
33    }
34
35    /// Create from caller-provided static storage and buffer.
36    ///
37    /// # Safety
38    /// - `storage` must outlive the `Stream` and not be shared.
39    /// - `buffer` must point to at least `N` bytes and outlive the `Stream`.
40    #[cfg(zero_heap)]
41    pub unsafe fn from_static(
42        storage: *mut bindings::ove_stream_storage_t,
43        buffer: *mut core::ffi::c_void,
44        trigger: usize,
45    ) -> Result<Self> {
46        let mut handle: bindings::ove_stream_t = core::ptr::null_mut();
47        let rc = unsafe { bindings::ove_stream_init(&mut handle, storage, buffer, N, trigger) };
48        Error::from_code(rc)?;
49        Ok(Self { handle })
50    }
51
52    /// Send bytes into the stream, blocking up to `timeout_ms` if the buffer is full.
53    ///
54    /// Returns the number of bytes actually sent, which may be less than `data.len()`
55    /// if the stream fills before the timeout.
56    ///
57    /// # Errors
58    /// Returns [`Error::Timeout`] if no bytes could be sent within `timeout_ms`.
59    pub fn send(&self, data: &[u8], timeout_ms: u32) -> Result<usize> {
60        let mut bytes_sent: usize = 0;
61        let rc = unsafe {
62            bindings::ove_stream_send(
63                self.handle,
64                data.as_ptr() as *const _,
65                data.len(),
66                timeout_ms,
67                &mut bytes_sent,
68            )
69        };
70        Error::from_code(rc)?;
71        Ok(bytes_sent)
72    }
73
74    /// Receive bytes from the stream into `buf`, blocking up to `timeout_ms`.
75    ///
76    /// Returns the number of bytes actually received. Blocks until at least the
77    /// trigger byte count (set at creation time) is available, or `timeout_ms` expires.
78    ///
79    /// # Errors
80    /// Returns [`Error::Timeout`] if no bytes could be received within `timeout_ms`.
81    pub fn receive(&self, buf: &mut [u8], timeout_ms: u32) -> Result<usize> {
82        let mut bytes_received: usize = 0;
83        let rc = unsafe {
84            bindings::ove_stream_receive(
85                self.handle,
86                buf.as_mut_ptr() as *mut _,
87                buf.len(),
88                timeout_ms,
89                &mut bytes_received,
90            )
91        };
92        Error::from_code(rc)?;
93        Ok(bytes_received)
94    }
95
96    /// Send bytes from an ISR context (non-blocking, returns immediately).
97    ///
98    /// Returns the number of bytes sent; may be less than `data.len()` if the buffer fills.
99    ///
100    /// # Errors
101    /// Returns an error if the stream is full.
102    pub fn send_from_isr(&self, data: &[u8]) -> Result<usize> {
103        let mut bytes_sent: usize = 0;
104        let rc = unsafe {
105            bindings::ove_stream_send_from_isr(
106                self.handle,
107                data.as_ptr() as *const _,
108                data.len(),
109                &mut bytes_sent,
110            )
111        };
112        Error::from_code(rc)?;
113        Ok(bytes_sent)
114    }
115
116    /// Receive bytes from an ISR context (non-blocking, returns immediately).
117    ///
118    /// Returns the number of bytes received; may be zero if no data is available.
119    ///
120    /// # Errors
121    /// Returns an error if the stream is empty.
122    pub fn receive_from_isr(&self, buf: &mut [u8]) -> Result<usize> {
123        let mut bytes_received: usize = 0;
124        let rc = unsafe {
125            bindings::ove_stream_receive_from_isr(
126                self.handle,
127                buf.as_mut_ptr() as *mut _,
128                buf.len(),
129                &mut bytes_received,
130            )
131        };
132        Error::from_code(rc)?;
133        Ok(bytes_received)
134    }
135
136    /// Reset the stream, discarding all currently buffered data.
137    ///
138    /// # Errors
139    /// Returns an error if the underlying RTOS call fails.
140    pub fn reset(&self) -> Result<()> {
141        let rc = unsafe { bindings::ove_stream_reset(self.handle) };
142        Error::from_code(rc)
143    }
144
145    /// Return the number of bytes currently available for reading.
146    pub fn bytes_available(&self) -> usize {
147        unsafe { bindings::ove_stream_bytes_available(self.handle) }
148    }
149}
150
151impl<const N: usize> Drop for Stream<N> {
152    fn drop(&mut self) {
153        if self.handle.is_null() { return; }
154        #[cfg(not(zero_heap))]
155        unsafe { bindings::ove_stream_destroy(self.handle) }
156        #[cfg(zero_heap)]
157        unsafe { bindings::ove_stream_deinit(self.handle) }
158    }
159}
160
161// SAFETY: Stream wraps a ove_stream_t handle. Send/receive are
162// thread-safe RTOS calls. Create/destroy are single-threaded (lifecycle).
163unsafe impl<const N: usize> Send for Stream<N> {}
164unsafe impl<const N: usize> Sync for Stream<N> {}