Skip to main content

ove/async_runtime/
stream.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Async wrapper around [`crate::Stream`] using the C-level
8//! `ove_stream_set_notify` notify hook.
9//!
10//! The pattern is the canonical "embassy primitive on top of an RTOS
11//! synchronous primitive":
12//!
13//!   1. The wrapper owns an `AtomicWaker`.
14//!   2. [`AsyncStream::arm`] registers a C callback against the underlying
15//!      stream; the callback calls `AtomicWaker::wake` via a `void *` user_data.
16//!   3. `AsyncStream::read` polls the non-blocking `Stream::try_recv` first,
17//!      and on `WouldBlock` registers `cx.waker()` into the AtomicWaker,
18//!      then re-checks (avoids lost-wake races against the producer).
19//!
20//! Lifetime constraint: the C callback retains a pointer to the
21//! AtomicWaker, so the AsyncStream's address must be stable for its
22//! entire lifetime. The simplest pattern is a `static` declaration or
23//! `Box::leak`. Methods take `&'static self` to enforce this at the
24//! type level.
25
26use core::future::poll_fn;
27use core::task::Poll;
28
29use embassy_sync::waitqueue::AtomicWaker;
30
31use crate::error::{Error, Result};
32use crate::stream::Stream;
33
34/// Async wrapper around an [`ove::Stream`](crate::Stream).
35///
36/// Owns the wrapped stream and the `AtomicWaker` used to bridge the
37/// C-level notify hook into the embassy executor's wake path.
38pub struct AsyncStream<const N: usize> {
39    inner: Stream<N>,
40    waker: AtomicWaker,
41}
42
43// SAFETY: `Stream<N>` is itself Send+Sync (handle is a kernel-managed
44// pointer); `AtomicWaker` is Sync. The wrapper inherits both.
45unsafe impl<const N: usize> Send for AsyncStream<N> {}
46unsafe impl<const N: usize> Sync for AsyncStream<N> {}
47
48impl<const N: usize> AsyncStream<N> {
49    /// Wrap a stream for async use. Does not arm the notify hook yet —
50    /// call [`Self::arm`] after the wrapper has reached its final
51    /// 'static location (the C callback retains a pointer to the
52    /// internal `AtomicWaker`, so the wrapper must not move after
53    /// arming).
54    pub const fn new(inner: Stream<N>) -> Self {
55        Self {
56            inner,
57            waker: AtomicWaker::new(),
58        }
59    }
60
61    /// Register the C-side notify callback. Must be called exactly once,
62    /// and only after the wrapper is at its final 'static address.
63    pub fn arm(&'static self) -> Result<()> {
64        // SAFETY: the wrapper is 'static, so the waker pointer is valid
65        // for the rest of the program's lifetime; `notify_trampoline`
66        // dereferences it as `&AtomicWaker`, which is Sync.
67        unsafe {
68            self.inner.set_notify(
69                Some(notify_trampoline),
70                &self.waker as *const AtomicWaker as *mut core::ffi::c_void,
71            )
72        }
73    }
74
75    /// Receive up to `buf.len()` bytes. Awaits until at least one byte
76    /// is available, then returns the number of bytes copied.
77    ///
78    /// Returns `Ok(0)` only if `buf.is_empty()`.
79    pub async fn read(&'static self, buf: &mut [u8]) -> Result<usize> {
80        if buf.is_empty() {
81            return Ok(0);
82        }
83        poll_fn(|cx| {
84            // Fast path — try non-blocking read before touching the
85            // waker. Covers the common case where bytes are already
86            // pending.
87            match self.inner.try_recv(buf) {
88                Ok(n) if n > 0 => return Poll::Ready(Ok(n)),
89                Ok(_) | Err(Error::WouldBlock) | Err(Error::Timeout) => {}
90                Err(e) => return Poll::Ready(Err(e)),
91            }
92            // Register, then re-check. The recheck closes the race where
93            // a producer fires the notify callback (which wakes the
94            // current waker, not ours) between our first try_recv and
95            // the register call.
96            self.waker.register(cx.waker());
97            match self.inner.try_recv(buf) {
98                Ok(n) if n > 0 => Poll::Ready(Ok(n)),
99                Ok(_) | Err(Error::WouldBlock) | Err(Error::Timeout) => Poll::Pending,
100                Err(e) => Poll::Ready(Err(e)),
101            }
102        })
103        .await
104    }
105
106    /// Borrow the underlying [`Stream`] for synchronous operations
107    /// (`send`, `bytes_available`, …). The async `read` is the only
108    /// path that touches the AtomicWaker; non-blocking access through
109    /// this borrow is independent.
110    #[inline]
111    pub fn inner(&self) -> &Stream<N> {
112        &self.inner
113    }
114}
115
116/// C-ABI trampoline invoked by `ove_stream_send` / `_send_from_isr`
117/// after a successful write. `user_data` is the address of the
118/// AtomicWaker inside an `AsyncStream` (pinned to 'static by the
119/// `arm()` contract).
120unsafe extern "C" fn notify_trampoline(user_data: *mut core::ffi::c_void) {
121    // SAFETY: the pointer was set up by AsyncStream::arm; the target is
122    // a valid AtomicWaker pinned for 'static. AtomicWaker::wake is
123    // lock-free and ISR-safe.
124    let waker = unsafe { &*(user_data as *const AtomicWaker) };
125    waker.wake();
126}