keos/
channel.rs

1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
//! A [`Sender`] is used to send data to a [`Receiver`]. Both the sender and
//! the receiver are cloneable, so many threads can send simultaneously
//! (multi-producer) to multiple receivers (multi-consumer).
12//!
13//!
14//! [`send`]: Sender::send
15//!
16//! ## Disconnection
17//!
18//! The send and receive operations on channels will all return a `Result`
19//! indicating whether the operation succeeded or not. An unsuccessful operation
20//! is normally indicative of the other half of a channel having "hung up" by
21//! being dropped in its corresponding thread.
22//!
23//! Once half of a channel has been deallocated, most operations can no longer
24//! continue to make progress, so `Err` will be returned. Many applications
25//! will continue to `unwrap` the results returned from this module,
26//! instigating a propagation of failure among threads if one unexpectedly dies.
27
28use crate::{
29    spinlock::SpinLock,
30    thread::{Current, ParkHandle},
31};
32use alloc::{boxed::Box, vec::Vec};
33use core::{
34    fmt,
35    sync::atomic::{AtomicUsize, Ordering},
36};
37use crossbeam_queue::ArrayQueue;
38
/// State shared by every [`Sender`] and [`Receiver`] handle of one channel.
///
/// A single instance is heap-allocated by [`channel`] and reached through the
/// raw pointer stored in each handle.
pub(crate) struct ChannelInner<T> {
    /// Bounded queue holding messages that were sent but not yet received.
    pub q: ArrayQueue<T>,
    /// Number of live [`Sender`] handles (starts at 1, changed by clone/drop).
    pub tx_cnt: AtomicUsize,
    /// Number of live [`Receiver`] handles (starts at 1, changed by clone/drop).
    pub rx_cnt: AtomicUsize,
    /// Park handles of senders that blocked because the queue was full.
    tx_waiter: SpinLock<Vec<ParkHandle>>,
    /// Park handles of receivers that blocked because the queue was empty.
    rx_waiter: SpinLock<Vec<ParkHandle>>,
}
46
47impl<T> ChannelInner<T> {
48    #[inline]
49    pub fn has_receiver(&self) -> bool {
50        self.rx_cnt.load(Ordering::Acquire) != 0
51    }
52
53    #[inline]
54    pub fn has_sender(&self) -> bool {
55        self.tx_cnt.load(Ordering::Acquire) != 0
56    }
57
58    #[inline]
59    pub fn capacity(&self) -> usize {
60        self.q.capacity()
61    }
62
63    pub fn push(
64        &self,
65        value: T,
66        do_unpark: impl Fn(ParkHandle) -> Result<(), ()>,
67    ) -> Result<(), T> {
68        match self.q.push(value) {
69            Ok(_) => {
70                let mut guard = self.rx_waiter.lock();
71                if let Some(th) = guard.pop() {
72                    do_unpark(th).expect("Failed to unpark channel tx waiter.")
73                }
74                guard.unlock();
75                Ok(())
76            }
77            Err(e) => Err(e),
78        }
79    }
80    pub fn pop(&self, do_unpark: impl Fn(ParkHandle) -> Result<(), ()>) -> Option<T> {
81        match self.q.pop() {
82            Some(v) => {
83                let mut guard = self.tx_waiter.lock();
84
85                if let Some(th) = guard.pop() {
86                    do_unpark(th).expect("Failed to unpark channel rx waiter.")
87                }
88                guard.unlock();
89                Some(v)
90            }
91            None => None,
92        }
93    }
94}
95
/// The receiving half of a channel created by [`channel`].
///
/// A handle is owned by one thread at a time, but it can be cloned so that
/// other threads receive from the same channel. All clones share the same
/// underlying queue.
///
/// Messages sent to the channel can be retrieved using [`recv`].
///
/// [`recv`]: Receiver::recv
pub struct Receiver<T: core::marker::Send + 'static> {
    /// Shared channel state allocated by [`channel`]; released in `Drop` once
    /// no sender and no receiver handle remains.
    inner: *mut ChannelInner<T>,
}
107
// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
//
// SAFETY: NOTE(review) — a `Receiver` only stores a raw pointer to the shared
// `ChannelInner`. These impls presumably rely on that state (queue, atomic
// counters, spin-locked waiter lists) being safe to use from several threads
// at once; confirm against `ArrayQueue` and `SpinLock`.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
112
/// A blocking iterator over the messages of a borrowed [`Receiver`], created
/// by [`iter`].
///
/// Each call to `next` forwards to [`Receiver::recv`]: it blocks while waiting
/// for a new message and yields `None` once the channel has hung up.
///
/// [`iter`]: Receiver::iter
#[derive(Debug)]
pub struct Iter<'a, T: core::marker::Send + 'static> {
    /// Receiver the messages are taken from.
    rx: &'a Receiver<T>,
}
124
/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// Each call to `next` forwards to [`Receiver::try_recv`], so `None` is
/// returned when there are no pending values remaining or when the
/// corresponding channel has hung up.
///
/// This iterator never blocks the caller to wait for data to become
/// available; it returns `None` instead.
///
/// [`try_iter`]: Receiver::try_iter
#[derive(Debug)]
pub struct TryIter<'a, T: core::marker::Send + 'static> {
    /// Receiver the pending messages are taken from.
    rx: &'a Receiver<T>,
}
139
/// An owning iterator over messages on a [`Receiver`],
/// created by [`Receiver::into_iter`].
///
/// Each call to [`next`] forwards to [`Receiver::recv`]: it blocks while
/// waiting for a new message and returns `None` once the corresponding
/// channel has hung up.
///
/// [`next`]: Iterator::next
#[derive(Debug)]
pub struct IntoIter<T: core::marker::Send + 'static> {
    /// Receiver owned by this iterator; dropped together with it.
    rx: Receiver<T>,
}
152
/// The sending half of a channel created by [`channel`].
///
/// A handle is owned by one thread at a time, but it can be cloned so that
/// other threads send into the same channel. All clones share the same
/// underlying queue.
///
/// Messages can be sent through this channel with [`send`].
///
/// [`send`]: Sender::send
pub struct Sender<T: core::marker::Send + 'static> {
    /// Shared channel state allocated by [`channel`]; released in `Drop` once
    /// no sender and no receiver handle remains.
    inner: *mut ChannelInner<T>,
}
164
// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
//
// SAFETY: NOTE(review) — a `Sender` only stores a raw pointer to the shared
// `ChannelInner`. These impls presumably rely on that state (queue, atomic
// counters, spin-locked waiter lists) being safe to use from several threads
// at once; confirm against `ArrayQueue` and `SpinLock`.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
169
/// An error returned from the [`Sender::send`] function on a channel.
///
/// A send operation can only fail if the receiving end of the channel is
/// disconnected, implying that the data could never be received. The error
/// carries the unsent value as its payload (field `0`) so the caller can
/// recover it.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
179
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// The [`recv`] operation fails only when the queue is empty and the sending
/// half of the [`channel`] is disconnected, implying that no further
/// messages will ever be received.
///
/// [`recv`]: Receiver::recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;
189
/// The list of the possible error outcomes for the [`try_send`] method.
///
/// Both variants carry the value that could not be sent so the caller can
/// recover it.
///
/// [`try_send`]: Sender::try_send
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The data could not be sent on the [`channel`] because it would
    /// require that the caller block to send the data.
    ///
    /// The channel's internal buffer is full at this time.
    Full(T),

    /// This [`channel`]'s receiving half has disconnected, so the data
    /// could not be sent. The data is returned back to the caller in this
    /// case.
    Disconnected(T),
}
208
/// The list of the possible reasons that [`try_recv`] could not return data
/// when called.
///
/// [`try_recv`]: Receiver::try_recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel is currently empty, but at least one [`Sender`] is still
    /// connected, so data may yet become available.
    Empty,

    /// The channel is empty and its sending half has become disconnected, so
    /// there will never be any more data received on it.
    Disconnected,
}
225
226/// Creates a new bounded channel.
227///
228/// All data sent on the [`Sender`] will become available on the [`Receiver`]
229/// in the same order as it was sent. Like asynchronous [`channel`]s, the
230/// [`Receiver`] will block until a message becomes available.
231///
232/// This channel has an internal buffer on which messages will be queued.
233/// `bound` specifies the buffer size. When the internal buffer becomes full,
234/// future sends will *block* waiting for the buffer to open up. Note that a
235/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
236/// where each [`send`] will not return until a [`recv`] is paired with it.
237///
238/// Both [`Sender`] and [`Receiver`] can be cloned to [`send`] or [`recv`] to
239/// the same channel multiple times.
240///
241/// If the [`Receiver`] is disconnected while trying to [`send`] with the
242/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If
243/// the [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method
244/// will return a [`RecvError`].
245///
246/// [`send`]: Sender::send
247/// [`recv`]: Receiver::recv
248pub fn channel<T: core::marker::Send + 'static>(bound: usize) -> (Sender<T>, Receiver<T>) {
249    let chan = Box::into_raw(Box::new(ChannelInner {
250        q: ArrayQueue::new(bound),
251        tx_cnt: AtomicUsize::new(1),
252        rx_cnt: AtomicUsize::new(1),
253        tx_waiter: SpinLock::new(Vec::new()),
254        rx_waiter: SpinLock::new(Vec::new()),
255    }));
256    (Sender { inner: chan }, Receiver { inner: chan })
257}
258
259////////////////////////////////////////////////////////////////////////////////
260// Sender
261////////////////////////////////////////////////////////////////////////////////
262impl<T: core::marker::Send + 'static> Sender<T> {
263    #[inline]
264    fn inner<'a>(&self) -> &'a ChannelInner<T> {
265        unsafe { &*self.inner }
266    }
267
268    /// Can send a value through this channel.
269    pub fn can_send(&self) -> bool {
270        let inner = self.inner();
271        inner.q.is_empty() && inner.has_receiver()
272    }
273
274    /// Does anyone can receive a value through this channel.
275    pub fn has_receiver(&self) -> bool {
276        let inner = self.inner();
277        inner.has_receiver()
278    }
279
280    /// Sends a value on this channel.
281    ///
282    /// This function will *block* until space in the internal buffer becomes
283    /// available or a receiver is available to hand off the message to.
284    ///
285    /// Note that a successful send does *not* guarantee that the receiver will
286    /// ever see the data if there is a buffer on this channel. Items may be
287    /// enqueued in the internal buffer for the receiver to receive at a later
288    /// time. If the buffer size is 0, however, the channel becomes a rendezvous
289    /// channel and it guarantees that the receiver has indeed received
290    /// the data if this function returns success.
291    ///
292    /// This function will never panic, but it may return `Err` if the
293    /// [`Receiver`] has disconnected and is no longer able to receive
294    /// information.
295    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
296        let inner = self.inner();
297        let mut t_ = t;
298        loop {
299            if !inner.has_receiver() {
300                break Err(SendError(t_));
301            } else {
302                match inner.push(t_, |th| {
303                    th.unpark();
304                    Ok(())
305                }) {
306                    Err(e) => {
307                        t_ = e;
308                        if inner.q.is_full() {
309                            let mut guard = inner.tx_waiter.lock();
310                            if inner.q.is_full() {
311                                Current::park_with(move |th| {
312                                    guard.push(th);
313                                    drop(guard)
314                                });
315                            }
316                        }
317                    }
318                    _ => {
319                        break Ok(());
320                    }
321                }
322            }
323        }
324    }
325
326    /// Attempts to send a value on this channel without blocking.
327    ///
328    /// This method differs from [`send`] by returning immediately if the
329    /// channel's buffer is full or no receiver is waiting to acquire some
330    /// data. Compared with [`send`], this function has two failure cases
331    /// instead of one (one for disconnection, one for a full buffer).
332    ///
333    /// See [`send`] for notes about guarantees of whether the
334    /// receiver has received the data or not if this function is successful.
335    ///
336    /// [`send`]: Self::send
337    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
338        let inner = self.inner();
339        if !inner.has_receiver() {
340            Err(TrySendError::Disconnected(t))
341        } else {
342            match inner.push(t, |th| {
343                th.unpark();
344                Ok(())
345            }) {
346                Err(t) => Err(TrySendError::Full(t)),
347                _ => Ok(()),
348            }
349        }
350    }
351
352    /// Get capacity of the channel.
353    pub fn capacity(&self) -> usize {
354        self.inner().capacity()
355    }
356}
357
358impl<T: core::marker::Send + 'static> Clone for Sender<T> {
359    fn clone(&self) -> Sender<T> {
360        if self.inner().tx_cnt.fetch_add(1, Ordering::Relaxed) > isize::MAX as usize {
361            panic!("sender count overflowed.");
362        }
363        Sender { inner: self.inner }
364    }
365}
366
impl<T: core::marker::Send + 'static> Drop for Sender<T> {
    /// Gives up this handle's share of the channel.
    ///
    /// Decrements the sender count and frees the shared channel state when
    /// this was the last sender and no receiver is left either.
    ///
    /// This does not touch `rx_waiter`, so a receiver that is already parked
    /// is not woken here when the last sender goes away — TODO confirm whether
    /// that is handled elsewhere.
    fn drop(&mut self) {
        let inner = self.inner();
        // `fetch_sub` returns the previous value, so `== 1` means this handle
        // was the last live sender.
        if inner.tx_cnt.fetch_sub(1, Ordering::AcqRel) == 1
            && inner.rx_cnt.load(Ordering::Relaxed) == 0
        {
            // SAFETY: the pointer originates from `Box::into_raw` in `channel`,
            // and both handle counts were observed as zero.
            //
            // NOTE(review): the decrement and the check of the other counter
            // are two separate steps, mirrored in `Receiver::drop`. If the last
            // sender and the last receiver are dropped concurrently, both may
            // observe zero (double free), or — with the relaxed load — neither
            // may (leak). Consider a single shared reference count.
            unsafe { drop(Box::from_raw(self.inner)) }
        }
    }
}
377
378impl<T: core::marker::Send + 'static> fmt::Debug for Sender<T> {
379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380        f.debug_struct("Sender").finish()
381    }
382}
383
384////////////////////////////////////////////////////////////////////////////////
385// Receiver
386////////////////////////////////////////////////////////////////////////////////
387
388impl<T: core::marker::Send + 'static> Receiver<T> {
389    #[inline]
390    fn inner<'a>(&self) -> &'a ChannelInner<T> {
391        unsafe { &*self.inner }
392    }
393
394    /// Can receive a value through this channel.
395    pub fn can_recv(&self) -> bool {
396        let inner = self.inner();
397        !inner.q.is_empty() && inner.has_sender()
398    }
399
400    /// Does anyone can send a value through this channel.
401    pub fn has_sender(&self) -> bool {
402        let inner = self.inner();
403        inner.has_sender()
404    }
405
406    /// Attempts to wait for a value on this receiver, returning an error if the
407    /// corresponding channel has hung up.
408    ///
409    /// This function will always block the current thread if there is no data
410    /// available and it's possible for more data to be sent. Once a message is
411    /// sent to the corresponding [`Sender`] (or [`Sender`]), then this
412    /// receiver will wake up and return that message.
413    ///
414    /// If the corresponding [`Sender`] has disconnected, or it disconnects
415    /// while this call is blocking, this call will wake up and return
416    /// `Err` to indicate that no more messages can ever be received on
417    /// this channel. However, since channels are buffered, messages sent
418    /// before the disconnect will still be properly received.
419    pub fn recv(&self) -> Result<T, RecvError> {
420        let inner = self.inner();
421        loop {
422            match inner.pop(|th| {
423                th.unpark();
424                Ok(())
425            }) {
426                Some(n) => break Ok(n),
427                None if !inner.has_sender() => {
428                    break inner
429                        .pop(|th| {
430                            th.unpark();
431                            Ok(())
432                        })
433                        .ok_or(RecvError);
434                }
435                None => {
436                    let mut guard = inner.rx_waiter.lock();
437                    match inner.pop(|th| {
438                        th.unpark();
439                        Ok(())
440                    }) {
441                        Some(n) => {
442                            guard.unlock();
443                            break Ok(n);
444                        }
445                        _ => {
446                            Current::park_with(|handle| {
447                                guard.push(handle);
448                                guard.unlock();
449                            });
450                        }
451                    }
452                }
453            }
454        }
455    }
456
457    /// Attempts to return a pending value on this receiver without blocking.
458    ///
459    /// This method will never block the caller in order to wait for data to
460    /// become available. Instead, this will always return immediately with a
461    /// possible option of pending data on the channel.
462    ///
463    /// This is useful for a flavor of "optimistic check" before deciding to
464    /// block on a receiver.
465    ///
466    /// Compared with [`recv`], this function has two failure cases instead of
467    /// one (one for disconnection, one for an empty buffer).
468    ///
469    /// [`recv`]: Self::recv
470    pub fn try_recv(&self) -> Result<T, TryRecvError> {
471        let inner = self.inner();
472        match inner.pop(|th| {
473            th.unpark();
474            Ok(())
475        }) {
476            Some(n) => Ok(n),
477            None if !inner.has_sender() => inner
478                .pop(|th| {
479                    th.unpark();
480                    Ok(())
481                })
482                .ok_or(TryRecvError::Disconnected),
483            None => Err(TryRecvError::Empty),
484        }
485    }
486
487    /// Returns an iterator that will block waiting for messages, but never
488    /// panicked. It will return `None` when the channel has hung up.
489    pub fn iter(&self) -> Iter<'_, T> {
490        Iter { rx: self }
491    }
492
493    /// Returns an iterator that will attempt to yield all pending values.
494    /// It will return `None` if there are no more pending values or if the
495    /// channel has hung up. The iterator will never panicked or block the
496    /// user by waiting for values.
497    pub fn try_iter(&self) -> TryIter<'_, T> {
498        TryIter { rx: self }
499    }
500
501    /// Get capacity of the channel.
502    pub fn capacity(&self) -> usize {
503        self.inner().capacity()
504    }
505}
506
507impl<T: core::marker::Send + 'static> Iterator for Iter<'_, T> {
508    type Item = T;
509
510    fn next(&mut self) -> Option<T> {
511        self.rx.recv().ok()
512    }
513}
514
515impl<T: core::marker::Send + 'static> Iterator for TryIter<'_, T> {
516    type Item = T;
517
518    fn next(&mut self) -> Option<T> {
519        self.rx.try_recv().ok()
520    }
521}
522
523impl<'a, T: core::marker::Send + 'static> IntoIterator for &'a Receiver<T> {
524    type Item = T;
525    type IntoIter = Iter<'a, T>;
526
527    fn into_iter(self) -> Iter<'a, T> {
528        self.iter()
529    }
530}
531
532impl<T: core::marker::Send + 'static> Iterator for IntoIter<T> {
533    type Item = T;
534    fn next(&mut self) -> Option<T> {
535        self.rx.recv().ok()
536    }
537}
538
539impl<T: core::marker::Send + 'static> IntoIterator for Receiver<T> {
540    type Item = T;
541    type IntoIter = IntoIter<T>;
542
543    fn into_iter(self) -> IntoIter<T> {
544        IntoIter { rx: self }
545    }
546}
547
548impl<T: core::marker::Send + 'static> Clone for Receiver<T> {
549    fn clone(&self) -> Receiver<T> {
550        if self.inner().rx_cnt.fetch_add(1, Ordering::Relaxed) > isize::MAX as usize {
551            panic!("receiver count overflowed.");
552        }
553        Receiver { inner: self.inner }
554    }
555}
556
impl<T: core::marker::Send + 'static> Drop for Receiver<T> {
    /// Gives up this handle's share of the channel.
    ///
    /// Decrements the receiver count and frees the shared channel state when
    /// this was the last receiver and no sender is left either.
    ///
    /// This does not touch `tx_waiter`, so a sender that is already parked is
    /// not woken here when the last receiver goes away — TODO confirm whether
    /// that is handled elsewhere.
    fn drop(&mut self) {
        let inner = self.inner();
        // `fetch_sub` returns the previous value, so `== 1` means this handle
        // was the last live receiver.
        if inner.rx_cnt.fetch_sub(1, Ordering::AcqRel) == 1
            && inner.tx_cnt.load(Ordering::Relaxed) == 0
        {
            // SAFETY: the pointer originates from `Box::into_raw` in `channel`,
            // and both handle counts were observed as zero.
            //
            // NOTE(review): the decrement and the check of the other counter
            // are two separate steps, mirrored in `Sender::drop`. If the last
            // receiver and the last sender are dropped concurrently, both may
            // observe zero (double free), or — with the relaxed load — neither
            // may (leak). Consider a single shared reference count.
            unsafe { drop(Box::from_raw(self.inner)) }
        }
    }
}
567
568impl<T: core::marker::Send + 'static> fmt::Debug for Receiver<T> {
569    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570        f.debug_struct("Receiver").finish()
571    }
572}