opentitanlib/util/
runtime.rs1use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::{Arc, LazyLock, Mutex};
7use std::task::{Context, Poll, Wake, Waker};
8use std::time::Duration;
9
10use anyhow::Result;
11use tokio::runtime::Runtime;
12
13static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
14
15pub fn runtime() -> &'static Runtime {
16 &RUNTIME
17}
18
19pub fn block_on<F: Future>(future: F) -> F::Output {
23 tokio::task::block_in_place(|| runtime().block_on(future))
24}
25
26pub fn poll_later<T>(cx: &mut Context<'_>, timeout: Duration) -> Poll<T> {
31 let waker = cx.waker().clone();
32 runtime().spawn(async move {
33 tokio::time::sleep(timeout).await;
34 waker.wake();
35 });
36 Poll::Pending
37}
38
/// Shared state of a multi-waker: a wake counter and the wakers waiting for the next wake.
struct MultiWakerInner {
    /// Incremented once for every wake delivered through this state.
    generation: AtomicU32,
    /// Wakers to notify on the next wake; emptied each time a wake is delivered.
    wakers: Mutex<Vec<Waker>>,
}

impl Wake for MultiWakerInner {
    /// Delivers a wake; identical to [`Wake::wake_by_ref`] on this type.
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    /// Drains the registered wakers, bumps the generation, and wakes every drained waker.
    fn wake_by_ref(self: &Arc<Self>) {
        // Drain the list and bump the generation in one critical section, so a concurrent
        // registration sees either both effects or neither.
        let drained = {
            let mut list = self.wakers.lock().unwrap();
            let taken = std::mem::take(&mut *list);
            self.generation.fetch_add(1, Ordering::Relaxed);
            taken
        };

        // The lock has been released by this point, so the wakers run without it held.
        drained.into_iter().for_each(Waker::wake);
    }
}
61
62pub struct MultiWaker {
69 inner: Arc<MultiWakerInner>,
70}
71
72pub struct MultiWakerRegistration<'a> {
79 generation: u32,
80 waker: &'a MultiWakerInner,
81}
82
83impl<'a> MultiWakerRegistration<'a> {
84 pub fn register(self, waker: &Waker) {
86 let mut guard = self.waker.wakers.lock().unwrap();
88 let generation = self.waker.generation.load(Ordering::Relaxed);
89 if generation == self.generation {
90 if guard.iter().all(|w| !w.will_wake(waker)) {
91 guard.push(waker.clone());
92 }
93 } else {
94 drop(guard);
98 waker.wake_by_ref();
99 }
100 }
101}
102
103impl MultiWaker {
104 pub fn new() -> Self {
106 Self {
107 inner: Arc::new(MultiWakerInner {
108 generation: AtomicU32::new(0),
109 wakers: Mutex::new(Vec::new()),
110 }),
111 }
112 }
113
114 pub fn waker(&self) -> Waker {
116 self.inner.clone().into()
117 }
118
119 pub fn register(&self) -> MultiWakerRegistration<'_> {
124 MultiWakerRegistration {
125 generation: self.inner.generation.load(Ordering::Relaxed),
126 waker: &self.inner,
127 }
128 }
129
130 pub fn poll_with<T, F: FnOnce(&mut Context<'_>) -> Poll<T>>(
134 &self,
135 cx: &mut Context<'_>,
136 f: F,
137 ) -> Poll<T> {
138 let waker = self.waker();
139 let register = self.register();
140 let mut new_cx = Context::from_waker(&waker);
141 match f(&mut new_cx) {
142 Poll::Ready(v) => Poll::Ready(v),
143 Poll::Pending => {
144 register.register(cx.waker());
145 Poll::Pending
146 }
147 }
148 }
149}
150
151impl Default for MultiWaker {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157pub async fn shutdown_signal() {
159 let ctrl_c = async {
160 tokio::signal::ctrl_c()
161 .await
162 .expect("failed to install Ctrl+C handler");
163 };
164
165 let terminate = async {
166 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
167 .expect("failed to install signal handler")
168 .recv()
169 .await;
170 };
171
172 tokio::select! {
173 biased;
174 _ = ctrl_c => {},
175 _ = terminate => {},
176 }
177}
178
179pub async fn with_graceful_shutdown(fut: impl Future<Output = Result<()>>) -> Result<()> {
181 tokio::select! {
182 v = fut => v,
183 _ = shutdown_signal() => Ok(()),
184 }
185}