opentitanlib/util/
runtime.rs

1// Copyright lowRISC contributors (OpenTitan project).
2// Licensed under the Apache License, Version 2.0, see LICENSE for details.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::{Arc, LazyLock, Mutex};
7use std::task::{Context, Poll, Wake, Waker};
8use std::time::Duration;
9
10use anyhow::Result;
11use tokio::runtime::Runtime;
12
13static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
14
15pub fn runtime() -> &'static Runtime {
16    &RUNTIME
17}
18
19/// Block on a future to complete.
20///
21/// As opentitanlib is currently messy with its use MIO, to aid migration we allow this to be called within async context too.
22pub fn block_on<F: Future>(future: F) -> F::Output {
23    tokio::task::block_in_place(|| runtime().block_on(future))
24}
25
26/// Ask the runtime to poll `timeout` later.
27///
28/// This is a helper function for things that do not yet support notification when they're ready and require
29/// continuous polling with timeouts.
30pub fn poll_later<T>(cx: &mut Context<'_>, timeout: Duration) -> Poll<T> {
31    let waker = cx.waker().clone();
32    runtime().spawn(async move {
33        tokio::time::sleep(timeout).await;
34        waker.wake();
35    });
36    Poll::Pending
37}
38
/// Shared state behind a `MultiWaker`.
struct MultiWakerInner {
    // Bumped once for every wake, so a registration can tell whether a wake
    // happened after it was created.
    generation: AtomicU32,
    // Wakers waiting for the next wake.
    wakers: Mutex<Vec<Waker>>,
}

impl Wake for MultiWakerInner {
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    /// Wake every registered waker and advance the generation counter.
    fn wake_by_ref(self: &Arc<Self>) {
        // Empty the list and bump the generation while holding the lock, then
        // release it before invoking any waker so that a waker may touch this
        // structure again without deadlocking.
        let pending = {
            let mut registered = self.wakers.lock().unwrap();
            let pending = std::mem::take(&mut *registered);
            self.generation.fetch_add(1, Ordering::Relaxed);
            pending
        };

        pending.into_iter().for_each(Waker::wake);
    }
}
61
62/// A utility type ensures that multiple pollers can all be waken up.
63///
64/// With `tokio`'s design, `poll_*` functions will only wake up the waker associated with the last poller.
65/// This works well with tokio's types, however due to OT-lib design issues, we might have multiple pollers
66/// (e.g. `Uart`'s API takes `&self` instead of `&mut self`). This utility ensures that even if multiple pollers
67/// are present, no wake messages will be lost.
68pub struct MultiWaker {
69    inner: Arc<MultiWakerInner>,
70}
71
72/// A intent to register a waker to `MultiWaker`.
73///
74/// When registration eventually happens, it is treated as if the registration happens at the point in time when
75/// the instance of this type is created.
76///
77/// Dropping this type without registration is a cheap no-op.
78pub struct MultiWakerRegistration<'a> {
79    generation: u32,
80    waker: &'a MultiWakerInner,
81}
82
83impl<'a> MultiWakerRegistration<'a> {
84    /// Complete the registration of waker.
85    pub fn register(self, waker: &Waker) {
86        // When we received pending, we need to register the waker.
87        let mut guard = self.waker.wakers.lock().unwrap();
88        let generation = self.waker.generation.load(Ordering::Relaxed);
89        if generation == self.generation {
90            if guard.iter().all(|w| !w.will_wake(waker)) {
91                guard.push(waker.clone());
92            }
93        } else {
94            // Conceptually, we registered `waker` at the point of creation of `self`.
95            // If generation number changes, it means our waker has been `wake()`-ed since creation,
96            // so we need to invoke the waker immediately.
97            drop(guard);
98            waker.wake_by_ref();
99        }
100    }
101}
102
103impl MultiWaker {
104    /// Create a new `MultiWaker`.
105    pub fn new() -> Self {
106        Self {
107            inner: Arc::new(MultiWakerInner {
108                generation: AtomicU32::new(0),
109                wakers: Mutex::new(Vec::new()),
110            }),
111        }
112    }
113
114    /// Obtain a waker that can be used to wake up all registered wakers.
115    pub fn waker(&self) -> Waker {
116        self.inner.clone().into()
117    }
118
119    /// Signal the intent to register a waker.
120    ///
121    /// When registration eventually happens, it is treated as if the registration happens at this point in time.
122    /// This is a cheap operation compared to the actual registration of the waker.
123    pub fn register(&self) -> MultiWakerRegistration<'_> {
124        MultiWakerRegistration {
125            generation: self.inner.generation.load(Ordering::Relaxed),
126            waker: &self.inner,
127        }
128    }
129
130    /// Call a polling function with the guarantee of not losing `Waker`.
131    ///
132    /// For the guarantee to be upheld the same `MultiWaker` instance should be used for every poll call.
133    pub fn poll_with<T, F: FnOnce(&mut Context<'_>) -> Poll<T>>(
134        &self,
135        cx: &mut Context<'_>,
136        f: F,
137    ) -> Poll<T> {
138        let waker = self.waker();
139        let register = self.register();
140        let mut new_cx = Context::from_waker(&waker);
141        match f(&mut new_cx) {
142            Poll::Ready(v) => Poll::Ready(v),
143            Poll::Pending => {
144                register.register(cx.waker());
145                Poll::Pending
146            }
147        }
148    }
149}
150
151impl Default for MultiWaker {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157/// Listen for shutdown signals.
158pub async fn shutdown_signal() {
159    let ctrl_c = async {
160        tokio::signal::ctrl_c()
161            .await
162            .expect("failed to install Ctrl+C handler");
163    };
164
165    let terminate = async {
166        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
167            .expect("failed to install signal handler")
168            .recv()
169            .await;
170    };
171
172    tokio::select! {
173        biased;
174        _ = ctrl_c => {},
175        _ = terminate => {},
176    }
177}
178
179/// Run a future with graceful shutdown.
180pub async fn with_graceful_shutdown(fut: impl Future<Output = Result<()>>) -> Result<()> {
181    tokio::select! {
182        v = fut => v,
183        _ = shutdown_signal() => Ok(()),
184    }
185}