futures_util/stream/stream/
buffered.rs

1use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
2use core::fmt;
3use core::pin::Pin;
4use futures_core::future::Future;
5use futures_core::ready;
6use futures_core::stream::Stream;
7use futures_core::task::{Context, Poll};
8#[cfg(feature = "sink")]
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11
12pin_project! {
13    /// Stream for the [`buffered`](super::StreamExt::buffered) method.
14    #[must_use = "streams do nothing unless polled"]
15    pub struct Buffered<St>
16    where
17        St: Stream,
18        St::Item: Future,
19    {
20        #[pin]
21        stream: Fuse<St>,
22        in_progress_queue: FuturesOrdered<St::Item>,
23        max: usize,
24    }
25}
26
27impl<St> fmt::Debug for Buffered<St>
28where
29    St: Stream + fmt::Debug,
30    St::Item: Future,
31{
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        f.debug_struct("Buffered")
34            .field("stream", &self.stream)
35            .field("in_progress_queue", &self.in_progress_queue)
36            .field("max", &self.max)
37            .finish()
38    }
39}
40
41impl<St> Buffered<St>
42where
43    St: Stream,
44    St::Item: Future,
45{
46    pub(super) fn new(stream: St, n: usize) -> Self {
47        Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n }
48    }
49
50    delegate_access_inner!(stream, St, (.));
51}
52
53impl<St> Stream for Buffered<St>
54where
55    St: Stream,
56    St::Item: Future,
57{
58    type Item = <St::Item as Future>::Output;
59
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61        let mut this = self.project();
62
63        // First up, try to spawn off as many futures as possible by filling up
64        // our queue of futures.
65        while this.in_progress_queue.len() < *this.max {
66            match this.stream.as_mut().poll_next(cx) {
67                Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
68                Poll::Ready(None) | Poll::Pending => break,
69            }
70        }
71
72        // Attempt to pull the next value from the in_progress_queue
73        let res = this.in_progress_queue.poll_next_unpin(cx);
74        if let Some(val) = ready!(res) {
75            return Poll::Ready(Some(val));
76        }
77
78        // If more values are still coming from the stream, we're not done yet
79        if this.stream.is_done() {
80            Poll::Ready(None)
81        } else {
82            Poll::Pending
83        }
84    }
85
86    fn size_hint(&self) -> (usize, Option<usize>) {
87        let queue_len = self.in_progress_queue.len();
88        let (lower, upper) = self.stream.size_hint();
89        let lower = lower.saturating_add(queue_len);
90        let upper = match upper {
91            Some(x) => x.checked_add(queue_len),
92            None => None,
93        };
94        (lower, upper)
95    }
96}
97
98impl<St> FusedStream for Buffered<St>
99where
100    St: Stream,
101    St::Item: Future,
102{
103    fn is_terminated(&self) -> bool {
104        self.stream.is_done() && self.in_progress_queue.is_terminated()
105    }
106}
107
108// Forwarding impl of Sink from the underlying stream
109#[cfg(feature = "sink")]
110impl<S, Item> Sink<Item> for Buffered<S>
111where
112    S: Stream + Sink<Item>,
113    S::Item: Future,
114{
115    type Error = S::Error;
116
117    delegate_sink!(stream, Item);
118}