opentitanlib/io/console/
broadcast.rs

1// Copyright lowRISC contributors (OpenTitan project).
2// Licensed under the Apache License, Version 2.0, see LICENSE for details.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll, ready};
8use std::time::Duration;
9
10use anyhow::Result;
11
12use super::{ConsoleDevice, ConsoleExt};
13use crate::io::uart::{FlowControl, Parity, Uart};
14
15/// Broadcast UART recevied data to multiple users.
16///
17/// Normally, if there are multiple users of `UART`, they share the same buffer (most commonly the kernel buffer).
18/// This means that only one user can read a specific piece of data at a time. `Broadcaster` ensures that all clone
19/// of it can receive all data.
20pub struct Broadcaster<T> {
21    inner: Arc<Mutex<BroadcasterInner<T>>>,
22    index: usize,
23}
24
25impl<T> Clone for Broadcaster<T> {
26    fn clone(&self) -> Self {
27        let mut inner = self.inner.lock().unwrap();
28
29        let pos = inner.reader_pos[self.index];
30
31        // Add a new position to the list. Try to use a freed index preferrably (i.e. None).
32        let none_index = inner.reader_pos.iter().position(|x| x.is_none());
33        let index = if let Some(index) = none_index {
34            inner.reader_pos[index] = pos;
35            index
36        } else {
37            let index = inner.reader_pos.len();
38            inner.reader_pos.push(pos);
39            index
40        };
41
42        Self {
43            inner: self.inner.clone(),
44            index,
45        }
46    }
47}
48
49impl<T> Drop for Broadcaster<T> {
50    fn drop(&mut self) {
51        let mut inner = self.inner.lock().unwrap();
52
53        // Remove the reader position for this clone.
54        // As this array can be sparse, remove all trailing sparse elements as a compactation step.
55        inner.reader_pos[self.index] = None;
56        while let Some(None) = inner.reader_pos.last() {
57            inner.reader_pos.pop();
58        }
59
60        // Dropping a broadcaster instance may cause the buffer to be shrinkable.
61        if inner.count() != 0 {
62            inner.shrink();
63        }
64    }
65}
66
67struct BroadcasterInner<T> {
68    /// Data received. Dequeing of a specific byte is not possible until all readers have consumed it.
69    buffer: VecDeque<u8>,
70    /// Reader positions. Each clone of broadcaster occupies a specific index.
71    reader_pos: Vec<Option<usize>>,
72    /// Inner instance to read from.
73    inner: T,
74}
75
76impl<T> BroadcasterInner<T> {
77    fn count(&self) -> usize {
78        self.reader_pos.iter().filter(|x| x.is_some()).count()
79    }
80
81    fn shrink(&mut self) {
82        // Now go through all reader_pos to see if we can drop some buffer now.
83        let min_pos = self.reader_pos.iter().filter_map(|x| *x).min().unwrap();
84        self.buffer.drain(..min_pos);
85
86        self.reader_pos
87            .iter_mut()
88            .filter_map(|x| x.as_mut())
89            .for_each(|x| *x -= min_pos);
90    }
91}
92
93impl<T> Broadcaster<T> {
94    pub fn new(inner: T) -> Broadcaster<T> {
95        Self {
96            inner: Arc::new(Mutex::new(BroadcasterInner {
97                buffer: VecDeque::new(),
98                reader_pos: vec![Some(0)],
99                inner,
100            })),
101            index: 0,
102        }
103    }
104}
105
106impl<T: ConsoleDevice> ConsoleDevice for Broadcaster<T> {
107    fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
108        let mut inner = self.inner.lock().unwrap();
109
110        let current_pos = inner.reader_pos[self.index].unwrap();
111        if current_pos < inner.buffer.len() {
112            // Still more data to read from the buffer.
113            // Do some reading first.
114            let (front, back) = inner.buffer.as_slices();
115
116            let front_skip = std::cmp::min(current_pos, front.len());
117            let front_copy = std::cmp::min(front.len() - front_skip, buf.len());
118            buf[..front_copy].copy_from_slice(&front[front_skip..][..front_copy]);
119
120            let back_skip = current_pos.saturating_sub(front_skip);
121            let back_copy = std::cmp::min(back.len() - back_skip, buf.len() - front_copy);
122            buf[front_copy..][..back_copy].copy_from_slice(&back[back_skip..][..back_copy]);
123
124            let copy_len = front_copy + back_copy;
125            *inner.reader_pos[self.index].as_mut().unwrap() += copy_len;
126
127            inner.shrink();
128            return Poll::Ready(Ok(copy_len));
129        }
130
131        let len = ready!(inner.inner.poll_read(cx, buf))?;
132
133        // We've read some more data. If there're other readers, we need to push to the buffer.
134        let total_readers = inner.count();
135        if total_readers != 1 {
136            inner.buffer.extend(&buf[..len]);
137            *inner.reader_pos[self.index].as_mut().unwrap() += len;
138        }
139
140        Poll::Ready(Ok(len))
141    }
142
143    fn write(&self, buf: &[u8]) -> Result<()> {
144        self.inner.lock().unwrap().inner.write(buf)
145    }
146}
147
148impl<T: Uart> Uart for Broadcaster<T> {
149    fn get_baudrate(&self) -> Result<u32> {
150        self.inner.lock().unwrap().inner.get_baudrate()
151    }
152
153    fn set_baudrate(&self, baudrate: u32) -> Result<()> {
154        self.inner.lock().unwrap().inner.set_baudrate(baudrate)
155    }
156
157    fn get_flow_control(&self) -> Result<FlowControl> {
158        self.inner.lock().unwrap().inner.get_flow_control()
159    }
160
161    fn set_flow_control(&self, flow_control: bool) -> Result<()> {
162        self.inner
163            .lock()
164            .unwrap()
165            .inner
166            .set_flow_control(flow_control)
167    }
168
169    fn get_device_path(&self) -> Result<String> {
170        self.inner.lock().unwrap().inner.get_device_path()
171    }
172
173    fn clear_rx_buffer(&self) -> Result<()> {
174        // If we're the only user, clear the inner buffer.
175        {
176            let inner = self.inner.lock().unwrap();
177            if inner.count() == 1 {
178                inner.inner.clear_rx_buffer()?;
179                return Ok(());
180            }
181        }
182
183        // If we're not the only user, then we cannot clear RX buffer from `inner`
184        // as it disrupts other readers. Just do a read w/ timeout to clear out.
185        const TIMEOUT: Duration = Duration::from_millis(5);
186        let mut buf = [0u8; 256];
187        while self.read_timeout(&mut buf, TIMEOUT)? > 0 {}
188        Ok(())
189    }
190
191    fn set_parity(&self, parity: Parity) -> Result<()> {
192        self.inner.lock().unwrap().inner.set_parity(parity)
193    }
194
195    fn get_parity(&self) -> Result<Parity> {
196        self.inner.lock().unwrap().inner.get_parity()
197    }
198
199    fn set_break(&self, enable: bool) -> Result<()> {
200        self.inner.lock().unwrap().inner.set_break(enable)
201    }
202}