sel4_async_single_threaded_executor/
lib.rs

1//
2// Copyright 2023, Colias Group, LLC
3// Copyright (c) 2017 The Tokio Authors
4// Copyright (c) 2016 Alex Crichton
5//
6// SPDX-License-Identifier: MIT OR Apache-2.0
7//
8
9#![no_std]
10#![feature(thread_local)]
11
12extern crate alloc;
13
14use alloc::rc::{Rc, Weak};
15use alloc::sync::Arc;
16use alloc::vec::Vec;
17use core::cell::{OnceCell, RefCell};
18use core::pin::Pin;
19use core::sync::atomic::{AtomicBool, Ordering};
20
21use futures::future::Future;
22use futures::stream::FuturesUnordered;
23use futures::stream::StreamExt;
24use futures::task::{waker_ref, ArcWake};
25use futures::task::{Context, Poll};
26use futures::task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
27
28mod enter;
29
30#[derive(Debug)]
31pub struct LocalPool {
32    pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
33    incoming: Rc<Incoming>,
34}
35
36#[derive(Clone, Debug)]
37pub struct LocalSpawner {
38    incoming: Weak<Incoming>,
39}
40
41type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
42
43struct ThreadNotify {
44    woken: AtomicBool,
45}
46
47impl ThreadNotify {
48    fn new() -> Self {
49        Self {
50            woken: AtomicBool::new(false),
51        }
52    }
53
54    fn wake(&self) {
55        self.woken.store(true, Ordering::Release);
56    }
57
58    #[allow(dead_code)]
59    fn woken(&self) -> bool {
60        self.woken.load(Ordering::Acquire)
61    }
62
63    fn take_wakeup(&self) -> bool {
64        self.woken.swap(false, Ordering::Acquire)
65    }
66}
67
68impl ArcWake for ThreadNotify {
69    fn wake_by_ref(arc_self: &Arc<Self>) {
70        ThreadNotify::wake(arc_self);
71    }
72}
73
74fn run_executor_until_stalled<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> Poll<T> {
75    #[thread_local]
76    static CURRENT_THREAD_NOTIFY: OnceCell<Arc<ThreadNotify>> = OnceCell::new();
77
78    let _enter =
79        enter::enter().expect("cannot execute `LocalPool` executor from within another executor");
80
81    let current_thread_notify = CURRENT_THREAD_NOTIFY.get_or_init(|| Arc::new(ThreadNotify::new()));
82
83    let waker = waker_ref(current_thread_notify);
84    let mut cx = Context::from_waker(&waker);
85    loop {
86        if let Poll::Ready(t) = f(&mut cx) {
87            return Poll::Ready(t);
88        }
89
90        if !current_thread_notify.take_wakeup() {
91            return Poll::Pending;
92        }
93    }
94}
95
96impl LocalPool {
97    /// Create a new, empty pool of tasks.
98    pub fn new() -> Self {
99        Self {
100            pool: FuturesUnordered::new(),
101            incoming: Default::default(),
102        }
103    }
104
105    /// Get a clonable handle to the pool as a [`Spawn`].
106    pub fn spawner(&self) -> LocalSpawner {
107        LocalSpawner {
108            incoming: Rc::downgrade(&self.incoming),
109        }
110    }
111
112    pub fn run_all_until_stalled(&mut self) -> Poll<()> {
113        run_executor_until_stalled(|cx| self.poll_pool(cx))
114    }
115
116    pub fn run_until_stalled<F: Future>(&mut self, mut future: Pin<&mut F>) -> Poll<F::Output> {
117        run_executor_until_stalled(|cx| {
118            {
119                // if our main task is done, so are we
120                let result = future.as_mut().poll(cx);
121                if let Poll::Ready(output) = result {
122                    return Poll::Ready(output);
123                }
124            }
125
126            let _ = self.poll_pool(cx);
127            Poll::Pending
128        })
129    }
130
131    /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
132    /// Repeat until either the pool is empty, or it returns `Pending`.
133    ///
134    /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
135    ///
136    /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
137    /// mean that the pool can't make progress.
138    fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
139        loop {
140            self.drain_incoming();
141
142            let pool_ret = self.pool.poll_next_unpin(cx);
143
144            // We queued up some new tasks; add them and poll again.
145            if !self.incoming.borrow().is_empty() {
146                continue;
147            }
148
149            match pool_ret {
150                Poll::Ready(Some(())) => continue,
151                Poll::Ready(None) => return Poll::Ready(()),
152                Poll::Pending => return Poll::Pending,
153            }
154        }
155    }
156
157    /// Empty the incoming queue of newly-spawned tasks.
158    fn drain_incoming(&mut self) {
159        let mut incoming = self.incoming.borrow_mut();
160        for task in incoming.drain(..) {
161            self.pool.push(task)
162        }
163    }
164}
165
166impl Default for LocalPool {
167    fn default() -> Self {
168        Self::new()
169    }
170}
171
172/// Run a future to completion on the current thread.
173///
174/// This function will block the caller until the given future has completed.
175///
176/// Use a [`LocalPool`] if you need finer-grained control over spawned tasks.
177pub fn run_until_stalled<F: Future>(mut future: Pin<&mut F>) -> Poll<F::Output> {
178    run_executor_until_stalled(|cx| future.as_mut().poll(cx))
179}
180
181impl Spawn for LocalSpawner {
182    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
183        if let Some(incoming) = self.incoming.upgrade() {
184            incoming.borrow_mut().push(future.into());
185            Ok(())
186        } else {
187            Err(SpawnError::shutdown())
188        }
189    }
190
191    fn status(&self) -> Result<(), SpawnError> {
192        if self.incoming.upgrade().is_some() {
193            Ok(())
194        } else {
195            Err(SpawnError::shutdown())
196        }
197    }
198}
199
200impl LocalSpawn for LocalSpawner {
201    fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
202        if let Some(incoming) = self.incoming.upgrade() {
203            incoming.borrow_mut().push(future);
204            Ok(())
205        } else {
206            Err(SpawnError::shutdown())
207        }
208    }
209
210    fn status_local(&self) -> Result<(), SpawnError> {
211        if self.incoming.upgrade().is_some() {
212            Ok(())
213        } else {
214            Err(SpawnError::shutdown())
215        }
216    }
217}