futures_util/future/future/
flatten.rs

1use core::pin::Pin;
2use futures_core::future::{FusedFuture, Future};
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10pin_project! {
11    #[project = FlattenProj]
12    #[derive(Debug)]
13    pub enum Flatten<Fut1, Fut2> {
14        First { #[pin] f: Fut1 },
15        Second { #[pin] f: Fut2 },
16        Empty,
17    }
18}
19
20impl<Fut1, Fut2> Flatten<Fut1, Fut2> {
21    pub(crate) fn new(future: Fut1) -> Self {
22        Self::First { f: future }
23    }
24}
25
26impl<Fut> FusedFuture for Flatten<Fut, Fut::Output>
27where
28    Fut: Future,
29    Fut::Output: Future,
30{
31    fn is_terminated(&self) -> bool {
32        match self {
33            Self::Empty => true,
34            _ => false,
35        }
36    }
37}
38
39impl<Fut> Future for Flatten<Fut, Fut::Output>
40where
41    Fut: Future,
42    Fut::Output: Future,
43{
44    type Output = <Fut::Output as Future>::Output;
45
46    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47        Poll::Ready(loop {
48            match self.as_mut().project() {
49                FlattenProj::First { f } => {
50                    let f = ready!(f.poll(cx));
51                    self.set(Self::Second { f });
52                }
53                FlattenProj::Second { f } => {
54                    let output = ready!(f.poll(cx));
55                    self.set(Self::Empty);
56                    break output;
57                }
58                FlattenProj::Empty => panic!("Flatten polled after completion"),
59            }
60        })
61    }
62}
63
64impl<Fut> FusedStream for Flatten<Fut, Fut::Output>
65where
66    Fut: Future,
67    Fut::Output: Stream,
68{
69    fn is_terminated(&self) -> bool {
70        match self {
71            Self::Empty => true,
72            _ => false,
73        }
74    }
75}
76
77impl<Fut> Stream for Flatten<Fut, Fut::Output>
78where
79    Fut: Future,
80    Fut::Output: Stream,
81{
82    type Item = <Fut::Output as Stream>::Item;
83
84    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        Poll::Ready(loop {
86            match self.as_mut().project() {
87                FlattenProj::First { f } => {
88                    let f = ready!(f.poll(cx));
89                    self.set(Self::Second { f });
90                }
91                FlattenProj::Second { f } => {
92                    let output = ready!(f.poll_next(cx));
93                    if output.is_none() {
94                        self.set(Self::Empty);
95                    }
96                    break output;
97                }
98                FlattenProj::Empty => break None,
99            }
100        })
101    }
102}
103
#[cfg(feature = "sink")]
impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output>
where
    Fut: Future,
    Fut::Output: Sink<Item>,
{
    type Error = <Fut::Output as Sink<Item>>::Error;

    /// Drives the outer future until it yields a sink, then reports that
    /// sink's readiness.
    ///
    /// # Panics
    ///
    /// Panics if the state is `Empty`.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        loop {
            match self.as_mut().project() {
                FlattenProj::First { f } => {
                    // Outer future finished: switch states and loop around so
                    // the sink is polled in this same call.
                    let sink = ready!(f.poll(cx));
                    self.set(Self::Second { f: sink });
                }
                // Pending, Ok and Err are all passed through unchanged.
                FlattenProj::Second { f } => return f.poll_ready(cx),
                FlattenProj::Empty => panic!("poll_ready called after eof"),
            }
        }
    }

    /// Forwards `item` to the inner sink.
    ///
    /// # Panics
    ///
    /// Panics if the outer future has not resolved yet (state `First`) or if
    /// the state is `Empty`.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        match self.project() {
            FlattenProj::Second { f } => f.start_send(item),
            FlattenProj::First { .. } => panic!("poll_ready not called first"),
            FlattenProj::Empty => panic!("start_send called after eof"),
        }
    }

    /// Flushes the inner sink.
    ///
    /// In state `First` this returns `Poll::Ready(Ok(()))` without polling
    /// the outer future.
    ///
    /// # Panics
    ///
    /// Panics if the state is `Empty`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.project() {
            FlattenProj::Second { f } => f.poll_flush(cx),
            FlattenProj::First { .. } => Poll::Ready(Ok(())),
            FlattenProj::Empty => panic!("poll_flush called after eof"),
        }
    }

    /// Closes the inner sink if there is one; in states `First` and `Empty`
    /// it succeeds immediately.
    ///
    /// As soon as the result is ready, whether `Ok` or `Err`, the state is
    /// replaced with `Empty`.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let outcome = match self.as_mut().project() {
            FlattenProj::Second { f } => f.poll_close(cx),
            FlattenProj::First { .. } | FlattenProj::Empty => Poll::Ready(Ok(())),
        };
        if outcome.is_ready() {
            self.set(Self::Empty);
        }
        outcome
    }
}