futures_util/stream/stream/
skip_while.rs

1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::Future;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11pin_project! {
12    /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
13    #[must_use = "streams do nothing unless polled"]
14    pub struct SkipWhile<St, Fut, F> where St: Stream {
15        #[pin]
16        stream: St,
17        f: F,
18        #[pin]
19        pending_fut: Option<Fut>,
20        pending_item: Option<St::Item>,
21        done_skipping: bool,
22    }
23}
24
25impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F>
26where
27    St: Stream + fmt::Debug,
28    St::Item: fmt::Debug,
29    Fut: fmt::Debug,
30{
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_struct("SkipWhile")
33            .field("stream", &self.stream)
34            .field("pending_fut", &self.pending_fut)
35            .field("pending_item", &self.pending_item)
36            .field("done_skipping", &self.done_skipping)
37            .finish()
38    }
39}
40
41impl<St, Fut, F> SkipWhile<St, Fut, F>
42where
43    St: Stream,
44    F: FnMut(&St::Item) -> Fut,
45    Fut: Future<Output = bool>,
46{
47    pub(super) fn new(stream: St, f: F) -> Self {
48        Self { stream, f, pending_fut: None, pending_item: None, done_skipping: false }
49    }
50
51    delegate_access_inner!(stream, St, ());
52}
53
54impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
55where
56    St: FusedStream,
57    F: FnMut(&St::Item) -> Fut,
58    Fut: Future<Output = bool>,
59{
60    fn is_terminated(&self) -> bool {
61        self.pending_item.is_none() && self.stream.is_terminated()
62    }
63}
64
65impl<St, Fut, F> Stream for SkipWhile<St, Fut, F>
66where
67    St: Stream,
68    F: FnMut(&St::Item) -> Fut,
69    Fut: Future<Output = bool>,
70{
71    type Item = St::Item;
72
73    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
74        let mut this = self.project();
75
76        if *this.done_skipping {
77            return this.stream.poll_next(cx);
78        }
79
80        Poll::Ready(loop {
81            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
82                let skipped = ready!(fut.poll(cx));
83                let item = this.pending_item.take();
84                this.pending_fut.set(None);
85                if !skipped {
86                    *this.done_skipping = true;
87                    break item;
88                }
89            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
90                this.pending_fut.set(Some((this.f)(&item)));
91                *this.pending_item = Some(item);
92            } else {
93                break None;
94            }
95        })
96    }
97
98    fn size_hint(&self) -> (usize, Option<usize>) {
99        if self.done_skipping {
100            self.stream.size_hint()
101        } else {
102            let pending_len = usize::from(self.pending_item.is_some());
103            let (_, upper) = self.stream.size_hint();
104            let upper = match upper {
105                Some(x) => x.checked_add(pending_len),
106                None => None,
107            };
108            (0, upper) // can't know a lower bound, due to the predicate
109        }
110    }
111}
112
// Sink pass-through: when the wrapped stream is also a sink, the adapter
// exposes that sink with the same error type. The method bodies come from the
// crate-local `delegate_sink!` macro, invoked on the `stream` field.
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for SkipWhile<S, Fut, F>
where
    S: Stream,
    S: Sink<Item>,
    F: FnMut(&S::Item) -> Fut,
    Fut: Future<Output = bool>,
{
    // Errors are those of the underlying sink, unchanged.
    type Error = S::Error;

    delegate_sink!(stream, Item);
}