futures_util/stream/stream/
take_while.rs

1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::Future;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11pin_project! {
12    /// Stream for the [`take_while`](super::StreamExt::take_while) method.
13    #[must_use = "streams do nothing unless polled"]
14    pub struct TakeWhile<St: Stream, Fut, F> {
15        #[pin]
16        stream: St,
17        f: F,
18        #[pin]
19        pending_fut: Option<Fut>,
20        pending_item: Option<St::Item>,
21        done_taking: bool,
22    }
23}
24
25impl<St, Fut, F> fmt::Debug for TakeWhile<St, Fut, F>
26where
27    St: Stream + fmt::Debug,
28    St::Item: fmt::Debug,
29    Fut: fmt::Debug,
30{
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_struct("TakeWhile")
33            .field("stream", &self.stream)
34            .field("pending_fut", &self.pending_fut)
35            .field("pending_item", &self.pending_item)
36            .field("done_taking", &self.done_taking)
37            .finish()
38    }
39}
40
41impl<St, Fut, F> TakeWhile<St, Fut, F>
42where
43    St: Stream,
44    F: FnMut(&St::Item) -> Fut,
45    Fut: Future<Output = bool>,
46{
47    pub(super) fn new(stream: St, f: F) -> Self {
48        Self { stream, f, pending_fut: None, pending_item: None, done_taking: false }
49    }
50
51    delegate_access_inner!(stream, St, ());
52}
53
54impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
55where
56    St: Stream,
57    F: FnMut(&St::Item) -> Fut,
58    Fut: Future<Output = bool>,
59{
60    type Item = St::Item;
61
62    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
63        if self.done_taking {
64            return Poll::Ready(None);
65        }
66
67        let mut this = self.project();
68
69        Poll::Ready(loop {
70            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
71                let take = ready!(fut.poll(cx));
72                let item = this.pending_item.take();
73                this.pending_fut.set(None);
74                if take {
75                    break item;
76                } else {
77                    *this.done_taking = true;
78                    break None;
79                }
80            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
81                this.pending_fut.set(Some((this.f)(&item)));
82                *this.pending_item = Some(item);
83            } else {
84                break None;
85            }
86        })
87    }
88
89    fn size_hint(&self) -> (usize, Option<usize>) {
90        if self.done_taking {
91            return (0, Some(0));
92        }
93
94        let pending_len = usize::from(self.pending_item.is_some());
95        let (_, upper) = self.stream.size_hint();
96        let upper = match upper {
97            Some(x) => x.checked_add(pending_len),
98            None => None,
99        };
100        (0, upper) // can't know a lower bound, due to the predicate
101    }
102}
103
104impl<St, Fut, F> FusedStream for TakeWhile<St, Fut, F>
105where
106    St: FusedStream,
107    F: FnMut(&St::Item) -> Fut,
108    Fut: Future<Output = bool>,
109{
110    fn is_terminated(&self) -> bool {
111        self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
112    }
113}
114
// `Sink` is forwarded to the wrapped stream: when the inner type is also a
// sink, the adapter exposes that sink unchanged.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Fut, F, Item> Sink<Item> for TakeWhile<S, Fut, F> {
    type Error = S::Error;

    // Sink methods are generated by the crate macro and target the `stream` field.
    delegate_sink!(stream, Item);
}