1use core::{
4 cell::UnsafeCell,
5 fmt,
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(feature = "alloc")]
12use crate::alloc::rc::Rc;
13
14use crate::error::{SendError, TryRecvError};
15
16pub const fn channel<T>() -> OneshotChannel<T> {
18 OneshotChannel(UnsafeCell::new(Slot {
19 value: None,
20 recv_waker: None,
21 close_waker: None,
22 closed: false,
23 }))
24}
25
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
28pub struct RecvError;
29
30pub struct OneshotChannel<T>(UnsafeCell<Slot<T>>);
35
36impl<T> OneshotChannel<T> {
37 pub fn split(&mut self) -> (SenderRef<'_, T>, ReceiverRef<'_, T>) {
40 let slot = &self.0;
41 (SenderRef { slot }, ReceiverRef { slot })
42 }
43
44 #[cfg(feature = "alloc")]
45 pub fn into_split(self) -> (Sender<T>, Receiver<T>) {
52 let slot = Rc::new(self.0);
53 (Sender { slot: Rc::clone(&slot) }, Receiver { slot })
54 }
55}
56
57#[cfg(feature = "alloc")]
58pub struct Sender<T> {
60 slot: Rc<UnsafeCell<Slot<T>>>,
61}
62
63#[cfg(feature = "alloc")]
64impl<T> Sender<T> {
65 pub fn is_closed(&self) -> bool {
67 unsafe { (*self.slot.get()).closed }
69 }
70
71 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
73 unsafe { (*self.slot.get()).poll_closed(cx) }
75 }
76
77 pub async fn closed(&mut self) {
79 core::future::poll_fn(|cx| self.poll_closed(cx)).await
80 }
81
82 pub fn send(self, value: T) -> Result<(), SendError<T>> {
88 unsafe { (*self.slot.get()).send(value) }
90 }
91}
92
93#[cfg(feature = "alloc")]
94impl<T> Drop for Sender<T> {
95 fn drop(&mut self) {
96 unsafe { (*self.slot.get()).closed = true }
98 }
99}
100
101#[cfg(feature = "alloc")]
102impl<T> fmt::Debug for Sender<T>
103where
104 T: fmt::Debug,
105{
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 let value = unsafe { &(*self.slot.get()).value };
109 f.debug_struct("Sender")
110 .field("is_closed", &self.is_closed())
111 .field("value", value)
112 .finish_non_exhaustive()
113 }
114}
115
116pub struct SenderRef<'a, T> {
119 slot: &'a UnsafeCell<Slot<T>>,
120}
121
122impl<'a, T> SenderRef<'a, T> {
123 pub fn is_closed(&self) -> bool {
125 unsafe { (*self.slot.get()).closed }
127 }
128
129 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
131 unsafe { (*self.slot.get()).poll_closed(cx) }
133 }
134
135 pub async fn closed(&mut self) {
137 core::future::poll_fn(|cx| self.poll_closed(cx)).await
138 }
139
140 pub fn send(self, value: T) -> Result<(), SendError<T>> {
146 unsafe { (*self.slot.get()).send(value) }
148 }
149}
150
151impl<T> Drop for SenderRef<'_, T> {
152 fn drop(&mut self) {
153 unsafe { (*self.slot.get()).closed = true }
155 }
156}
157
158impl<T> fmt::Debug for SenderRef<'_, T>
159where
160 T: fmt::Debug,
161{
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 let value = unsafe { &(*self.slot.get()).value };
165 f.debug_struct("SenderRef")
166 .field("is_closed", &self.is_closed())
167 .field("value", value)
168 .finish_non_exhaustive()
169 }
170}
171
172#[cfg(feature = "alloc")]
173pub struct Receiver<T> {
187 slot: Rc<UnsafeCell<Slot<T>>>,
188}
189
190#[cfg(feature = "alloc")]
191impl<T> Receiver<T> {
192 pub fn is_closed(&self) -> bool {
194 unsafe { (*self.slot.get()).closed }
196 }
197
198 pub fn close(&mut self) {
202 unsafe { (*self.slot.get()).close_and_wake() }
204 }
205
206 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
213 unsafe { (*self.slot.get()).try_recv() }
215 }
216}
217
218#[cfg(feature = "alloc")]
220impl<T> Future for Receiver<T> {
221 type Output = Result<T, RecvError>;
222
223 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
224 let slot = &self.get_mut().slot;
225 unsafe { (*slot.get()).poll_recv(cx) }
227 }
228}
229
230#[cfg(feature = "alloc")]
231impl<T> Drop for Receiver<T> {
232 fn drop(&mut self) {
233 self.close();
234 }
235}
236
237#[cfg(feature = "alloc")]
238impl<T> fmt::Debug for Receiver<T>
239where
240 T: fmt::Debug,
241{
242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243 let value = unsafe { &(*self.slot.get()).value };
245 f.debug_struct("Receiver")
246 .field("is_closed", &self.is_closed())
247 .field("value", value)
248 .finish_non_exhaustive()
249 }
250}
251
252pub struct ReceiverRef<'a, T> {
270 slot: &'a UnsafeCell<Slot<T>>,
271}
272
273impl<T> ReceiverRef<'_, T> {
274 pub fn is_closed(&self) -> bool {
276 unsafe { (*self.slot.get()).closed }
278 }
279
280 pub fn close(&mut self) {
285 unsafe { (*self.slot.get()).close_and_wake() }
287 }
288
289 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
296 unsafe { (*self.slot.get()).try_recv() }
298 }
299}
300
301impl<T> Future for ReceiverRef<'_, T> {
302 type Output = Result<T, RecvError>;
303
304 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
305 let slot = self.get_mut().slot;
306 unsafe { &mut *slot.get() }.poll_recv(cx)
308 }
309}
310
311impl<T> Drop for ReceiverRef<'_, T> {
312 fn drop(&mut self) {
313 self.close();
314 }
315}
316
317impl<T> fmt::Debug for ReceiverRef<'_, T>
318where
319 T: fmt::Debug,
320{
321 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322 let value = unsafe { &(*self.slot.get()).value };
324 f.debug_struct("ReceiverRef")
325 .field("is_closed", &self.is_closed())
326 .field("value", value)
327 .finish_non_exhaustive()
328 }
329}
330
331struct Slot<T> {
334 value: Option<T>,
337 recv_waker: Option<Waker>,
338 close_waker: Option<Waker>,
339 closed: bool,
340}
341
342impl<T> Slot<T> {
343 fn send(&mut self, value: T) -> Result<(), SendError<T>> {
344 if self.closed {
346 return Err(SendError(value));
347 }
348
349 self.value = Some(value);
351 if let Some(waker) = &self.recv_waker {
352 waker.wake_by_ref();
353 }
354
355 Ok(())
356 }
357
358 fn close_and_wake(&mut self) {
359 if self.closed {
360 return;
361 }
362
363 self.closed = true;
364 if let Some(waker) = &self.close_waker {
365 waker.wake_by_ref();
366 }
367 }
368
369 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
370 if self.closed {
371 Poll::Ready(())
372 } else {
373 self.close_waker = Some(cx.waker().clone());
374 Poll::Pending
375 }
376 }
377
378 fn try_recv(&mut self) -> Result<T, TryRecvError> {
379 match self.value.take() {
380 Some(value) => Ok(value),
381 None => match self.closed {
382 true => Err(TryRecvError::Disconnected),
383 false => Err(TryRecvError::Empty),
384 },
385 }
386 }
387
388 fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
389 match self.try_recv() {
390 Ok(value) => Poll::Ready(Ok(value)),
391 Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
392 Err(TryRecvError::Empty) => {
393 self.recv_waker = Some(cx.waker().clone());
394 Poll::Pending
395 }
396 }
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use std::{future::Future as _, task::Poll};
403
404 use futures_lite::future;
405
406 #[test]
407 fn recv() {
408 future::block_on(async {
409 let mut chan = super::channel::<i32>();
410 let (tx, rx) = chan.split();
411
412 tx.send(-1).unwrap();
413 assert_eq!(rx.await, Ok(-1));
414 });
415 }
416
417 #[test]
418 fn split_twice() {
419 future::block_on(async {
420 let mut chan = super::channel::<()>();
421 let (tx, rx) = chan.split();
422
423 tx.send(()).unwrap();
424 assert!(rx.await.is_ok());
425
426 let (tx, rx) = chan.split();
427 assert!(tx.send(()).is_err());
428 assert!(rx.await.is_err());
429 });
430 }
431
432 #[test]
433 fn wake_on_close() {
434 future::block_on(async {
435 let mut chan = super::channel::<i32>();
436 let (tx, mut rx) = chan.split();
437 let mut rx = core::pin::pin!(rx);
438
439 core::future::poll_fn(|cx| {
441 assert!(rx.as_mut().poll(cx).is_pending());
442 Poll::Ready(())
443 })
444 .await;
445
446 drop(tx);
448
449 core::future::poll_fn(move |cx| {
451 assert!(rx.as_mut().poll(cx).is_ready());
452 Poll::Ready(())
453 })
454 .await;
455 });
456 }
457}