opentitanlib/io/console/
broadcast.rs

1// Copyright lowRISC contributors (OpenTitan project).
2// Licensed under the Apache License, Version 2.0, see LICENSE for details.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll, ready};
8use std::time::Duration;
9
10use anyhow::Result;
11
12use super::{ConsoleDevice, ConsoleExt};
13use crate::io::uart::{FlowControl, Parity, Uart};
14
15/// Broadcast UART recevied data to multiple users.
16///
17/// Normally, if there are multiple users of `UART`, they share the same buffer (most commonly the kernel buffer).
18/// This means that only one user can read a specific piece of data at a time. `Broadcaster` ensures that all clone
19/// of it can receive all data.
20pub struct Broadcaster<T> {
21    inner: Arc<Mutex<BroadcasterInner<T>>>,
22    index: usize,
23}
24
25impl<T> Clone for Broadcaster<T> {
26    fn clone(&self) -> Self {
27        let mut inner = self.inner.lock().unwrap();
28        let pos = inner.reader_pos[self.index];
29        let index = inner.add_reader(pos);
30        Self {
31            inner: self.inner.clone(),
32            index,
33        }
34    }
35}
36
37impl<T> Drop for Broadcaster<T> {
38    fn drop(&mut self) {
39        let mut inner = self.inner.lock().unwrap();
40
41        // Remove the reader position for this clone.
42        // As this array can be sparse, remove all trailing sparse elements as a compactation step.
43        inner.reader_pos[self.index] = None;
44        while let Some(None) = inner.reader_pos.last() {
45            inner.reader_pos.pop();
46        }
47
48        // Dropping a broadcaster instance may cause the buffer to be shrinkable.
49        inner.shrink();
50    }
51}
52
/// State shared between every clone of a broadcaster.
struct BroadcasterInner<T> {
    /// Data received. A specific byte cannot be dequeued until all readers have consumed it.
    buffer: VecDeque<u8>,
    /// Reader positions as offsets into `buffer`. Each clone of the broadcaster occupies a
    /// specific index; `None` marks a slot that is free.
    reader_pos: Vec<Option<usize>>,
    /// Inner instance to read from.
    inner: T,
}

impl<T> BroadcasterInner<T> {
    /// Stores `pos` in the reader table and returns the slot index that now holds it.
    ///
    /// The first free (`None`) slot is reused when there is one; otherwise the table grows.
    fn add_reader(&mut self, pos: Option<usize>) -> usize {
        let free_slot = self.reader_pos.iter().position(Option::is_none);
        match free_slot {
            Some(slot) => {
                self.reader_pos[slot] = pos;
                slot
            }
            None => {
                self.reader_pos.push(pos);
                self.reader_pos.len() - 1
            }
        }
    }

    /// Returns the number of occupied reader slots.
    fn count(&self) -> usize {
        self.reader_pos.iter().flatten().count()
    }

    /// Drops the buffered prefix that every reader has already consumed.
    ///
    /// All reader positions are rebased so that they keep pointing at the same bytes. With no
    /// readers left, the whole buffer is discarded.
    fn shrink(&mut self) {
        let slowest = self.reader_pos.iter().flatten().copied().min();
        match slowest {
            // Dropped to 0 strong readers: nobody is left to consume the data.
            None => self.buffer.clear(),
            Some(consumed) => {
                self.buffer.drain(..consumed);
                for pos in self.reader_pos.iter_mut().flatten() {
                    *pos -= consumed;
                }
            }
        }
    }
}
95
96impl<T> Broadcaster<T> {
97    pub fn new(inner: T) -> Broadcaster<T> {
98        Self {
99            inner: Arc::new(Mutex::new(BroadcasterInner {
100                buffer: VecDeque::new(),
101                reader_pos: vec![Some(0)],
102                inner,
103            })),
104            index: 0,
105        }
106    }
107
108    /// Obtain a weak instance of this broadcaster that would not consume data.
109    pub fn downgrade(&self) -> WeakBroadcaster<T> {
110        WeakBroadcaster {
111            inner: self.inner.clone(),
112        }
113    }
114}
115
116impl<T: ConsoleDevice> ConsoleDevice for Broadcaster<T> {
117    fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
118        let mut inner = self.inner.lock().unwrap();
119
120        let current_pos = inner.reader_pos[self.index].unwrap();
121        if current_pos < inner.buffer.len() {
122            // Still more data to read from the buffer.
123            // Do some reading first.
124            let (front, back) = inner.buffer.as_slices();
125
126            let front_skip = std::cmp::min(current_pos, front.len());
127            let front_copy = std::cmp::min(front.len() - front_skip, buf.len());
128            buf[..front_copy].copy_from_slice(&front[front_skip..][..front_copy]);
129
130            let back_skip = current_pos.saturating_sub(front_skip);
131            let back_copy = std::cmp::min(back.len() - back_skip, buf.len() - front_copy);
132            buf[front_copy..][..back_copy].copy_from_slice(&back[back_skip..][..back_copy]);
133
134            let copy_len = front_copy + back_copy;
135            *inner.reader_pos[self.index].as_mut().unwrap() += copy_len;
136
137            inner.shrink();
138            return Poll::Ready(Ok(copy_len));
139        }
140
141        let len = ready!(inner.inner.poll_read(cx, buf))?;
142
143        // We've read some more data. If there're other readers, we need to push to the buffer.
144        let total_readers = inner.count();
145        if total_readers != 1 {
146            inner.buffer.extend(&buf[..len]);
147            *inner.reader_pos[self.index].as_mut().unwrap() += len;
148        }
149
150        Poll::Ready(Ok(len))
151    }
152
153    fn write(&self, buf: &[u8]) -> Result<()> {
154        self.inner.lock().unwrap().inner.write(buf)
155    }
156}
157
158impl<T: Uart> Uart for Broadcaster<T> {
159    fn get_baudrate(&self) -> Result<u32> {
160        self.inner.lock().unwrap().inner.get_baudrate()
161    }
162
163    fn set_baudrate(&self, baudrate: u32) -> Result<()> {
164        self.inner.lock().unwrap().inner.set_baudrate(baudrate)
165    }
166
167    fn get_flow_control(&self) -> Result<FlowControl> {
168        self.inner.lock().unwrap().inner.get_flow_control()
169    }
170
171    fn set_flow_control(&self, flow_control: bool) -> Result<()> {
172        self.inner
173            .lock()
174            .unwrap()
175            .inner
176            .set_flow_control(flow_control)
177    }
178
179    fn get_device_path(&self) -> Result<String> {
180        self.inner.lock().unwrap().inner.get_device_path()
181    }
182
183    fn clear_rx_buffer(&self) -> Result<()> {
184        // If we're the only user, clear the inner buffer.
185        {
186            let inner = self.inner.lock().unwrap();
187            if inner.count() == 1 {
188                inner.inner.clear_rx_buffer()?;
189                return Ok(());
190            }
191        }
192
193        // If we're not the only user, then we cannot clear RX buffer from `inner`
194        // as it disrupts other readers. Just do a read w/ timeout to clear out.
195        const TIMEOUT: Duration = Duration::from_millis(5);
196        let mut buf = [0u8; 256];
197        while self.read_timeout(&mut buf, TIMEOUT)? > 0 {}
198        Ok(())
199    }
200
201    fn set_parity(&self, parity: Parity) -> Result<()> {
202        self.inner.lock().unwrap().inner.set_parity(parity)
203    }
204
205    fn get_parity(&self) -> Result<Parity> {
206        self.inner.lock().unwrap().inner.get_parity()
207    }
208
209    fn set_break(&self, enable: bool) -> Result<()> {
210        self.inner.lock().unwrap().inner.set_break(enable)
211    }
212}
213
214/// `Broadcaster` but does not actually read data.
215///
216/// This copy can be used to create proper `Broadcaster`, however it does not create
217/// buffer bloat as data does not need to be kept for this copy.
218pub struct WeakBroadcaster<T> {
219    inner: Arc<Mutex<BroadcasterInner<T>>>,
220}
221
222impl<T> Clone for WeakBroadcaster<T> {
223    fn clone(&self) -> Self {
224        Self {
225            inner: self.inner.clone(),
226        }
227    }
228}
229
230impl<T> WeakBroadcaster<T> {
231    /// Obtain a `Broadcaster` that can receive console data from this weak instance.
232    pub fn upgrade(&self) -> Broadcaster<T> {
233        let mut inner = self.inner.lock().unwrap();
234        // When upgrading from a weak broadcaster, historic data doesn't matter.
235        let pos = inner.buffer.len();
236        let index = inner.add_reader(Some(pos));
237        Broadcaster {
238            inner: self.inner.clone(),
239            index,
240        }
241    }
242}