futures_util/stream/stream/
take.rs

1use core::cmp;
2use core::pin::Pin;
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Stream for the [`take`](super::StreamExt::take) method.
12    #[derive(Debug)]
13    #[must_use = "streams do nothing unless polled"]
14    pub struct Take<St> {
15        #[pin]
16        stream: St,
17        remaining: usize,
18    }
19}
20
21impl<St: Stream> Take<St> {
22    pub(super) fn new(stream: St, n: usize) -> Self {
23        Self { stream, remaining: n }
24    }
25
26    delegate_access_inner!(stream, St, ());
27}
28
29impl<St> Stream for Take<St>
30where
31    St: Stream,
32{
33    type Item = St::Item;
34
35    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
36        if self.remaining == 0 {
37            Poll::Ready(None)
38        } else {
39            let this = self.project();
40            let next = ready!(this.stream.poll_next(cx));
41            if next.is_some() {
42                *this.remaining -= 1;
43            } else {
44                *this.remaining = 0;
45            }
46            Poll::Ready(next)
47        }
48    }
49
50    fn size_hint(&self) -> (usize, Option<usize>) {
51        if self.remaining == 0 {
52            return (0, Some(0));
53        }
54
55        let (lower, upper) = self.stream.size_hint();
56
57        let lower = cmp::min(lower, self.remaining);
58
59        let upper = match upper {
60            Some(x) if x < self.remaining => Some(x),
61            _ => Some(self.remaining),
62        };
63
64        (lower, upper)
65    }
66}
67
68impl<St> FusedStream for Take<St>
69where
70    St: FusedStream,
71{
72    fn is_terminated(&self) -> bool {
73        self.remaining == 0 || self.stream.is_terminated()
74    }
75}
76
// `Sink` implementation that forwards to the underlying stream, available
// only when the `sink` feature is enabled.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for Take<St>
where
    St: Stream + Sink<Item>,
{
    type Error = <St as Sink<Item>>::Error;

    // NOTE(review): macro defined elsewhere in the crate; presumably it
    // expands to the `Sink` methods delegating to the `stream` field —
    // confirm against the macro definition.
    delegate_sink!(stream, Item);
}