futures_util/stream/stream/
buffer_unordered.rs

1use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2use core::fmt;
3use core::pin::Pin;
4use futures_core::future::Future;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11pin_project! {
12    /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
13    /// method.
14    #[must_use = "streams do nothing unless polled"]
15    pub struct BufferUnordered<St>
16    where
17        St: Stream,
18    {
19        #[pin]
20        stream: Fuse<St>,
21        in_progress_queue: FuturesUnordered<St::Item>,
22        max: usize,
23    }
24}
25
26impl<St> fmt::Debug for BufferUnordered<St>
27where
28    St: Stream + fmt::Debug,
29{
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        f.debug_struct("BufferUnordered")
32            .field("stream", &self.stream)
33            .field("in_progress_queue", &self.in_progress_queue)
34            .field("max", &self.max)
35            .finish()
36    }
37}
38
39impl<St> BufferUnordered<St>
40where
41    St: Stream,
42    St::Item: Future,
43{
44    pub(super) fn new(stream: St, n: usize) -> Self {
45        Self {
46            stream: super::Fuse::new(stream),
47            in_progress_queue: FuturesUnordered::new(),
48            max: n,
49        }
50    }
51
52    delegate_access_inner!(stream, St, (.));
53}
54
55impl<St> Stream for BufferUnordered<St>
56where
57    St: Stream,
58    St::Item: Future,
59{
60    type Item = <St::Item as Future>::Output;
61
62    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63        let mut this = self.project();
64
65        // First up, try to spawn off as many futures as possible by filling up
66        // our queue of futures.
67        while this.in_progress_queue.len() < *this.max {
68            match this.stream.as_mut().poll_next(cx) {
69                Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
70                Poll::Ready(None) | Poll::Pending => break,
71            }
72        }
73
74        // Attempt to pull the next value from the in_progress_queue
75        match this.in_progress_queue.poll_next_unpin(cx) {
76            x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
77            Poll::Ready(None) => {}
78        }
79
80        // If more values are still coming from the stream, we're not done yet
81        if this.stream.is_done() {
82            Poll::Ready(None)
83        } else {
84            Poll::Pending
85        }
86    }
87
88    fn size_hint(&self) -> (usize, Option<usize>) {
89        let queue_len = self.in_progress_queue.len();
90        let (lower, upper) = self.stream.size_hint();
91        let lower = lower.saturating_add(queue_len);
92        let upper = match upper {
93            Some(x) => x.checked_add(queue_len),
94            None => None,
95        };
96        (lower, upper)
97    }
98}
99
100impl<St> FusedStream for BufferUnordered<St>
101where
102    St: Stream,
103    St::Item: Future,
104{
105    fn is_terminated(&self) -> bool {
106        self.in_progress_queue.is_terminated() && self.stream.is_terminated()
107    }
108}
109
110// Forwarding impl of Sink from the underlying stream
111#[cfg(feature = "sink")]
112impl<S, Item> Sink<Item> for BufferUnordered<S>
113where
114    S: Stream + Sink<Item>,
115    S::Item: Future,
116{
117    type Error = S::Error;
118
119    delegate_sink!(stream, Item);
120}