keos/channel.rs
1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
//! This module provides message-based communication over channels, concretely
//! defined by two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
//! A [`Sender`] is used to send data to a [`Receiver`]. Both senders and
//! receivers are cloneable, so many threads can send simultaneously
//! (multi-producer) to multiple receivers (multi-consumer).
12//!
13//!
14//! [`send`]: Sender::send
15//!
16//! ## Disconnection
17//!
18//! The send and receive operations on channels will all return a `Result`
19//! indicating whether the operation succeeded or not. An unsuccessful operation
20//! is normally indicative of the other half of a channel having "hung up" by
21//! being dropped in its corresponding thread.
22//!
23//! Once half of a channel has been deallocated, most operations can no longer
24//! continue to make progress, so `Err` will be returned. Many applications
25//! will continue to `unwrap` the results returned from this module,
26//! instigating a propagation of failure among threads if one unexpectedly dies.
27
28use crate::{
29 spinlock::SpinLock,
30 thread::{Current, ParkHandle},
31};
32use alloc::{boxed::Box, vec::Vec};
33use core::{
34 fmt,
35 sync::atomic::{AtomicUsize, Ordering},
36};
37use crossbeam_queue::ArrayQueue;
38
/// State shared by every [`Sender`] and [`Receiver`] handle of one channel.
pub(crate) struct ChannelInner<T> {
    /// Bounded queue holding the messages that were pushed but not yet popped.
    pub q: ArrayQueue<T>,
    /// Number of live [`Sender`] handles; starts at 1 and is adjusted by
    /// `Clone`/`Drop` of the sender.
    pub tx_cnt: AtomicUsize,
    /// Number of live [`Receiver`] handles; starts at 1 and is adjusted by
    /// `Clone`/`Drop` of the receiver.
    pub rx_cnt: AtomicUsize,
    /// Park handles of senders that went to sleep because the queue was full.
    tx_waiter: SpinLock<Vec<ParkHandle>>,
    /// Park handles of receivers that went to sleep because the queue was empty.
    rx_waiter: SpinLock<Vec<ParkHandle>>,
}
46
47impl<T> ChannelInner<T> {
48 #[inline]
49 pub fn has_receiver(&self) -> bool {
50 self.rx_cnt.load(Ordering::Acquire) != 0
51 }
52
53 #[inline]
54 pub fn has_sender(&self) -> bool {
55 self.tx_cnt.load(Ordering::Acquire) != 0
56 }
57
58 #[inline]
59 pub fn capacity(&self) -> usize {
60 self.q.capacity()
61 }
62
63 pub fn push(
64 &self,
65 value: T,
66 do_unpark: impl Fn(ParkHandle) -> Result<(), ()>,
67 ) -> Result<(), T> {
68 match self.q.push(value) {
69 Ok(_) => {
70 let mut guard = self.rx_waiter.lock();
71 if let Some(th) = guard.pop() {
72 do_unpark(th).expect("Failed to unpark channel tx waiter.")
73 }
74 guard.unlock();
75 Ok(())
76 }
77 Err(e) => Err(e),
78 }
79 }
80 pub fn pop(&self, do_unpark: impl Fn(ParkHandle) -> Result<(), ()>) -> Option<T> {
81 match self.q.pop() {
82 Some(v) => {
83 let mut guard = self.tx_waiter.lock();
84
85 if let Some(th) = guard.pop() {
86 do_unpark(th).expect("Failed to unpark channel rx waiter.")
87 }
88 guard.unlock();
89 Some(v)
90 }
91 None => None,
92 }
93 }
94}
95
/// The receiving half of a channel created by [`channel`].
///
/// The handle can be cloned so that several threads receive from the same
/// channel; all clones share one queue.
///
/// Messages sent to the channel can be retrieved using [`recv`].
///
/// [`recv`]: Receiver::recv
pub struct Receiver<T: core::marker::Send + 'static> {
    // Raw pointer to the heap-allocated state shared with every other handle
    // of this channel (produced by `Box::into_raw` in `channel`).
    inner: *mut ChannelInner<T>,
}
107
// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
//
// `Receiver` stores a raw pointer, which is neither `Send` nor `Sync` on its
// own, so both marker traits are asserted by hand here. `Sync` matters because
// the receiving methods take `&self`.
// NOTE(review): soundness rests on `ChannelInner` being safe to access from
// several threads at once — confirm against `SpinLock` and `ArrayQueue`.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
112
/// A borrowing iterator over the messages of a [`Receiver`], created by
/// [`iter`].
///
/// Each call to `next` performs a blocking [`recv`]: it waits for a new
/// message and yields `None` once `recv` reports that the channel has hung up.
///
/// [`iter`]: Receiver::iter
/// [`recv`]: Receiver::recv
#[derive(Debug)]
pub struct Iter<'a, T: core::marker::Send + 'static> {
    // Receiver the messages are pulled from.
    rx: &'a Receiver<T>,
}
124
/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// `None` is returned when there are no pending values remaining or when the
/// corresponding channel has hung up; the two cases are not distinguished.
///
/// This iterator never blocks the caller to wait for data: each call to
/// `next` is a single [`try_recv`].
///
/// [`try_iter`]: Receiver::try_iter
/// [`try_recv`]: Receiver::try_recv
#[derive(Debug)]
pub struct TryIter<'a, T: core::marker::Send + 'static> {
    // Receiver the messages are pulled from.
    rx: &'a Receiver<T>,
}
139
/// An owning iterator over the messages of a [`Receiver`],
/// created by [`Receiver::into_iter`].
///
/// Each call to [`next`] performs a blocking [`recv`]: it waits for a new
/// message and yields `None` once `recv` reports that the channel has hung up.
///
/// The wrapped receiver handle is released when the iterator is dropped.
///
/// [`next`]: Iterator::next
/// [`recv`]: Receiver::recv
#[derive(Debug)]
pub struct IntoIter<T: core::marker::Send + 'static> {
    // Receiver owned by this iterator.
    rx: Receiver<T>,
}
152
/// The sending half of a channel created by [`channel`].
///
/// The handle can be cloned so that several threads send into the same
/// channel; all clones share one queue.
///
/// Messages can be sent through this channel with [`send`].
///
/// [`send`]: Sender::send
pub struct Sender<T: core::marker::Send + 'static> {
    // Raw pointer to the heap-allocated state shared with every other handle
    // of this channel (produced by `Box::into_raw` in `channel`).
    inner: *mut ChannelInner<T>,
}
164
// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
//
// `Sender` stores a raw pointer, which is neither `Send` nor `Sync` on its
// own, so both marker traits are asserted by hand here. `Sync` matters because
// the sending methods take `&self`.
// NOTE(review): soundness rests on `ChannelInner` being safe to access from
// several threads at once — confirm against `SpinLock` and `ArrayQueue`.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
169
/// An error returned from the [`Sender::send`] function on **channel**s.
///
/// A **send** operation fails only when the receiving end of the channel is
/// disconnected, implying that the data could never be received. The error
/// carries the unsent value as its payload (field `0`) so the caller can
/// recover it.
///
/// [`Sender::send`]: Sender::send
// NOTE(review): no `Debug` impl for this type is visible in this part of the
// file, yet `Result::unwrap` on a `send` result requires one — confirm it is
// provided elsewhere.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
179
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// The [`recv`] operation fails only when the queue is empty and the sending
/// half of the [`channel`] is disconnected, implying that no further messages
/// will ever be received.
///
/// [`recv`]: Receiver::recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;
189
/// The list of the possible error outcomes for the [`try_send`] method.
///
/// Both variants carry the value that could not be sent, so the caller can
/// recover it.
///
/// [`try_send`]: Sender::try_send
// NOTE(review): no `Debug` impl for this type is visible in this part of the
// file, yet `Result::unwrap` on a `try_send` result requires one — confirm it
// is provided elsewhere.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The data could not be sent on the [`channel`] because the internal
    /// buffer is full at this time; sending would require the caller to
    /// block.
    Full(T),

    /// This [`channel`]'s receiving half has disconnected, so the data
    /// could not be sent. The data is returned back to the caller in this
    /// case.
    Disconnected(T),
}
208
/// The list of the possible reasons that [`try_recv`] could not return data
/// when called.
///
/// This can occur with a [`channel`].
///
/// [`try_recv`]: Receiver::try_recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// This **channel** is currently empty, but the **Sender**(s) have not yet
    /// disconnected, so data may yet become available.
    Empty,

    /// The **channel** is empty and its sending half has become disconnected,
    /// so there will never be any more data received on it.
    Disconnected,
}
225
226/// Creates a new bounded channel.
227///
228/// All data sent on the [`Sender`] will become available on the [`Receiver`]
229/// in the same order as it was sent. Like asynchronous [`channel`]s, the
230/// [`Receiver`] will block until a message becomes available.
231///
232/// This channel has an internal buffer on which messages will be queued.
233/// `bound` specifies the buffer size. When the internal buffer becomes full,
234/// future sends will *block* waiting for the buffer to open up. Note that a
235/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
236/// where each [`send`] will not return until a [`recv`] is paired with it.
237///
238/// Both [`Sender`] and [`Receiver`] can be cloned to [`send`] or [`recv`] to
239/// the same channel multiple times.
240///
241/// If the [`Receiver`] is disconnected while trying to [`send`] with the
242/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If
243/// the [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method
244/// will return a [`RecvError`].
245///
246/// [`send`]: Sender::send
247/// [`recv`]: Receiver::recv
248pub fn channel<T: core::marker::Send + 'static>(bound: usize) -> (Sender<T>, Receiver<T>) {
249 let chan = Box::into_raw(Box::new(ChannelInner {
250 q: ArrayQueue::new(bound),
251 tx_cnt: AtomicUsize::new(1),
252 rx_cnt: AtomicUsize::new(1),
253 tx_waiter: SpinLock::new(Vec::new()),
254 rx_waiter: SpinLock::new(Vec::new()),
255 }));
256 (Sender { inner: chan }, Receiver { inner: chan })
257}
258
259////////////////////////////////////////////////////////////////////////////////
260// Sender
261////////////////////////////////////////////////////////////////////////////////
262impl<T: core::marker::Send + 'static> Sender<T> {
263 #[inline]
264 fn inner<'a>(&self) -> &'a ChannelInner<T> {
265 unsafe { &*self.inner }
266 }
267
268 /// Can send a value through this channel.
269 pub fn can_send(&self) -> bool {
270 let inner = self.inner();
271 inner.q.is_empty() && inner.has_receiver()
272 }
273
274 /// Does anyone can receive a value through this channel.
275 pub fn has_receiver(&self) -> bool {
276 let inner = self.inner();
277 inner.has_receiver()
278 }
279
280 /// Sends a value on this channel.
281 ///
282 /// This function will *block* until space in the internal buffer becomes
283 /// available or a receiver is available to hand off the message to.
284 ///
285 /// Note that a successful send does *not* guarantee that the receiver will
286 /// ever see the data if there is a buffer on this channel. Items may be
287 /// enqueued in the internal buffer for the receiver to receive at a later
288 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
289 /// channel and it guarantees that the receiver has indeed received
290 /// the data if this function returns success.
291 ///
292 /// This function will never panic, but it may return `Err` if the
293 /// [`Receiver`] has disconnected and is no longer able to receive
294 /// information.
295 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
296 let inner = self.inner();
297 let mut t_ = t;
298 loop {
299 if !inner.has_receiver() {
300 break Err(SendError(t_));
301 } else {
302 match inner.push(t_, |th| {
303 th.unpark();
304 Ok(())
305 }) {
306 Err(e) => {
307 t_ = e;
308 if inner.q.is_full() {
309 let mut guard = inner.tx_waiter.lock();
310 if inner.q.is_full() {
311 Current::park_with(move |th| {
312 guard.push(th);
313 drop(guard)
314 });
315 }
316 }
317 }
318 _ => {
319 break Ok(());
320 }
321 }
322 }
323 }
324 }
325
326 /// Attempts to send a value on this channel without blocking.
327 ///
328 /// This method differs from [`send`] by returning immediately if the
329 /// channel's buffer is full or no receiver is waiting to acquire some
330 /// data. Compared with [`send`], this function has two failure cases
331 /// instead of one (one for disconnection, one for a full buffer).
332 ///
333 /// See [`send`] for notes about guarantees of whether the
334 /// receiver has received the data or not if this function is successful.
335 ///
336 /// [`send`]: Self::send
337 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
338 let inner = self.inner();
339 if !inner.has_receiver() {
340 Err(TrySendError::Disconnected(t))
341 } else {
342 match inner.push(t, |th| {
343 th.unpark();
344 Ok(())
345 }) {
346 Err(t) => Err(TrySendError::Full(t)),
347 _ => Ok(()),
348 }
349 }
350 }
351
352 /// Get capacity of the channel.
353 pub fn capacity(&self) -> usize {
354 self.inner().capacity()
355 }
356}
357
358impl<T: core::marker::Send + 'static> Clone for Sender<T> {
359 fn clone(&self) -> Sender<T> {
360 if self.inner().tx_cnt.fetch_add(1, Ordering::Relaxed) > isize::MAX as usize {
361 panic!("sender count overflowed.");
362 }
363 Sender { inner: self.inner }
364 }
365}
366
impl<T: core::marker::Send + 'static> Drop for Sender<T> {
    /// Gives up this handle's share of the channel.
    ///
    /// Decrements the sender count; when this was the last sender and no
    /// receiver is counted any more, the shared allocation is freed.
    fn drop(&mut self) {
        let inner = self.inner();
        // `fetch_sub` returns the previous value, so `== 1` means this handle
        // was the last sender.
        // NOTE(review): the two counters are checked independently. If the
        // last sender and the last receiver are dropped concurrently, both may
        // observe the other count as zero (double free), and after our
        // decrement a receiver may free `inner` before `rx_cnt` is read here —
        // confirm whether callers can trigger this.
        // NOTE(review): receivers already parked in `recv` are not unparked
        // here when the last sender goes away — confirm this is intended.
        if inner.tx_cnt.fetch_sub(1, Ordering::AcqRel) == 1
            && inner.rx_cnt.load(Ordering::Relaxed) == 0
        {
            // Rebuild the `Box` leaked by `Box::into_raw` in `channel` so the
            // channel state (and any queued messages) is dropped.
            unsafe { drop(Box::from_raw(self.inner)) }
        }
    }
}
377
378impl<T: core::marker::Send + 'static> fmt::Debug for Sender<T> {
379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380 f.debug_struct("Sender").finish()
381 }
382}
383
384////////////////////////////////////////////////////////////////////////////////
385// Receiver
386////////////////////////////////////////////////////////////////////////////////
387
388impl<T: core::marker::Send + 'static> Receiver<T> {
389 #[inline]
390 fn inner<'a>(&self) -> &'a ChannelInner<T> {
391 unsafe { &*self.inner }
392 }
393
394 /// Can receive a value through this channel.
395 pub fn can_recv(&self) -> bool {
396 let inner = self.inner();
397 !inner.q.is_empty() && inner.has_sender()
398 }
399
400 /// Does anyone can send a value through this channel.
401 pub fn has_sender(&self) -> bool {
402 let inner = self.inner();
403 inner.has_sender()
404 }
405
406 /// Attempts to wait for a value on this receiver, returning an error if the
407 /// corresponding channel has hung up.
408 ///
409 /// This function will always block the current thread if there is no data
410 /// available and it's possible for more data to be sent. Once a message is
411 /// sent to the corresponding [`Sender`] (or [`Sender`]), then this
412 /// receiver will wake up and return that message.
413 ///
414 /// If the corresponding [`Sender`] has disconnected, or it disconnects
415 /// while this call is blocking, this call will wake up and return
416 /// `Err` to indicate that no more messages can ever be received on
417 /// this channel. However, since channels are buffered, messages sent
418 /// before the disconnect will still be properly received.
419 pub fn recv(&self) -> Result<T, RecvError> {
420 let inner = self.inner();
421 loop {
422 match inner.pop(|th| {
423 th.unpark();
424 Ok(())
425 }) {
426 Some(n) => break Ok(n),
427 None if !inner.has_sender() => {
428 break inner
429 .pop(|th| {
430 th.unpark();
431 Ok(())
432 })
433 .ok_or(RecvError);
434 }
435 None => {
436 let mut guard = inner.rx_waiter.lock();
437 match inner.pop(|th| {
438 th.unpark();
439 Ok(())
440 }) {
441 Some(n) => {
442 guard.unlock();
443 break Ok(n);
444 }
445 _ => {
446 Current::park_with(|handle| {
447 guard.push(handle);
448 guard.unlock();
449 });
450 }
451 }
452 }
453 }
454 }
455 }
456
457 /// Attempts to return a pending value on this receiver without blocking.
458 ///
459 /// This method will never block the caller in order to wait for data to
460 /// become available. Instead, this will always return immediately with a
461 /// possible option of pending data on the channel.
462 ///
463 /// This is useful for a flavor of "optimistic check" before deciding to
464 /// block on a receiver.
465 ///
466 /// Compared with [`recv`], this function has two failure cases instead of
467 /// one (one for disconnection, one for an empty buffer).
468 ///
469 /// [`recv`]: Self::recv
470 pub fn try_recv(&self) -> Result<T, TryRecvError> {
471 let inner = self.inner();
472 match inner.pop(|th| {
473 th.unpark();
474 Ok(())
475 }) {
476 Some(n) => Ok(n),
477 None if !inner.has_sender() => inner
478 .pop(|th| {
479 th.unpark();
480 Ok(())
481 })
482 .ok_or(TryRecvError::Disconnected),
483 None => Err(TryRecvError::Empty),
484 }
485 }
486
487 /// Returns an iterator that will block waiting for messages, but never
488 /// panicked. It will return `None` when the channel has hung up.
489 pub fn iter(&self) -> Iter<'_, T> {
490 Iter { rx: self }
491 }
492
493 /// Returns an iterator that will attempt to yield all pending values.
494 /// It will return `None` if there are no more pending values or if the
495 /// channel has hung up. The iterator will never panicked or block the
496 /// user by waiting for values.
497 pub fn try_iter(&self) -> TryIter<'_, T> {
498 TryIter { rx: self }
499 }
500
501 /// Get capacity of the channel.
502 pub fn capacity(&self) -> usize {
503 self.inner().capacity()
504 }
505}
506
507impl<T: core::marker::Send + 'static> Iterator for Iter<'_, T> {
508 type Item = T;
509
510 fn next(&mut self) -> Option<T> {
511 self.rx.recv().ok()
512 }
513}
514
515impl<T: core::marker::Send + 'static> Iterator for TryIter<'_, T> {
516 type Item = T;
517
518 fn next(&mut self) -> Option<T> {
519 self.rx.try_recv().ok()
520 }
521}
522
523impl<'a, T: core::marker::Send + 'static> IntoIterator for &'a Receiver<T> {
524 type Item = T;
525 type IntoIter = Iter<'a, T>;
526
527 fn into_iter(self) -> Iter<'a, T> {
528 self.iter()
529 }
530}
531
532impl<T: core::marker::Send + 'static> Iterator for IntoIter<T> {
533 type Item = T;
534 fn next(&mut self) -> Option<T> {
535 self.rx.recv().ok()
536 }
537}
538
539impl<T: core::marker::Send + 'static> IntoIterator for Receiver<T> {
540 type Item = T;
541 type IntoIter = IntoIter<T>;
542
543 fn into_iter(self) -> IntoIter<T> {
544 IntoIter { rx: self }
545 }
546}
547
548impl<T: core::marker::Send + 'static> Clone for Receiver<T> {
549 fn clone(&self) -> Receiver<T> {
550 if self.inner().rx_cnt.fetch_add(1, Ordering::Relaxed) > isize::MAX as usize {
551 panic!("receiver count overflowed.");
552 }
553 Receiver { inner: self.inner }
554 }
555}
556
impl<T: core::marker::Send + 'static> Drop for Receiver<T> {
    /// Gives up this handle's share of the channel.
    ///
    /// Decrements the receiver count; when this was the last receiver and no
    /// sender is counted any more, the shared allocation is freed.
    fn drop(&mut self) {
        let inner = self.inner();
        // `fetch_sub` returns the previous value, so `== 1` means this handle
        // was the last receiver.
        // NOTE(review): the two counters are checked independently. If the
        // last receiver and the last sender are dropped concurrently, both may
        // observe the other count as zero (double free), and after our
        // decrement a sender may free `inner` before `tx_cnt` is read here —
        // confirm whether callers can trigger this.
        // NOTE(review): senders already parked in `send` are not unparked
        // here when the last receiver goes away — confirm this is intended.
        if inner.rx_cnt.fetch_sub(1, Ordering::AcqRel) == 1
            && inner.tx_cnt.load(Ordering::Relaxed) == 0
        {
            // Rebuild the `Box` leaked by `Box::into_raw` in `channel` so the
            // channel state (and any queued messages) is dropped.
            unsafe { drop(Box::from_raw(self.inner)) }
        }
    }
}
567
568impl<T: core::marker::Send + 'static> fmt::Debug for Receiver<T> {
569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570 f.debug_struct("Receiver").finish()
571 }
572}