Skip to main content

ove/
channel.rs

1// Copyright (C) 2026 Kamil Lulko <kamil.lulko@gmail.com>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4//
5// This file is part of oveRTOS.
6
7//! Crossbeam-style MPMC channel over [`crate::Queue`].
8//!
9//! [`Sender<T, N>`] and [`Receiver<T, N>`] are cloneable handles to a
10//! shared backing [`crate::Queue`]. Multiple producers can hold their
11//! own [`Sender`]; multiple consumers can hold their own [`Receiver`].
12//! The semantics match `std::sync::mpsc` / `crossbeam::channel`:
13//!
14//! - `Sender::send` blocks until the queue accepts the item, or returns
15//!   [`Error::NetClosed`] if all receivers have been dropped.
16//! - `Receiver::recv` blocks until a producer sends, or returns
17//!   [`Error::NetClosed`] if all senders have been dropped.
18//! - `Sender::try_send` / `Receiver::try_recv` are non-blocking
19//!   variants returning [`Error::Timeout`] when empty/full.
20//!
21//! Variants:
22//!
23//! - **Heap mode** ([`channel`]) — convenience constructor that
24//!   allocates the queue + a refcount on the heap. Requires the
25//!   `alloc` feature.
26//! - **Static mode** ([`Sender::from_static`]) — wraps a caller-owned
27//!   `&'static Queue<T, N>` for zero-heap builds; refcount lives in a
28//!   companion `&'static AtomicUsize` you provide.
29//!
30//! Mirrors `zephyr::sync::channel::{Sender, Receiver}` but rides on
31//! the existing `ove_queue_*` FFI so frames can bridge to C producers
32//! / consumers (the C side just calls `ove_queue_send` / `_receive`
33//! and stays oblivious of the Rust-side refcount).
34
35use ::core::sync::atomic::{AtomicUsize, Ordering};
36
37use crate::error::{Error, Result};
38use crate::queue::Queue;
39
40#[cfg(all(feature = "alloc", not(zero_heap)))]
41extern crate alloc;
42
/// Shared refcount + queue handle for the heap-mode allocation.
///
/// One instance is created by [`channel`] and shared through an `Arc` by
/// every heap-mode [`Sender`] and [`Receiver`] cloned from that pair.
#[cfg(all(feature = "alloc", not(zero_heap)))]
struct ChannelInner<T: Copy, const N: usize> {
    /// Backing queue that carries the items.
    queue: Queue<T, N>,
    /// Number of live [`Sender`] handles: starts at 1 in [`channel`],
    /// bumped by `Sender::clone`, decremented by `Sender::drop`.
    tx_count: AtomicUsize,
    /// Number of live [`Receiver`] handles: starts at 1 in [`channel`],
    /// bumped by `Receiver::clone`, decremented by `Receiver::drop`.
    rx_count: AtomicUsize,
}
50
/// Multi-producer half of an [`ove::channel`](self).
///
/// Cloning a `Sender` increments the shared sender counter and dropping
/// one decrements it; receivers read that counter to decide when the
/// channel is closed.
pub struct Sender<T: Copy + 'static, const N: usize> {
    /// Where the queue and the two handle counters live (heap or static).
    state: SenderState<T, N>,
}
55
/// Storage backing a [`Sender`]: either the shared heap allocation or
/// references to caller-owned statics.
enum SenderState<T: Copy + 'static, const N: usize> {
    /// Heap mode: queue and counters live in one `Arc`-shared
    /// [`ChannelInner`]. Only compiled with `alloc` on a non-zero-heap build.
    #[cfg(all(feature = "alloc", not(zero_heap)))]
    Heap(alloc::sync::Arc<ChannelInner<T, N>>),
    /// Static mode: queue and counters are `'static` items supplied by the
    /// caller through `Sender::from_static`.
    Static {
        /// Backing queue shared with the receiving half.
        queue: &'static Queue<T, N>,
        /// Live-sender counter (modified by `Sender` clone/drop).
        tx_count: &'static AtomicUsize,
        /// Live-receiver counter (read by `send` / `try_send`).
        rx_count: &'static AtomicUsize,
    },
}
65
/// Multi-consumer half of an [`ove::channel`](self).
///
/// Cloning a `Receiver` increments the shared receiver counter and
/// dropping one decrements it; senders read that counter to decide when
/// the channel is closed.
pub struct Receiver<T: Copy + 'static, const N: usize> {
    /// Where the queue and the two handle counters live (heap or static).
    state: ReceiverState<T, N>,
}
70
/// Storage backing a [`Receiver`]: either the shared heap allocation or
/// references to caller-owned statics. Same shape as [`SenderState`].
enum ReceiverState<T: Copy + 'static, const N: usize> {
    /// Heap mode: queue and counters live in one `Arc`-shared
    /// [`ChannelInner`]. Only compiled with `alloc` on a non-zero-heap build.
    #[cfg(all(feature = "alloc", not(zero_heap)))]
    Heap(alloc::sync::Arc<ChannelInner<T, N>>),
    /// Static mode: queue and counters are `'static` items supplied by the
    /// caller through `Receiver::from_static`.
    Static {
        /// Backing queue shared with the sending half.
        queue: &'static Queue<T, N>,
        /// Live-sender counter (read by `recv` / `try_recv`).
        tx_count: &'static AtomicUsize,
        /// Live-receiver counter (modified by `Receiver` clone/drop).
        rx_count: &'static AtomicUsize,
    },
}
80
/// Create a heap-backed channel and return its first [`Sender`] /
/// [`Receiver`] pair. Clone either handle to add producers or consumers.
///
/// Only available with the `alloc` feature on a heap-mode build (i.e. not
/// `CONFIG_OVE_ZERO_HEAP=y`). Zero-heap callers must use
/// [`Sender::from_static`] + [`Receiver::from_static`] instead.
///
/// # Errors
/// Propagates whatever error `Queue::new` reports when the backing queue
/// cannot be created.
#[cfg(all(feature = "alloc", not(zero_heap)))]
pub fn channel<T: Copy + 'static, const N: usize>() -> Result<(Sender<T, N>, Receiver<T, N>)> {
    // Build the queue first so a failure returns before anything is
    // placed on the heap.
    let queue = Queue::<T, N>::new()?;

    // Both counters start at 1: exactly one handle of each kind is
    // handed out below.
    let shared = alloc::sync::Arc::new(ChannelInner {
        queue,
        tx_count: AtomicUsize::new(1),
        rx_count: AtomicUsize::new(1),
    });

    let tx = Sender {
        state: SenderState::Heap(alloc::sync::Arc::clone(&shared)),
    };
    let rx = Receiver {
        state: ReceiverState::Heap(shared),
    };
    Ok((tx, rx))
}
103
104// ── Helpers shared between heap + static state ──────────────────────
105
106#[inline]
107fn queue_of<T: Copy + 'static, const N: usize>(s: &SenderState<T, N>) -> &Queue<T, N> {
108    match s {
109        #[cfg(all(feature = "alloc", not(zero_heap)))]
110        SenderState::Heap(arc) => &arc.queue,
111        SenderState::Static { queue, .. } => queue,
112    }
113}
114
115#[inline]
116fn rx_count_of<T: Copy + 'static, const N: usize>(s: &SenderState<T, N>) -> &AtomicUsize {
117    match s {
118        #[cfg(all(feature = "alloc", not(zero_heap)))]
119        SenderState::Heap(arc) => &arc.rx_count,
120        SenderState::Static { rx_count, .. } => rx_count,
121    }
122}
123
124#[inline]
125fn tx_count_of<T: Copy + 'static, const N: usize>(s: &SenderState<T, N>) -> &AtomicUsize {
126    match s {
127        #[cfg(all(feature = "alloc", not(zero_heap)))]
128        SenderState::Heap(arc) => &arc.tx_count,
129        SenderState::Static { tx_count, .. } => tx_count,
130    }
131}
132
133#[inline]
134fn queue_of_rx<T: Copy + 'static, const N: usize>(s: &ReceiverState<T, N>) -> &Queue<T, N> {
135    match s {
136        #[cfg(all(feature = "alloc", not(zero_heap)))]
137        ReceiverState::Heap(arc) => &arc.queue,
138        ReceiverState::Static { queue, .. } => queue,
139    }
140}
141
142#[inline]
143fn rx_count_of_rx<T: Copy + 'static, const N: usize>(s: &ReceiverState<T, N>) -> &AtomicUsize {
144    match s {
145        #[cfg(all(feature = "alloc", not(zero_heap)))]
146        ReceiverState::Heap(arc) => &arc.rx_count,
147        ReceiverState::Static { rx_count, .. } => rx_count,
148    }
149}
150
151#[inline]
152fn tx_count_of_rx<T: Copy + 'static, const N: usize>(s: &ReceiverState<T, N>) -> &AtomicUsize {
153    match s {
154        #[cfg(all(feature = "alloc", not(zero_heap)))]
155        ReceiverState::Heap(arc) => &arc.tx_count,
156        ReceiverState::Static { tx_count, .. } => tx_count,
157    }
158}
159
160// ── Sender ──────────────────────────────────────────────────────────
161
162impl<T: Copy + 'static, const N: usize> Sender<T, N> {
163    /// Construct a sender from caller-owned static state. Use for
164    /// zero-heap builds. The caller is responsible for ensuring the
165    /// matching [`Receiver::from_static`] uses the same `queue`,
166    /// `tx_count`, and `rx_count` and that both counts start at the
167    /// correct initial value (typically 1 each for one sender + one
168    /// receiver).
169    ///
170    /// # Safety
171    /// Must be called with a queue and counts that are not already in
172    /// use by another channel half.
173    pub const unsafe fn from_static(
174        queue: &'static Queue<T, N>,
175        tx_count: &'static AtomicUsize,
176        rx_count: &'static AtomicUsize,
177    ) -> Self {
178        Self {
179            state: SenderState::Static {
180                queue,
181                tx_count,
182                rx_count,
183            },
184        }
185    }
186
187    /// Send an item. Blocks until the queue accepts it, or returns
188    /// [`Error::NetClosed`] if every [`Receiver`] has been dropped.
189    pub fn send(&self, item: T) -> Result<()> {
190        if rx_count_of(&self.state).load(Ordering::Acquire) == 0 {
191            return Err(Error::NetClosed);
192        }
193        queue_of(&self.state).send(&item)
194    }
195
196    /// Non-blocking send. Returns [`Error::QueueFull`] when full,
197    /// [`Error::NetClosed`] when all receivers are gone.
198    pub fn try_send(&self, item: T) -> Result<()> {
199        if rx_count_of(&self.state).load(Ordering::Acquire) == 0 {
200            return Err(Error::NetClosed);
201        }
202        queue_of(&self.state).try_send(&item)
203    }
204
205    /// Number of currently-live [`Sender`] handles. Snapshot only —
206    /// the count may change under concurrent clone/drop.
207    pub fn sender_count(&self) -> usize {
208        tx_count_of(&self.state).load(Ordering::Acquire)
209    }
210
211    /// Number of currently-live [`Receiver`] handles.
212    pub fn receiver_count(&self) -> usize {
213        rx_count_of(&self.state).load(Ordering::Acquire)
214    }
215}
216
217impl<T: Copy + 'static, const N: usize> Clone for Sender<T, N> {
218    fn clone(&self) -> Self {
219        tx_count_of(&self.state).fetch_add(1, Ordering::AcqRel);
220        Self {
221            state: match &self.state {
222                #[cfg(all(feature = "alloc", not(zero_heap)))]
223                SenderState::Heap(arc) => SenderState::Heap(arc.clone()),
224                SenderState::Static {
225                    queue,
226                    tx_count,
227                    rx_count,
228                } => SenderState::Static {
229                    queue,
230                    tx_count,
231                    rx_count,
232                },
233            },
234        }
235    }
236}
237
238impl<T: Copy + 'static, const N: usize> Drop for Sender<T, N> {
239    fn drop(&mut self) {
240        tx_count_of(&self.state).fetch_sub(1, Ordering::AcqRel);
241    }
242}
243
244// ── Receiver ────────────────────────────────────────────────────────
245
246impl<T: Copy + 'static, const N: usize> Receiver<T, N> {
247    /// Construct a receiver from caller-owned static state. See
248    /// [`Sender::from_static`] for the safety + count-initialisation
249    /// contract.
250    ///
251    /// # Safety
252    /// Same as [`Sender::from_static`].
253    pub const unsafe fn from_static(
254        queue: &'static Queue<T, N>,
255        tx_count: &'static AtomicUsize,
256        rx_count: &'static AtomicUsize,
257    ) -> Self {
258        Self {
259            state: ReceiverState::Static {
260                queue,
261                tx_count,
262                rx_count,
263            },
264        }
265    }
266
267    /// Receive an item. Blocks until a sender posts, or returns
268    /// [`Error::NetClosed`] when every [`Sender`] has been dropped *and*
269    /// the queue is empty.
270    pub fn recv(&self) -> Result<T> {
271        // Fast path: try non-blocking first; if it returns an item we
272        // don't care whether senders are still alive.
273        if let Ok(v) = queue_of_rx(&self.state).try_recv() {
274            return Ok(v);
275        }
276        if tx_count_of_rx(&self.state).load(Ordering::Acquire) == 0 {
277            return Err(Error::NetClosed);
278        }
279        queue_of_rx(&self.state).recv()
280    }
281
282    /// Non-blocking receive. Returns [`Error::Timeout`] when empty
283    /// and senders are still alive; [`Error::NetClosed`] when empty *and*
284    /// all senders are gone.
285    pub fn try_recv(&self) -> Result<T> {
286        match queue_of_rx(&self.state).try_recv() {
287            Ok(v) => Ok(v),
288            Err(Error::QueueEmpty | Error::Timeout) => {
289                if tx_count_of_rx(&self.state).load(Ordering::Acquire) == 0 {
290                    Err(Error::NetClosed)
291                } else {
292                    Err(Error::Timeout)
293                }
294            }
295            Err(e) => Err(e),
296        }
297    }
298
299    /// Number of currently-live [`Sender`] handles.
300    pub fn sender_count(&self) -> usize {
301        tx_count_of_rx(&self.state).load(Ordering::Acquire)
302    }
303
304    /// Number of currently-live [`Receiver`] handles.
305    pub fn receiver_count(&self) -> usize {
306        rx_count_of_rx(&self.state).load(Ordering::Acquire)
307    }
308}
309
310impl<T: Copy + 'static, const N: usize> Clone for Receiver<T, N> {
311    fn clone(&self) -> Self {
312        rx_count_of_rx(&self.state).fetch_add(1, Ordering::AcqRel);
313        Self {
314            state: match &self.state {
315                #[cfg(all(feature = "alloc", not(zero_heap)))]
316                ReceiverState::Heap(arc) => ReceiverState::Heap(arc.clone()),
317                ReceiverState::Static {
318                    queue,
319                    tx_count,
320                    rx_count,
321                } => ReceiverState::Static {
322                    queue,
323                    tx_count,
324                    rx_count,
325                },
326            },
327        }
328    }
329}
330
331impl<T: Copy + 'static, const N: usize> Drop for Receiver<T, N> {
332    fn drop(&mut self) {
333        rx_count_of_rx(&self.state).fetch_sub(1, Ordering::AcqRel);
334    }
335}