sel4_shared_ring_buffer_bookkeeping/
slot_set_semaphore.rs

1//
2// Copyright 2023, Colias Group, LLC
3//
4// SPDX-License-Identifier: BSD-2-Clause
5//
6
7use alloc::rc::Rc;
8use core::array;
9use core::cell::Cell;
10use core::future::Future;
11use core::mem;
12
13use crate::slot_count_tracker::{SlotCountTracker, SlotCountTrackerError};
14
15pub trait SlotSemaphore {
16    fn new(count: usize) -> Self;
17
18    fn try_take(&self, n: usize) -> Result<bool, SlotSemaphoreClosedError>;
19
20    fn give(&self, n: usize);
21
22    fn close(&self);
23}
24
25pub trait AsyncSlotSemaphore: SlotSemaphore {
26    #[allow(clippy::needless_lifetimes)]
27    fn take<'a>(
28        &'a self,
29        n: usize,
30    ) -> impl Future<Output = Result<(), SlotSemaphoreClosedError>> + 'a;
31}
32
33pub struct SlotSetSemaphore<T, const N: usize> {
34    per_pool_slot_count_trackers: [SlotCountTracker; N],
35    handle: SlotSetSemaphoreHandle<T, N>,
36}
37
/// The per-pool semaphores of a slot set; reservations are made through this type.
///
/// Cloning clones each semaphore value `T`.
#[derive(Clone)]
pub struct SlotSetSemaphoreHandle<T, const N: usize> {
    /// One semaphore per slot pool, indexed by slot pool.
    per_pool_semaphores: [T; N],
}
42
43pub struct SlotSetReservation<'a, T: SlotSemaphore, const N: usize> {
44    handle: &'a SlotSetSemaphoreHandle<T, N>,
45    n: usize,
46}
47
48impl<T: SlotSemaphore, const N: usize> SlotSetSemaphore<T, N> {
49    pub fn new(slot_pool_capacities: [usize; N]) -> Self {
50        Self {
51            per_pool_slot_count_trackers: array::from_fn(|i| {
52                SlotCountTracker::new(slot_pool_capacities[i])
53            }),
54            handle: SlotSetSemaphoreHandle {
55                per_pool_semaphores: array::from_fn(|i| T::new(slot_pool_capacities[i])),
56            },
57        }
58    }
59
60    pub fn close(&self) {
61        for sem in &self.handle.per_pool_semaphores {
62            sem.close();
63        }
64    }
65
66    pub fn handle(&self) -> &SlotSetSemaphoreHandle<T, N> {
67        &self.handle
68    }
69
70    pub fn consume(
71        &mut self,
72        reservation: &mut SlotSetReservation<'_, T, N>,
73        n: usize,
74    ) -> Result<(), Error> {
75        reservation.reduce_count(n)?;
76        for tracker in &mut self.per_pool_slot_count_trackers {
77            tracker.report_occupied(n)?;
78        }
79        Ok(())
80    }
81
82    pub fn report_current_num_free_slots(
83        &mut self,
84        slot_pool_index: usize,
85        current_num_free_slots: usize,
86    ) -> Result<(), SlotCountTrackerError> {
87        self.handle.per_pool_semaphores[slot_pool_index].give(
88            self.per_pool_slot_count_trackers[slot_pool_index]
89                .redeem_newly_free(current_num_free_slots)?,
90        );
91        Ok(())
92    }
93}
94
95impl<T: SlotSemaphore, const N: usize> SlotSetSemaphoreHandle<T, N> {
96    pub fn try_reserve(
97        &self,
98        n: usize,
99    ) -> Result<Option<SlotSetReservation<'_, T, N>>, SlotSemaphoreClosedError> {
100        let mut tmp_permits: [Option<TemporaryPermit<'_, T>>; N] = array::from_fn(|_| None);
101        for (i, sem) in self.per_pool_semaphores.iter().enumerate() {
102            if sem.try_take(n)? {
103                tmp_permits[i] = Some(TemporaryPermit::new(sem, n));
104            } else {
105                return Ok(None);
106            }
107        }
108
109        mem::forget(tmp_permits);
110
111        Ok(Some(self.reservation(n)))
112    }
113
114    fn reservation(&self, n: usize) -> SlotSetReservation<'_, T, N> {
115        SlotSetReservation { handle: self, n }
116    }
117}
118
119impl<T: AsyncSlotSemaphore, const N: usize> SlotSetSemaphoreHandle<T, N> {
120    pub async fn reserve(
121        &self,
122        n: usize,
123    ) -> Result<SlotSetReservation<'_, T, N>, SlotSemaphoreClosedError> {
124        let mut tmp_permits: [Option<TemporaryPermit<'_, T>>; N] = array::from_fn(|_| None);
125        for (i, sem) in self.per_pool_semaphores.iter().enumerate() {
126            sem.take(n).await?;
127            tmp_permits[i] = Some(TemporaryPermit::new(sem, n));
128        }
129
130        mem::forget(tmp_permits);
131
132        Ok(self.reservation(n))
133    }
134}
135
136struct TemporaryPermit<'a, T: SlotSemaphore> {
137    sem: &'a T,
138    n: usize,
139}
140
141impl<'a, T: SlotSemaphore> TemporaryPermit<'a, T> {
142    fn new(sem: &'a T, n: usize) -> Self {
143        Self { sem, n }
144    }
145}
146
147impl<T: SlotSemaphore> Drop for TemporaryPermit<'_, T> {
148    fn drop(&mut self) {
149        self.sem.give(self.n);
150    }
151}
152
153impl<T: SlotSemaphore, const N: usize> SlotSetReservation<'_, T, N> {
154    pub fn count(&self) -> usize {
155        self.n
156    }
157
158    fn reduce_count(&mut self, n: usize) -> Result<(), SlotReservationExhaustedError> {
159        if n > self.n {
160            return Err(SlotReservationExhaustedError::new());
161        }
162        self.n -= n;
163        Ok(())
164    }
165
166    pub fn split(&mut self, split_off: usize) -> Result<Self, SlotReservationExhaustedError> {
167        self.reduce_count(split_off)?;
168        Ok(Self {
169            handle: self.handle,
170            n: split_off,
171        })
172    }
173
174    pub fn merge(&mut self, other: Self) {
175        self.n = self.n.checked_add(other.n).unwrap();
176    }
177}
178
179impl<T: SlotSemaphore, const N: usize> Drop for SlotSetReservation<'_, T, N> {
180    fn drop(&mut self) {
181        for sem in &self.handle.per_pool_semaphores {
182            sem.give(self.n);
183        }
184    }
185}
186
/// Failure modes of [`SlotSetSemaphore::consume`].
#[derive(Debug)]
pub enum Error {
    /// More slots were requested than the reservation holds.
    SlotReservationExhausted,
    /// A slot count tracker reported an error.
    SlotCountTrackingError,
}
192
193impl From<SlotReservationExhaustedError> for Error {
194    fn from(_err: SlotReservationExhaustedError) -> Self {
195        Self::SlotReservationExhausted
196    }
197}
198
199impl From<SlotCountTrackerError> for Error {
200    fn from(_err: SlotCountTrackerError) -> Self {
201        Self::SlotCountTrackingError
202    }
203}
204
/// Reported when permits are requested from a semaphore that has been closed.
///
/// The private unit field keeps construction going through [`SlotSemaphoreClosedError::new`].
#[derive(Debug)]
pub struct SlotSemaphoreClosedError(());

impl SlotSemaphoreClosedError {
    /// Creates the error value.
    // A `Default` impl is intentionally not provided for this error type.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        SlotSemaphoreClosedError(())
    }
}
214
/// Reported when more slots are requested from a reservation than it holds.
///
/// The private unit field keeps construction going through
/// [`SlotReservationExhaustedError::new`].
#[derive(Debug)]
pub struct SlotReservationExhaustedError(());

impl SlotReservationExhaustedError {
    /// Creates the error value.
    // The lint is silenced on `new` only, matching `SlotSemaphoreClosedError`, rather than with
    // an inner attribute that would cover the whole impl block.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self(())
    }
}
224
/// Minimal permit counter that implements [`SlotSemaphore`] (through `Rc`) without any waiting
/// support.
pub struct DummySlotSemaphore {
    /// Permits currently available, or `None` once the semaphore has been closed.
    permits: Cell<Option<usize>>,
}
228
229impl SlotSemaphore for Rc<DummySlotSemaphore> {
230    fn new(count: usize) -> Self {
231        Rc::new(DummySlotSemaphore {
232            permits: Cell::new(Some(count)),
233        })
234    }
235
236    fn try_take(&self, n: usize) -> Result<bool, SlotSemaphoreClosedError> {
237        match self.permits.get() {
238            Some(permits) => Ok(match n.checked_sub(permits) {
239                Some(new_permits) => {
240                    self.permits.set(Some(new_permits));
241                    true
242                }
243                None => false,
244            }),
245            None => Err(SlotSemaphoreClosedError::new()),
246        }
247    }
248
249    fn give(&self, n: usize) {
250        if let Some(permits) = self.permits.get() {
251            self.permits.set(Some(permits.checked_add(n).unwrap()));
252        }
253    }
254
255    fn close(&self) {
256        self.permits.set(None);
257    }
258}
259
/// [`SlotSemaphore`] and [`AsyncSlotSemaphore`] implemented on top of
/// `async_unsync::semaphore::Semaphore`.
#[cfg(feature = "async-unsync")]
mod async_unsync_impl {
    use async_unsync::semaphore::{Semaphore, TryAcquireError};

    use super::*;

    impl SlotSemaphore for Rc<Semaphore> {
        fn new(count: usize) -> Self {
            Rc::new(Semaphore::new(count))
        }

        fn try_take(&self, n: usize) -> Result<bool, SlotSemaphoreClosedError> {
            let permit = match self.try_acquire_many(n) {
                Ok(permit) => permit,
                Err(TryAcquireError::NoPermits) => return Ok(false),
                Err(TryAcquireError::Closed) => return Err(SlotSemaphoreClosedError::new()),
            };
            // NOTE(review): `forget` presumably keeps the permits taken once the guard goes
            // away, so that they only come back through `give` — confirm against the
            // async-unsync documentation.
            permit.forget();
            Ok(true)
        }

        fn give(&self, n: usize) {
            self.add_permits(n);
        }

        fn close(&self) {
            // Name the inherent method explicitly: `self.close()` on an `Rc<Semaphore>` would
            // resolve to this trait method and recurse.
            Semaphore::close(self);
        }
    }

    impl AsyncSlotSemaphore for Rc<Semaphore> {
        async fn take(&self, n: usize) -> Result<(), SlotSemaphoreClosedError> {
            match self.acquire_many(n).await {
                Ok(permit) => {
                    permit.forget();
                    Ok(())
                }
                Err(_) => Err(SlotSemaphoreClosedError::new()),
            }
        }
    }
}