opentitanlib/io/console/
broadcast.rs1use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll, ready};
8use std::time::Duration;
9
10use anyhow::Result;
11
12use super::{ConsoleDevice, ConsoleExt};
13use crate::io::uart::{FlowControl, Parity, Uart};
14
15pub struct Broadcaster<T> {
21 inner: Arc<Mutex<BroadcasterInner<T>>>,
22 index: usize,
23}
24
25impl<T> Clone for Broadcaster<T> {
26 fn clone(&self) -> Self {
27 let mut inner = self.inner.lock().unwrap();
28 let pos = inner.reader_pos[self.index];
29 let index = inner.add_reader(pos);
30 Self {
31 inner: self.inner.clone(),
32 index,
33 }
34 }
35}
36
37impl<T> Drop for Broadcaster<T> {
38 fn drop(&mut self) {
39 let mut inner = self.inner.lock().unwrap();
40
41 inner.reader_pos[self.index] = None;
44 while let Some(None) = inner.reader_pos.last() {
45 inner.reader_pos.pop();
46 }
47
48 inner.shrink();
50 }
51}
52
/// State shared between all handles of one broadcast device.
struct BroadcasterInner<T> {
    /// Bytes read from the device that at least one reader has yet to consume.
    buffer: VecDeque<u8>,
    /// Per-reader offsets into `buffer`; `None` marks a free slot.
    reader_pos: Vec<Option<usize>>,
    /// The wrapped device.
    inner: T,
}

impl<T> BroadcasterInner<T> {
    /// Registers a reader at `pos` and returns the slot it was given.
    ///
    /// The first free slot is reused if there is one; otherwise the table grows.
    fn add_reader(&mut self, pos: Option<usize>) -> usize {
        match self.reader_pos.iter().position(Option::is_none) {
            Some(slot) => {
                self.reader_pos[slot] = pos;
                slot
            }
            None => {
                self.reader_pos.push(pos);
                self.reader_pos.len() - 1
            }
        }
    }

    /// Returns the number of occupied reader slots.
    fn count(&self) -> usize {
        self.reader_pos.iter().flatten().count()
    }

    /// Discards the buffer prefix that every reader has already consumed and
    /// rebases all reader positions accordingly.
    ///
    /// With no readers registered the whole buffer is dropped.
    fn shrink(&mut self) {
        match self.reader_pos.iter().flatten().copied().min() {
            None => self.buffer.clear(),
            Some(consumed) => {
                self.buffer.drain(..consumed);
                for pos in self.reader_pos.iter_mut().flatten() {
                    *pos -= consumed;
                }
            }
        }
    }
}
95
96impl<T> Broadcaster<T> {
97 pub fn new(inner: T) -> Broadcaster<T> {
98 Self {
99 inner: Arc::new(Mutex::new(BroadcasterInner {
100 buffer: VecDeque::new(),
101 reader_pos: vec![Some(0)],
102 inner,
103 })),
104 index: 0,
105 }
106 }
107
108 pub fn downgrade(&self) -> WeakBroadcaster<T> {
110 WeakBroadcaster {
111 inner: self.inner.clone(),
112 }
113 }
114}
115
116impl<T: ConsoleDevice> ConsoleDevice for Broadcaster<T> {
117 fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
118 let mut inner = self.inner.lock().unwrap();
119
120 let current_pos = inner.reader_pos[self.index].unwrap();
121 if current_pos < inner.buffer.len() {
122 let (front, back) = inner.buffer.as_slices();
125
126 let front_skip = std::cmp::min(current_pos, front.len());
127 let front_copy = std::cmp::min(front.len() - front_skip, buf.len());
128 buf[..front_copy].copy_from_slice(&front[front_skip..][..front_copy]);
129
130 let back_skip = current_pos.saturating_sub(front_skip);
131 let back_copy = std::cmp::min(back.len() - back_skip, buf.len() - front_copy);
132 buf[front_copy..][..back_copy].copy_from_slice(&back[back_skip..][..back_copy]);
133
134 let copy_len = front_copy + back_copy;
135 *inner.reader_pos[self.index].as_mut().unwrap() += copy_len;
136
137 inner.shrink();
138 return Poll::Ready(Ok(copy_len));
139 }
140
141 let len = ready!(inner.inner.poll_read(cx, buf))?;
142
143 let total_readers = inner.count();
145 if total_readers != 1 {
146 inner.buffer.extend(&buf[..len]);
147 *inner.reader_pos[self.index].as_mut().unwrap() += len;
148 }
149
150 Poll::Ready(Ok(len))
151 }
152
153 fn write(&self, buf: &[u8]) -> Result<()> {
154 self.inner.lock().unwrap().inner.write(buf)
155 }
156}
157
158impl<T: Uart> Uart for Broadcaster<T> {
159 fn get_baudrate(&self) -> Result<u32> {
160 self.inner.lock().unwrap().inner.get_baudrate()
161 }
162
163 fn set_baudrate(&self, baudrate: u32) -> Result<()> {
164 self.inner.lock().unwrap().inner.set_baudrate(baudrate)
165 }
166
167 fn get_flow_control(&self) -> Result<FlowControl> {
168 self.inner.lock().unwrap().inner.get_flow_control()
169 }
170
171 fn set_flow_control(&self, flow_control: bool) -> Result<()> {
172 self.inner
173 .lock()
174 .unwrap()
175 .inner
176 .set_flow_control(flow_control)
177 }
178
179 fn get_device_path(&self) -> Result<String> {
180 self.inner.lock().unwrap().inner.get_device_path()
181 }
182
183 fn clear_rx_buffer(&self) -> Result<()> {
184 {
186 let inner = self.inner.lock().unwrap();
187 if inner.count() == 1 {
188 inner.inner.clear_rx_buffer()?;
189 return Ok(());
190 }
191 }
192
193 const TIMEOUT: Duration = Duration::from_millis(5);
196 let mut buf = [0u8; 256];
197 while self.read_timeout(&mut buf, TIMEOUT)? > 0 {}
198 Ok(())
199 }
200
201 fn set_parity(&self, parity: Parity) -> Result<()> {
202 self.inner.lock().unwrap().inner.set_parity(parity)
203 }
204
205 fn get_parity(&self) -> Result<Parity> {
206 self.inner.lock().unwrap().inner.get_parity()
207 }
208
209 fn set_break(&self, enable: bool) -> Result<()> {
210 self.inner.lock().unwrap().inner.set_break(enable)
211 }
212}
213
214pub struct WeakBroadcaster<T> {
219 inner: Arc<Mutex<BroadcasterInner<T>>>,
220}
221
222impl<T> Clone for WeakBroadcaster<T> {
223 fn clone(&self) -> Self {
224 Self {
225 inner: self.inner.clone(),
226 }
227 }
228}
229
230impl<T> WeakBroadcaster<T> {
231 pub fn upgrade(&self) -> Broadcaster<T> {
233 let mut inner = self.inner.lock().unwrap();
234 let pos = inner.buffer.len();
236 let index = inner.add_reader(Some(pos));
237 Broadcaster {
238 inner: self.inner.clone(),
239 index,
240 }
241 }
242}