1use ::core::sync::atomic::{AtomicUsize, Ordering};
36
37use crate::error::{Error, Result};
38use crate::queue::Queue;
39
40#[cfg(all(feature = "alloc", not(zero_heap)))]
41extern crate alloc;
42
/// State shared by every handle of a heap-backed channel.
///
/// Holds the underlying queue together with the number of live senders
/// (`tx_count`) and live receivers (`rx_count`).
#[cfg(all(feature = "alloc", not(zero_heap)))]
struct ChannelInner<T, const N: usize>
where
    T: Copy,
{
    /// Queue the items travel through.
    queue: Queue<T, N>,
    /// Number of `Sender` handles currently alive.
    tx_count: AtomicUsize,
    /// Number of `Receiver` handles currently alive.
    rx_count: AtomicUsize,
}
50
51pub struct Sender<T: Copy + 'static, const N: usize> {
53 state: SenderState<T, N>,
54}
55
56enum SenderState<T: Copy + 'static, const N: usize> {
57 #[cfg(all(feature = "alloc", not(zero_heap)))]
58 Heap(alloc::sync::Arc<ChannelInner<T, N>>),
59 Static {
60 queue: &'static Queue<T, N>,
61 tx_count: &'static AtomicUsize,
62 rx_count: &'static AtomicUsize,
63 },
64}
65
66pub struct Receiver<T: Copy + 'static, const N: usize> {
68 state: ReceiverState<T, N>,
69}
70
71enum ReceiverState<T: Copy + 'static, const N: usize> {
72 #[cfg(all(feature = "alloc", not(zero_heap)))]
73 Heap(alloc::sync::Arc<ChannelInner<T, N>>),
74 Static {
75 queue: &'static Queue<T, N>,
76 tx_count: &'static AtomicUsize,
77 rx_count: &'static AtomicUsize,
78 },
79}
80
/// Creates a heap-backed channel and returns its two halves.
///
/// The queue and both handle counters are placed in a single `Arc`; each
/// counter starts at `1`, matching the one `Sender` and one `Receiver`
/// returned here.
///
/// # Errors
///
/// Propagates any error returned by `Queue::new`.
#[cfg(all(feature = "alloc", not(zero_heap)))]
pub fn channel<T, const N: usize>() -> Result<(Sender<T, N>, Receiver<T, N>)>
where
    T: Copy + 'static,
{
    // Build the queue first so a failure leaves nothing else to clean up.
    let queue = Queue::<T, N>::new()?;
    let shared = alloc::sync::Arc::new(ChannelInner {
        queue,
        tx_count: AtomicUsize::new(1),
        rx_count: AtomicUsize::new(1),
    });

    let tx = Sender {
        state: SenderState::Heap(alloc::sync::Arc::clone(&shared)),
    };
    let rx = Receiver {
        state: ReceiverState::Heap(shared),
    };
    Ok((tx, rx))
}
103
104#[inline]
107fn queue_of<T: Copy + 'static, const N: usize>(s: &SenderState<T, N>) -> &Queue<T, N> {
108 match s {
109 #[cfg(all(feature = "alloc", not(zero_heap)))]
110 SenderState::Heap(arc) => &arc.queue,
111 SenderState::Static { queue, .. } => queue,
112 }
113}
114
115#[inline]
116fn rx_count_of<T: Copy + 'static, const N: usize>(s: &SenderState<T, N>) -> &AtomicUsize {
117 match s {
118 #[cfg(all(feature = "alloc", not(zero_heap)))]
119 SenderState::Heap(arc) => &arc.rx_count,
120 SenderState::Static { rx_count, .. } => rx_count,
121 }
122}
123
124#[inline]
125fn tx_count_of<T: Copy + 'static, const N: usize>(s: &SenderState<T, N>) -> &AtomicUsize {
126 match s {
127 #[cfg(all(feature = "alloc", not(zero_heap)))]
128 SenderState::Heap(arc) => &arc.tx_count,
129 SenderState::Static { tx_count, .. } => tx_count,
130 }
131}
132
133#[inline]
134fn queue_of_rx<T: Copy + 'static, const N: usize>(s: &ReceiverState<T, N>) -> &Queue<T, N> {
135 match s {
136 #[cfg(all(feature = "alloc", not(zero_heap)))]
137 ReceiverState::Heap(arc) => &arc.queue,
138 ReceiverState::Static { queue, .. } => queue,
139 }
140}
141
142#[inline]
143fn rx_count_of_rx<T: Copy + 'static, const N: usize>(s: &ReceiverState<T, N>) -> &AtomicUsize {
144 match s {
145 #[cfg(all(feature = "alloc", not(zero_heap)))]
146 ReceiverState::Heap(arc) => &arc.rx_count,
147 ReceiverState::Static { rx_count, .. } => rx_count,
148 }
149}
150
151#[inline]
152fn tx_count_of_rx<T: Copy + 'static, const N: usize>(s: &ReceiverState<T, N>) -> &AtomicUsize {
153 match s {
154 #[cfg(all(feature = "alloc", not(zero_heap)))]
155 ReceiverState::Heap(arc) => &arc.tx_count,
156 ReceiverState::Static { tx_count, .. } => tx_count,
157 }
158}
159
160impl<T: Copy + 'static, const N: usize> Sender<T, N> {
163 pub const unsafe fn from_static(
174 queue: &'static Queue<T, N>,
175 tx_count: &'static AtomicUsize,
176 rx_count: &'static AtomicUsize,
177 ) -> Self {
178 Self {
179 state: SenderState::Static {
180 queue,
181 tx_count,
182 rx_count,
183 },
184 }
185 }
186
187 pub fn send(&self, item: T) -> Result<()> {
190 if rx_count_of(&self.state).load(Ordering::Acquire) == 0 {
191 return Err(Error::NetClosed);
192 }
193 queue_of(&self.state).send(&item)
194 }
195
196 pub fn try_send(&self, item: T) -> Result<()> {
199 if rx_count_of(&self.state).load(Ordering::Acquire) == 0 {
200 return Err(Error::NetClosed);
201 }
202 queue_of(&self.state).try_send(&item)
203 }
204
205 pub fn sender_count(&self) -> usize {
208 tx_count_of(&self.state).load(Ordering::Acquire)
209 }
210
211 pub fn receiver_count(&self) -> usize {
213 rx_count_of(&self.state).load(Ordering::Acquire)
214 }
215}
216
217impl<T: Copy + 'static, const N: usize> Clone for Sender<T, N> {
218 fn clone(&self) -> Self {
219 tx_count_of(&self.state).fetch_add(1, Ordering::AcqRel);
220 Self {
221 state: match &self.state {
222 #[cfg(all(feature = "alloc", not(zero_heap)))]
223 SenderState::Heap(arc) => SenderState::Heap(arc.clone()),
224 SenderState::Static {
225 queue,
226 tx_count,
227 rx_count,
228 } => SenderState::Static {
229 queue,
230 tx_count,
231 rx_count,
232 },
233 },
234 }
235 }
236}
237
238impl<T: Copy + 'static, const N: usize> Drop for Sender<T, N> {
239 fn drop(&mut self) {
240 tx_count_of(&self.state).fetch_sub(1, Ordering::AcqRel);
241 }
242}
243
244impl<T: Copy + 'static, const N: usize> Receiver<T, N> {
247 pub const unsafe fn from_static(
254 queue: &'static Queue<T, N>,
255 tx_count: &'static AtomicUsize,
256 rx_count: &'static AtomicUsize,
257 ) -> Self {
258 Self {
259 state: ReceiverState::Static {
260 queue,
261 tx_count,
262 rx_count,
263 },
264 }
265 }
266
267 pub fn recv(&self) -> Result<T> {
271 if let Ok(v) = queue_of_rx(&self.state).try_recv() {
274 return Ok(v);
275 }
276 if tx_count_of_rx(&self.state).load(Ordering::Acquire) == 0 {
277 return Err(Error::NetClosed);
278 }
279 queue_of_rx(&self.state).recv()
280 }
281
282 pub fn try_recv(&self) -> Result<T> {
286 match queue_of_rx(&self.state).try_recv() {
287 Ok(v) => Ok(v),
288 Err(Error::QueueEmpty | Error::Timeout) => {
289 if tx_count_of_rx(&self.state).load(Ordering::Acquire) == 0 {
290 Err(Error::NetClosed)
291 } else {
292 Err(Error::Timeout)
293 }
294 }
295 Err(e) => Err(e),
296 }
297 }
298
299 pub fn sender_count(&self) -> usize {
301 tx_count_of_rx(&self.state).load(Ordering::Acquire)
302 }
303
304 pub fn receiver_count(&self) -> usize {
306 rx_count_of_rx(&self.state).load(Ordering::Acquire)
307 }
308}
309
310impl<T: Copy + 'static, const N: usize> Clone for Receiver<T, N> {
311 fn clone(&self) -> Self {
312 rx_count_of_rx(&self.state).fetch_add(1, Ordering::AcqRel);
313 Self {
314 state: match &self.state {
315 #[cfg(all(feature = "alloc", not(zero_heap)))]
316 ReceiverState::Heap(arc) => ReceiverState::Heap(arc.clone()),
317 ReceiverState::Static {
318 queue,
319 tx_count,
320 rx_count,
321 } => ReceiverState::Static {
322 queue,
323 tx_count,
324 rx_count,
325 },
326 },
327 }
328 }
329}
330
331impl<T: Copy + 'static, const N: usize> Drop for Receiver<T, N> {
332 fn drop(&mut self) {
333 rx_count_of_rx(&self.state).fetch_sub(1, Ordering::AcqRel);
334 }
335}