opentitanlib/io/console/
broadcast.rs1use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll, ready};
8use std::time::Duration;
9
10use anyhow::Result;
11
12use super::{ConsoleDevice, ConsoleExt};
13use crate::io::uart::{FlowControl, Parity, Uart};
14
15pub struct Broadcaster<T> {
21 inner: Arc<Mutex<BroadcasterInner<T>>>,
22 index: usize,
23}
24
25impl<T> Clone for Broadcaster<T> {
26 fn clone(&self) -> Self {
27 let mut inner = self.inner.lock().unwrap();
28
29 let pos = inner.reader_pos[self.index];
30
31 let none_index = inner.reader_pos.iter().position(|x| x.is_none());
33 let index = if let Some(index) = none_index {
34 inner.reader_pos[index] = pos;
35 index
36 } else {
37 let index = inner.reader_pos.len();
38 inner.reader_pos.push(pos);
39 index
40 };
41
42 Self {
43 inner: self.inner.clone(),
44 index,
45 }
46 }
47}
48
49impl<T> Drop for Broadcaster<T> {
50 fn drop(&mut self) {
51 let mut inner = self.inner.lock().unwrap();
52
53 inner.reader_pos[self.index] = None;
56 while let Some(None) = inner.reader_pos.last() {
57 inner.reader_pos.pop();
58 }
59
60 if inner.count() != 0 {
62 inner.shrink();
63 }
64 }
65}
66
67struct BroadcasterInner<T> {
68 buffer: VecDeque<u8>,
70 reader_pos: Vec<Option<usize>>,
72 inner: T,
74}
75
76impl<T> BroadcasterInner<T> {
77 fn count(&self) -> usize {
78 self.reader_pos.iter().filter(|x| x.is_some()).count()
79 }
80
81 fn shrink(&mut self) {
82 let min_pos = self.reader_pos.iter().filter_map(|x| *x).min().unwrap();
84 self.buffer.drain(..min_pos);
85
86 self.reader_pos
87 .iter_mut()
88 .filter_map(|x| x.as_mut())
89 .for_each(|x| *x -= min_pos);
90 }
91}
92
93impl<T> Broadcaster<T> {
94 pub fn new(inner: T) -> Broadcaster<T> {
95 Self {
96 inner: Arc::new(Mutex::new(BroadcasterInner {
97 buffer: VecDeque::new(),
98 reader_pos: vec![Some(0)],
99 inner,
100 })),
101 index: 0,
102 }
103 }
104}
105
106impl<T: ConsoleDevice> ConsoleDevice for Broadcaster<T> {
107 fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
108 let mut inner = self.inner.lock().unwrap();
109
110 let current_pos = inner.reader_pos[self.index].unwrap();
111 if current_pos < inner.buffer.len() {
112 let (front, back) = inner.buffer.as_slices();
115
116 let front_skip = std::cmp::min(current_pos, front.len());
117 let front_copy = std::cmp::min(front.len() - front_skip, buf.len());
118 buf[..front_copy].copy_from_slice(&front[front_skip..][..front_copy]);
119
120 let back_skip = current_pos.saturating_sub(front_skip);
121 let back_copy = std::cmp::min(back.len() - back_skip, buf.len() - front_copy);
122 buf[front_copy..][..back_copy].copy_from_slice(&back[back_skip..][..back_copy]);
123
124 let copy_len = front_copy + back_copy;
125 *inner.reader_pos[self.index].as_mut().unwrap() += copy_len;
126
127 inner.shrink();
128 return Poll::Ready(Ok(copy_len));
129 }
130
131 let len = ready!(inner.inner.poll_read(cx, buf))?;
132
133 let total_readers = inner.count();
135 if total_readers != 1 {
136 inner.buffer.extend(&buf[..len]);
137 *inner.reader_pos[self.index].as_mut().unwrap() += len;
138 }
139
140 Poll::Ready(Ok(len))
141 }
142
143 fn write(&self, buf: &[u8]) -> Result<()> {
144 self.inner.lock().unwrap().inner.write(buf)
145 }
146}
147
148impl<T: Uart> Uart for Broadcaster<T> {
149 fn get_baudrate(&self) -> Result<u32> {
150 self.inner.lock().unwrap().inner.get_baudrate()
151 }
152
153 fn set_baudrate(&self, baudrate: u32) -> Result<()> {
154 self.inner.lock().unwrap().inner.set_baudrate(baudrate)
155 }
156
157 fn get_flow_control(&self) -> Result<FlowControl> {
158 self.inner.lock().unwrap().inner.get_flow_control()
159 }
160
161 fn set_flow_control(&self, flow_control: bool) -> Result<()> {
162 self.inner
163 .lock()
164 .unwrap()
165 .inner
166 .set_flow_control(flow_control)
167 }
168
169 fn get_device_path(&self) -> Result<String> {
170 self.inner.lock().unwrap().inner.get_device_path()
171 }
172
173 fn clear_rx_buffer(&self) -> Result<()> {
174 {
176 let inner = self.inner.lock().unwrap();
177 if inner.count() == 1 {
178 inner.inner.clear_rx_buffer()?;
179 return Ok(());
180 }
181 }
182
183 const TIMEOUT: Duration = Duration::from_millis(5);
186 let mut buf = [0u8; 256];
187 while self.read_timeout(&mut buf, TIMEOUT)? > 0 {}
188 Ok(())
189 }
190
191 fn set_parity(&self, parity: Parity) -> Result<()> {
192 self.inner.lock().unwrap().inner.set_parity(parity)
193 }
194
195 fn get_parity(&self) -> Result<Parity> {
196 self.inner.lock().unwrap().inner.get_parity()
197 }
198
199 fn set_break(&self, enable: bool) -> Result<()> {
200 self.inner.lock().unwrap().inner.set_break(enable)
201 }
202}