sel4_async_single_threaded_executor/
lib.rs
1#![no_std]
10#![feature(thread_local)]
11
12extern crate alloc;
13
14use alloc::rc::{Rc, Weak};
15use alloc::sync::Arc;
16use alloc::vec::Vec;
17use core::cell::{OnceCell, RefCell};
18use core::pin::Pin;
19use core::sync::atomic::{AtomicBool, Ordering};
20
21use futures::future::Future;
22use futures::stream::FuturesUnordered;
23use futures::stream::StreamExt;
24use futures::task::{waker_ref, ArcWake};
25use futures::task::{Context, Poll};
26use futures::task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
27
28mod enter;
29
30#[derive(Debug)]
31pub struct LocalPool {
32 pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
33 incoming: Rc<Incoming>,
34}
35
36#[derive(Clone, Debug)]
37pub struct LocalSpawner {
38 incoming: Weak<Incoming>,
39}
40
41type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
42
43struct ThreadNotify {
44 woken: AtomicBool,
45}
46
47impl ThreadNotify {
48 fn new() -> Self {
49 Self {
50 woken: AtomicBool::new(false),
51 }
52 }
53
54 fn wake(&self) {
55 self.woken.store(true, Ordering::Release);
56 }
57
58 #[allow(dead_code)]
59 fn woken(&self) -> bool {
60 self.woken.load(Ordering::Acquire)
61 }
62
63 fn take_wakeup(&self) -> bool {
64 self.woken.swap(false, Ordering::Acquire)
65 }
66}
67
68impl ArcWake for ThreadNotify {
69 fn wake_by_ref(arc_self: &Arc<Self>) {
70 ThreadNotify::wake(arc_self);
71 }
72}
73
74fn run_executor_until_stalled<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> Poll<T> {
75 #[thread_local]
76 static CURRENT_THREAD_NOTIFY: OnceCell<Arc<ThreadNotify>> = OnceCell::new();
77
78 let _enter =
79 enter::enter().expect("cannot execute `LocalPool` executor from within another executor");
80
81 let current_thread_notify = CURRENT_THREAD_NOTIFY.get_or_init(|| Arc::new(ThreadNotify::new()));
82
83 let waker = waker_ref(current_thread_notify);
84 let mut cx = Context::from_waker(&waker);
85 loop {
86 if let Poll::Ready(t) = f(&mut cx) {
87 return Poll::Ready(t);
88 }
89
90 if !current_thread_notify.take_wakeup() {
91 return Poll::Pending;
92 }
93 }
94}
95
96impl LocalPool {
97 pub fn new() -> Self {
99 Self {
100 pool: FuturesUnordered::new(),
101 incoming: Default::default(),
102 }
103 }
104
105 pub fn spawner(&self) -> LocalSpawner {
107 LocalSpawner {
108 incoming: Rc::downgrade(&self.incoming),
109 }
110 }
111
112 pub fn run_all_until_stalled(&mut self) -> Poll<()> {
113 run_executor_until_stalled(|cx| self.poll_pool(cx))
114 }
115
116 pub fn run_until_stalled<F: Future>(&mut self, mut future: Pin<&mut F>) -> Poll<F::Output> {
117 run_executor_until_stalled(|cx| {
118 {
119 let result = future.as_mut().poll(cx);
121 if let Poll::Ready(output) = result {
122 return Poll::Ready(output);
123 }
124 }
125
126 let _ = self.poll_pool(cx);
127 Poll::Pending
128 })
129 }
130
131 fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
139 loop {
140 self.drain_incoming();
141
142 let pool_ret = self.pool.poll_next_unpin(cx);
143
144 if !self.incoming.borrow().is_empty() {
146 continue;
147 }
148
149 match pool_ret {
150 Poll::Ready(Some(())) => continue,
151 Poll::Ready(None) => return Poll::Ready(()),
152 Poll::Pending => return Poll::Pending,
153 }
154 }
155 }
156
157 fn drain_incoming(&mut self) {
159 let mut incoming = self.incoming.borrow_mut();
160 for task in incoming.drain(..) {
161 self.pool.push(task)
162 }
163 }
164}
165
166impl Default for LocalPool {
167 fn default() -> Self {
168 Self::new()
169 }
170}
171
172pub fn run_until_stalled<F: Future>(mut future: Pin<&mut F>) -> Poll<F::Output> {
178 run_executor_until_stalled(|cx| future.as_mut().poll(cx))
179}
180
181impl Spawn for LocalSpawner {
182 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
183 if let Some(incoming) = self.incoming.upgrade() {
184 incoming.borrow_mut().push(future.into());
185 Ok(())
186 } else {
187 Err(SpawnError::shutdown())
188 }
189 }
190
191 fn status(&self) -> Result<(), SpawnError> {
192 if self.incoming.upgrade().is_some() {
193 Ok(())
194 } else {
195 Err(SpawnError::shutdown())
196 }
197 }
198}
199
200impl LocalSpawn for LocalSpawner {
201 fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
202 if let Some(incoming) = self.incoming.upgrade() {
203 incoming.borrow_mut().push(future);
204 Ok(())
205 } else {
206 Err(SpawnError::shutdown())
207 }
208 }
209
210 fn status_local(&self) -> Result<(), SpawnError> {
211 if self.incoming.upgrade().is_some() {
212 Ok(())
213 } else {
214 Err(SpawnError::shutdown())
215 }
216 }
217}