ove/async_runtime/stream.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Async wrapper around [`crate::Stream`] using the C-level
8//! `ove_stream_set_notify` notify hook.
9//!
10//! The pattern is the canonical "embassy primitive on top of an RTOS
11//! synchronous primitive":
12//!
13//! 1. The wrapper owns an `AtomicWaker`.
14//! 2. On construction it registers a C callback against the underlying
15//! stream; the callback is `AtomicWaker::wake` via a void* user_data.
//! 3. `AsyncStream::read` polls the non-blocking `Stream::try_recv` first,
//!    and if no data is available yet (e.g. `WouldBlock`) registers
//!    `cx.waker()` into the AtomicWaker, then re-checks (avoids lost-wake
//!    races against the producer).
19//!
20//! Lifetime constraint: the C callback retains a pointer to the
21//! AtomicWaker, so the AsyncStream's address must be stable for its
22//! entire lifetime. The simplest pattern is a `static` declaration or
//! `Box::leak`. `arm` and `read` take `&'static self` to enforce this at
//! the type level.
25
26use core::future::poll_fn;
27use core::task::Poll;
28
29use embassy_sync::waitqueue::AtomicWaker;
30
31use crate::error::{Error, Result};
32use crate::stream::Stream;
33
34/// Async wrapper around an [`ove::Stream`](crate::Stream).
35///
36/// Owns the wrapped stream and the `AtomicWaker` used to bridge the
37/// C-level notify hook into the embassy executor's wake path.
38pub struct AsyncStream<const N: usize> {
39 inner: Stream<N>,
40 waker: AtomicWaker,
41}
42
43// SAFETY: `Stream<N>` is itself Send+Sync (handle is a kernel-managed
44// pointer); `AtomicWaker` is Sync. The wrapper inherits both.
45unsafe impl<const N: usize> Send for AsyncStream<N> {}
46unsafe impl<const N: usize> Sync for AsyncStream<N> {}
47
48impl<const N: usize> AsyncStream<N> {
49 /// Wrap a stream for async use. Does not arm the notify hook yet —
50 /// call [`Self::arm`] after the wrapper has reached its final
51 /// 'static location (the C callback retains a pointer to the
52 /// internal `AtomicWaker`, so the wrapper must not move after
53 /// arming).
54 pub const fn new(inner: Stream<N>) -> Self {
55 Self {
56 inner,
57 waker: AtomicWaker::new(),
58 }
59 }
60
61 /// Register the C-side notify callback. Must be called exactly once,
62 /// and only after the wrapper is at its final 'static address.
63 pub fn arm(&'static self) -> Result<()> {
64 // SAFETY: the wrapper is 'static, so the waker pointer is valid
65 // for the rest of the program's lifetime; `notify_trampoline`
66 // dereferences it as `&AtomicWaker`, which is Sync.
67 unsafe {
68 self.inner.set_notify(
69 Some(notify_trampoline),
70 &self.waker as *const AtomicWaker as *mut core::ffi::c_void,
71 )
72 }
73 }
74
75 /// Receive up to `buf.len()` bytes. Awaits until at least one byte
76 /// is available, then returns the number of bytes copied.
77 ///
78 /// Returns `Ok(0)` only if `buf.is_empty()`.
79 pub async fn read(&'static self, buf: &mut [u8]) -> Result<usize> {
80 if buf.is_empty() {
81 return Ok(0);
82 }
83 poll_fn(|cx| {
84 // Fast path — try non-blocking read before touching the
85 // waker. Covers the common case where bytes are already
86 // pending.
87 match self.inner.try_recv(buf) {
88 Ok(n) if n > 0 => return Poll::Ready(Ok(n)),
89 Ok(_) | Err(Error::WouldBlock) | Err(Error::Timeout) => {}
90 Err(e) => return Poll::Ready(Err(e)),
91 }
92 // Register, then re-check. The recheck closes the race where
93 // a producer fires the notify callback (which wakes the
94 // current waker, not ours) between our first try_recv and
95 // the register call.
96 self.waker.register(cx.waker());
97 match self.inner.try_recv(buf) {
98 Ok(n) if n > 0 => Poll::Ready(Ok(n)),
99 Ok(_) | Err(Error::WouldBlock) | Err(Error::Timeout) => Poll::Pending,
100 Err(e) => Poll::Ready(Err(e)),
101 }
102 })
103 .await
104 }
105
106 /// Borrow the underlying [`Stream`] for synchronous operations
107 /// (`send`, `bytes_available`, …). The async `read` is the only
108 /// path that touches the AtomicWaker; non-blocking access through
109 /// this borrow is independent.
110 #[inline]
111 pub fn inner(&self) -> &Stream<N> {
112 &self.inner
113 }
114}
115
116/// C-ABI trampoline invoked by `ove_stream_send` / `_send_from_isr`
117/// after a successful write. `user_data` is the address of the
118/// AtomicWaker inside an `AsyncStream` (pinned to 'static by the
119/// `arm()` contract).
120unsafe extern "C" fn notify_trampoline(user_data: *mut core::ffi::c_void) {
121 // SAFETY: the pointer was set up by AsyncStream::arm; the target is
122 // a valid AtomicWaker pinned for 'static. AtomicWaker::wake is
123 // lock-free and ISR-safe.
124 let waker = unsafe { &*(user_data as *const AtomicWaker) };
125 waker.wake();
126}