Skip to main content

ove/async_runtime/
queue.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Async wrapper around [`crate::Queue`] using `ove_queue_set_notify`.
8//!
9//! Same pattern as `AsyncStream`: own the wrapped primitive + an
10//! `AtomicWaker`, register a C trampoline that wakes the waker, poll
11//! `try_recv` with waker-recheck.
12//!
13//! Lifetime: methods take `&'static self` because the C-side notify
14//! retains a pointer to the internal AtomicWaker.
15
16use core::future::poll_fn;
17use core::task::Poll;
18
19use embassy_sync::waitqueue::AtomicWaker;
20
21use crate::error::{Error, Result};
22use crate::queue::Queue;
23
24/// Async wrapper around an [`ove::Queue`](crate::Queue).
25pub struct AsyncQueue<T: Copy, const N: usize> {
26    inner: Queue<T, N>,
27    waker: AtomicWaker,
28}
29
30// SAFETY: `AsyncQueue<T, N>` wraps a `Queue<T, N>` whose own Send/Sync
31// impls (see `queue.rs`) reflect the substrate's locking.  The `T: Copy +
32// Send` bound guarantees items can cross thread boundaries.
33unsafe impl<T: Copy + Send, const N: usize> Send for AsyncQueue<T, N> {}
34unsafe impl<T: Copy + Send, const N: usize> Sync for AsyncQueue<T, N> {}
35
36impl<T: Copy, const N: usize> AsyncQueue<T, N> {
37    /// Wrap a queue for async use. See [`Self::arm`] for the lifetime
38    /// constraint.
39    pub const fn new(inner: Queue<T, N>) -> Self {
40        Self {
41            inner,
42            waker: AtomicWaker::new(),
43        }
44    }
45
46    /// Register the C-side notify callback. Must be called exactly once
47    /// after the wrapper reaches its final 'static location.
48    pub fn arm(&'static self) -> Result<()> {
49        unsafe {
50            self.inner.set_notify(
51                Some(queue_notify_trampoline::<T, N>),
52                &self.waker as *const AtomicWaker as *mut core::ffi::c_void,
53            )
54        }
55    }
56
57    /// Async receive — yields control until an item is available, then
58    /// returns it.
59    pub async fn recv(&'static self) -> Result<T> {
60        poll_fn(|cx| {
61            // Fast path
62            match self.inner.try_recv() {
63                Ok(v) => return Poll::Ready(Ok(v)),
64                Err(Error::WouldBlock) | Err(Error::QueueEmpty) | Err(Error::Timeout) => {}
65                Err(e) => return Poll::Ready(Err(e)),
66            }
67            // Register + recheck
68            self.waker.register(cx.waker());
69            match self.inner.try_recv() {
70                Ok(v) => Poll::Ready(Ok(v)),
71                Err(Error::WouldBlock) | Err(Error::QueueEmpty) | Err(Error::Timeout) => {
72                    Poll::Pending
73                }
74                Err(e) => Poll::Ready(Err(e)),
75            }
76        })
77        .await
78    }
79
80    /// Borrow the underlying [`Queue`] for synchronous operations
81    /// (`try_send`, etc.).
82    #[inline]
83    pub fn inner(&self) -> &Queue<T, N> {
84        &self.inner
85    }
86}
87
88unsafe extern "C" fn queue_notify_trampoline<T: Copy, const N: usize>(
89    user_data: *mut core::ffi::c_void,
90) {
91    let waker = unsafe { &*(user_data as *const AtomicWaker) };
92    waker.wake();
93}