ove/async_runtime/queue.rs
1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Async wrapper around [`crate::Queue`] using `ove_queue_set_notify`.
8//!
9//! Same pattern as `AsyncStream`: own the wrapped primitive + an
10//! `AtomicWaker`, register a C trampoline that wakes the waker, poll
11//! `try_recv` with waker-recheck.
12//!
13//! Lifetime: methods take `&'static self` because the C-side notify
14//! retains a pointer to the internal AtomicWaker.
15
16use core::future::poll_fn;
17use core::task::Poll;
18
19use embassy_sync::waitqueue::AtomicWaker;
20
21use crate::error::{Error, Result};
22use crate::queue::Queue;
23
24/// Async wrapper around an [`ove::Queue`](crate::Queue).
25pub struct AsyncQueue<T: Copy, const N: usize> {
26 inner: Queue<T, N>,
27 waker: AtomicWaker,
28}
29
30// SAFETY: `AsyncQueue<T, N>` wraps a `Queue<T, N>` whose own Send/Sync
31// impls (see `queue.rs`) reflect the substrate's locking. The `T: Copy +
32// Send` bound guarantees items can cross thread boundaries.
33unsafe impl<T: Copy + Send, const N: usize> Send for AsyncQueue<T, N> {}
34unsafe impl<T: Copy + Send, const N: usize> Sync for AsyncQueue<T, N> {}
35
36impl<T: Copy, const N: usize> AsyncQueue<T, N> {
37 /// Wrap a queue for async use. See [`Self::arm`] for the lifetime
38 /// constraint.
39 pub const fn new(inner: Queue<T, N>) -> Self {
40 Self {
41 inner,
42 waker: AtomicWaker::new(),
43 }
44 }
45
46 /// Register the C-side notify callback. Must be called exactly once
47 /// after the wrapper reaches its final 'static location.
48 pub fn arm(&'static self) -> Result<()> {
49 unsafe {
50 self.inner.set_notify(
51 Some(queue_notify_trampoline::<T, N>),
52 &self.waker as *const AtomicWaker as *mut core::ffi::c_void,
53 )
54 }
55 }
56
57 /// Async receive — yields control until an item is available, then
58 /// returns it.
59 pub async fn recv(&'static self) -> Result<T> {
60 poll_fn(|cx| {
61 // Fast path
62 match self.inner.try_recv() {
63 Ok(v) => return Poll::Ready(Ok(v)),
64 Err(Error::WouldBlock) | Err(Error::QueueEmpty) | Err(Error::Timeout) => {}
65 Err(e) => return Poll::Ready(Err(e)),
66 }
67 // Register + recheck
68 self.waker.register(cx.waker());
69 match self.inner.try_recv() {
70 Ok(v) => Poll::Ready(Ok(v)),
71 Err(Error::WouldBlock) | Err(Error::QueueEmpty) | Err(Error::Timeout) => {
72 Poll::Pending
73 }
74 Err(e) => Poll::Ready(Err(e)),
75 }
76 })
77 .await
78 }
79
80 /// Borrow the underlying [`Queue`] for synchronous operations
81 /// (`try_send`, etc.).
82 #[inline]
83 pub fn inner(&self) -> &Queue<T, N> {
84 &self.inner
85 }
86}
87
88unsafe extern "C" fn queue_notify_trampoline<T: Copy, const N: usize>(
89 user_data: *mut core::ffi::c_void,
90) {
91 let waker = unsafe { &*(user_data as *const AtomicWaker) };
92 waker.wake();
93}