futures_util/stream/stream/
into_future.rs

1use crate::stream::StreamExt;
2use core::pin::Pin;
3use futures_core::future::{FusedFuture, Future};
4use futures_core::ready;
5use futures_core::stream::Stream;
6use futures_core::task::{Context, Poll};
7
/// Future returned by the [`into_future`](super::StreamExt::into_future) method.
///
/// The wrapped stream is kept in an `Option` so that it can be moved out of
/// the future and handed back to the caller once polling completes; after
/// that point the slot is empty.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct StreamFuture<St> {
    // `Some` until the future resolves, `None` afterwards.
    stream: Option<St>,
}
14
15impl<St: Stream + Unpin> StreamFuture<St> {
16    pub(super) fn new(stream: St) -> Self {
17        Self { stream: Some(stream) }
18    }
19
20    /// Acquires a reference to the underlying stream that this combinator is
21    /// pulling from.
22    ///
23    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
24    /// implementation of `Future::poll` consumes the underlying stream during polling
25    /// in order to return it to the caller of `Future::poll` if the stream yielded
26    /// an element.
27    pub fn get_ref(&self) -> Option<&St> {
28        self.stream.as_ref()
29    }
30
31    /// Acquires a mutable reference to the underlying stream that this
32    /// combinator is pulling from.
33    ///
34    /// Note that care must be taken to avoid tampering with the state of the
35    /// stream which may otherwise confuse this combinator.
36    ///
37    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
38    /// implementation of `Future::poll` consumes the underlying stream during polling
39    /// in order to return it to the caller of `Future::poll` if the stream yielded
40    /// an element.
41    pub fn get_mut(&mut self) -> Option<&mut St> {
42        self.stream.as_mut()
43    }
44
45    /// Acquires a pinned mutable reference to the underlying stream that this
46    /// combinator is pulling from.
47    ///
48    /// Note that care must be taken to avoid tampering with the state of the
49    /// stream which may otherwise confuse this combinator.
50    ///
51    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
52    /// implementation of `Future::poll` consumes the underlying stream during polling
53    /// in order to return it to the caller of `Future::poll` if the stream yielded
54    /// an element.
55    pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
56        self.get_mut().stream.as_mut().map(Pin::new)
57    }
58
59    /// Consumes this combinator, returning the underlying stream.
60    ///
61    /// Note that this may discard intermediate state of this combinator, so
62    /// care should be taken to avoid losing resources when this is called.
63    ///
64    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
65    /// implementation of `Future::poll` consumes the underlying stream during polling
66    /// in order to return it to the caller of `Future::poll` if the stream yielded
67    /// an element.
68    pub fn into_inner(self) -> Option<St> {
69        self.stream
70    }
71}
72
73impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> {
74    fn is_terminated(&self) -> bool {
75        self.stream.is_none()
76    }
77}
78
79impl<St: Stream + Unpin> Future for StreamFuture<St> {
80    type Output = (Option<St::Item>, St);
81
82    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let item = {
84            let s = self.stream.as_mut().expect("polling StreamFuture twice");
85            ready!(s.poll_next_unpin(cx))
86        };
87        let stream = self.stream.take().unwrap();
88        Poll::Ready((item, stream))
89    }
90}