use anyhow::{anyhow, bail, Result};
use mio::{Events, Interest, Poll, Token};
use regex::{Captures, Regex};
use std::fs::File;
use std::io::{ErrorKind, Read, Write};
use std::os::fd::{AsFd, AsRawFd};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant, SystemTime};
use crate::io::console::{ConsoleDevice, ConsoleError};
use crate::util::file;
/// State and configuration for an interactive session with a console device.
///
/// All fields are public; construct with struct-update syntax on top of
/// `Default::default()` and then call `interact`.
#[derive(Default)]
pub struct UartConsole {
/// When set, every chunk of raw bytes read from the device is also written here.
pub logfile: Option<File>,
/// When set, `interact` arms `deadline` to "now + timeout" on entry.
pub timeout: Option<Duration>,
/// Instant after which the interaction loops return `ExitStatus::Timeout`.
pub deadline: Option<Instant>,
/// Pattern checked against `buffer`; a match ends the session with `ExitStatus::ExitSuccess`.
pub exit_success: Option<Regex>,
/// Pattern checked against `buffer` (after `exit_success`); a match ends the session with
/// `ExitStatus::ExitFailure`.
pub exit_failure: Option<Regex>,
/// When true, a `[<RFC 3339 time> console]` prefix is written to stdout at the start of
/// each line of device output.
pub timestamp: bool,
/// Lossy UTF-8 decoding of recent device output. Only filled when at least one exit
/// pattern is configured, and trimmed from the front to at most `BUFFER_LEN` bytes.
pub buffer: String,
/// True when the most recently received byte was `\n`, i.e. the next byte starts a line.
pub newline: bool,
/// Break state last requested via Ctrl-B; toggled and passed to the device's `set_break`.
pub break_en: bool,
}
/// Reason an interaction step or session ended.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExitStatus {
/// No exit condition was met; the interaction loops keep running on this value.
None,
/// A lone Ctrl-C byte (value 3) was read from stdin.
CtrlC,
/// The configured deadline passed.
Timeout,
/// The `exit_success` pattern matched the output buffer.
ExitSuccess,
/// The `exit_failure` pattern matched the output buffer.
ExitFailure,
}
/// Marker trait for input sources that can both be read from and expose a file
/// descriptor, so they can be used as `dyn ReadAsFd` trait objects.
pub trait ReadAsFd: Read + AsFd {}

/// Every type that is `Read + AsFd` is automatically a `ReadAsFd`.
impl<T> ReadAsFd for T where T: Read + AsFd {}
impl UartConsole {
// Byte value that, when read alone from stdin, toggles the break condition on the device.
const CTRL_B: u8 = 2;
// Byte value that, when read alone from stdin, ends the session with `ExitStatus::CtrlC`.
const CTRL_C: u8 = 3;
// Maximum number of bytes of device output retained in `buffer` for pattern matching.
const BUFFER_LEN: usize = 16384;
/// Runs an interactive console session until an exit condition is reached.
///
/// If `timeout` is set, the deadline is (re)armed relative to the current time;
/// otherwise any existing `deadline` is left untouched. Devices that report support
/// for non-blocking reads are served by the `mio`-based event loop; all others are
/// polled by calling `interact_once` repeatedly.
///
/// * `device` - console device to read from and write to.
/// * `stdin` - optional source of user input forwarded to the device.
/// * `stdout` - optional sink that receives the device output.
///
/// Returns the first status other than `ExitStatus::None`, or the first error
/// raised by the device or by I/O.
pub fn interact<T>(
    &mut self,
    device: &T,
    mut stdin: Option<&mut dyn ReadAsFd>,
    mut stdout: Option<&mut dyn Write>,
) -> Result<ExitStatus>
where
    T: ConsoleDevice + ?Sized,
{
    if let Some(limit) = self.timeout {
        self.deadline = Some(Instant::now() + limit);
    }

    let event_driven = device.supports_nonblocking_read()?;
    if event_driven {
        return self.interact_mio(device, stdin, stdout);
    }

    // Fallback: short blocking reads in a loop until something ends the session.
    loop {
        let status = self.interact_once(device, &mut stdin, &mut stdout)?;
        if status != ExitStatus::None {
            return Ok(status);
        }
    }
}
fn interact_mio<T>(
&mut self,
device: &T,
mut stdin: Option<&mut dyn ReadAsFd>,
mut stdout: Option<&mut dyn Write>,
) -> Result<ExitStatus>
where
T: ConsoleDevice + ?Sized,
{
if self.exit_success.as_ref().map(|rx| rx.is_match("")) == Some(true) {
self.uart_read(device, Duration::from_millis(10), &mut stdout)?;
return Ok(ExitStatus::ExitSuccess);
}
while self.uart_read(device, Duration::from_millis(0), &mut stdout)? {
if self
.exit_success
.as_ref()
.map(|rx| rx.is_match(&self.buffer))
== Some(true)
{
return Ok(ExitStatus::ExitSuccess);
}
if self
.exit_failure
.as_ref()
.map(|rx| rx.is_match(&self.buffer))
== Some(true)
{
return Ok(ExitStatus::ExitFailure);
}
}
let mut poll = Poll::new()?;
let transport_help_token = Self::get_next_token();
let nonblocking_help = device.nonblocking_help()?;
nonblocking_help.register_nonblocking_help(poll.registry(), transport_help_token)?;
let stdin_token = Self::get_next_token();
if stdin.is_some() {
poll.registry().register(
&mut mio::unix::SourceFd(&stdin.as_mut().unwrap().as_fd().as_raw_fd()),
stdin_token,
Interest::READABLE,
)?;
}
let uart_token = Self::get_next_token();
device.register_nonblocking_read(poll.registry(), uart_token)?;
let mut events = Events::with_capacity(2);
loop {
let now = Instant::now();
let poll_timeout = if let Some(deadline) = &self.deadline {
if now >= *deadline {
return Ok(ExitStatus::Timeout);
}
Some(*deadline - now)
} else {
None
};
match poll.poll(&mut events, poll_timeout) {
Ok(()) => (),
Err(err) if err.kind() == ErrorKind::Interrupted => {
continue;
}
Err(err) => bail!("poll: {}", err),
}
for event in events.iter() {
if event.token() == transport_help_token {
nonblocking_help.nonblocking_help()?;
} else if event.token() == stdin_token {
match self.process_input(device, &mut stdin)? {
ExitStatus::None => {}
status => return Ok(status),
}
} else if event.token() == uart_token {
while self.uart_read(device, Duration::from_millis(1), &mut stdout)? {
if self
.exit_success
.as_ref()
.map(|rx| rx.is_match(&self.buffer))
== Some(true)
{
return Ok(ExitStatus::ExitSuccess);
}
if self
.exit_failure
.as_ref()
.map(|rx| rx.is_match(&self.buffer))
== Some(true)
{
return Ok(ExitStatus::ExitFailure);
}
}
}
}
}
}
/// Returns a `mio` token that has not been handed out before in this process.
///
/// Tokens come from a function-local static counter, so they are unique across all
/// sessions. Relaxed ordering is enough because only the uniqueness of the returned
/// value matters.
fn get_next_token() -> Token {
    static NEXT_TOKEN_ID: AtomicUsize = AtomicUsize::new(0);
    let id = NEXT_TOKEN_ID.fetch_add(1, Ordering::Relaxed);
    Token(id)
}
/// Reports whether at least one exit pattern (success or failure) is configured.
fn uses_regex(&self) -> bool {
    [&self.exit_success, &self.exit_failure]
        .iter()
        .any(|pattern| pattern.is_some())
}
/// Appends device output to the pattern-matching buffer, keeping only the tail.
///
/// `data` is decoded as UTF-8 with invalid sequences replaced by U+FFFD. If the
/// buffer then exceeds `BUFFER_LEN` bytes, whole characters are dropped from the
/// front until it fits again.
fn append_buffer(&mut self, data: &[u8]) {
    self.buffer.push_str(&String::from_utf8_lossy(data));

    let len = self.buffer.len();
    if len > UartConsole::BUFFER_LEN {
        // Find the first character boundary at or past the excess and drop everything
        // before it in one pass, instead of shifting the whole buffer once per
        // removed character. `len` itself is a boundary, so the scan terminates.
        let mut cut = len - UartConsole::BUFFER_LEN;
        while !self.buffer.is_char_boundary(cut) {
            cut += 1;
        }
        self.buffer.drain(..cut);
    }
}
/// Reads one chunk of output from the device and fans it out to the sinks.
///
/// The bytes are written to `stdout` (with an optional timestamp prefix at the start
/// of each line), to the log file, and, when exit patterns are configured, to the
/// pattern-matching buffer.
///
/// * `device` - console device to read from.
/// * `timeout` - passed through to the device's `console_read`.
/// * `stdout` - optional sink for the received bytes.
///
/// Returns `Ok(false)` when the device delivered no bytes and `Ok(true)` otherwise.
/// Device and I/O errors are propagated.
fn uart_read<T>(
    &mut self,
    device: &T,
    timeout: Duration,
    stdout: &mut Option<&mut dyn Write>,
) -> Result<bool>
where
    T: ConsoleDevice + ?Sized,
{
    let mut buf = [0u8; 1024];
    // With exit patterns configured only a single byte is requested per call —
    // presumably so callers can test the patterns after every byte received.
    let request_len = if self.uses_regex() { 1 } else { buf.len() };
    let len = device.console_read(&mut buf[..request_len], timeout)?;
    if len == 0 {
        return Ok(false);
    }
    let received = &buf[..len];

    for &byte in received {
        if self.timestamp && self.newline {
            if let Some(out) = stdout.as_mut() {
                let now = humantime::format_rfc3339_millis(SystemTime::now());
                out.write_fmt(format_args!("[{} console]", now))?;
            }
        }
        // Track line starts even when there is no stdout sink.
        self.newline = byte == b'\n';
        if let Some(out) = stdout.as_mut() {
            out.write_all(&[byte])?;
        }
    }
    if let Some(out) = stdout.as_mut() {
        out.flush()?;
    }

    if let Some(log) = self.logfile.as_mut() {
        log.write_all(received)?;
    }

    if self.uses_regex() {
        self.append_buffer(received);
    }
    Ok(true)
}
/// Forwards pending user input from `stdin` to the device.
///
/// Input is read in chunks for as long as `wait_read_timeout` with a zero timeout
/// succeeds. Two control bytes are handled specially, but only when they arrive as
/// a chunk of exactly one byte: Ctrl-C ends the session, and Ctrl-B toggles the
/// break condition on the device (reporting the result on stderr) and stops
/// processing for this call. Everything else is written to the device unchanged.
///
/// * `device` - console device that receives the input.
/// * `stdin` - optional input source; when absent nothing happens.
///
/// Returns `ExitStatus::CtrlC` on Ctrl-C and `ExitStatus::None` otherwise. Read and
/// device-write errors are propagated.
fn process_input<T>(
    &mut self,
    device: &T,
    stdin: &mut Option<&mut (dyn ReadAsFd)>,
) -> Result<ExitStatus>
where
    T: ConsoleDevice + ?Sized,
{
    let Some(input) = stdin.as_mut() else {
        return Ok(ExitStatus::None);
    };

    let no_wait = Duration::from_millis(0);
    while file::wait_read_timeout(&input.as_fd(), no_wait).is_ok() {
        let mut chunk = [0u8; 256];
        let count = input.read(&mut chunk)?;
        if count == 0 {
            // Nothing was read (e.g. end of input): stop for now.
            break;
        }

        if count == 1 {
            match chunk[0] {
                UartConsole::CTRL_C => return Ok(ExitStatus::CtrlC),
                UartConsole::CTRL_B => {
                    self.break_en = !self.break_en;
                    eprint!(
                        "\r\n{} break",
                        if self.break_en { "Setting" } else { "Clearing" }
                    );
                    let outcome = device.set_break(self.break_en);
                    if outcome.is_err() {
                        eprint!(": {:?}", outcome);
                    }
                    eprint!("\r\n");
                    // The control byte itself is not forwarded to the device.
                    break;
                }
                _ => {}
            }
        }

        device.console_write(&chunk[..count])?;
    }
    Ok(ExitStatus::None)
}
fn interact_once<T>(
&mut self,
device: &T,
stdin: &mut Option<&mut (dyn ReadAsFd)>,
stdout: &mut Option<&mut dyn Write>,
) -> Result<ExitStatus>
where
T: ConsoleDevice + ?Sized,
{
if let Some(deadline) = &self.deadline {
if Instant::now() > *deadline {
return Ok(ExitStatus::Timeout);
}
}
self.uart_read(device, Duration::from_millis(10), stdout)?;
if self
.exit_success
.as_ref()
.map(|rx| rx.is_match(&self.buffer))
== Some(true)
{
return Ok(ExitStatus::ExitSuccess);
}
if self
.exit_failure
.as_ref()
.map(|rx| rx.is_match(&self.buffer))
== Some(true)
{
return Ok(ExitStatus::ExitFailure);
}
self.process_input(device, stdin)
}
/// Returns the capture groups of the exit pattern that corresponds to `status`.
///
/// For `ExitSuccess` the success pattern is applied to the output buffer, for
/// `ExitFailure` the failure pattern. Any other status, an unset pattern, or a
/// pattern that does not match the buffer yields `None`.
pub fn captures(&self, status: ExitStatus) -> Option<Captures> {
    let pattern = match status {
        ExitStatus::ExitSuccess => self.exit_success.as_ref(),
        ExitStatus::ExitFailure => self.exit_failure.as_ref(),
        _ => None,
    };
    pattern?.captures(&self.buffer)
}
/// Waits until the device output matches the regular expression `rx`.
///
/// Device output is echoed to the process's stdout with per-line timestamps while
/// waiting; no user input is forwarded. A newline is printed once the wait ends.
///
/// * `device` - console device to read from.
/// * `rx` - regular expression to wait for.
/// * `timeout` - maximum time to wait.
///
/// Returns one string per capture group of the match (index 0 is the whole match);
/// groups that did not participate are returned as empty strings.
///
/// # Errors
///
/// Fails if `rx` is not a valid regular expression, if the device or stdout
/// reports an error, if the timeout expires ("Timed Out"), or if the session
/// reports success but the pattern no longer yields captures on the buffer.
pub fn wait_for<T>(device: &T, rx: &str, timeout: Duration) -> Result<Vec<String>>
where
    T: ConsoleDevice + ?Sized,
{
    let mut console = UartConsole {
        timestamp: true,
        newline: true,
        timeout: Some(timeout),
        exit_success: Some(Regex::new(rx)?),
        ..Default::default()
    };
    let mut stdout = std::io::stdout();
    let result = console.interact(device, None, Some(&mut stdout))?;
    println!();
    match result {
        ExitStatus::ExitSuccess => {
            // Success can be reported without the buffer having been re-checked (a
            // pattern that matches the empty string), so a missing match is surfaced
            // as an error rather than a panic.
            let caps = console
                .captures(ExitStatus::ExitSuccess)
                .ok_or_else(|| anyhow!("pattern {:?} matched but yielded no captures", rx))?;
            Ok(caps
                .iter()
                .map(|group| group.map_or_else(String::new, |m| m.as_str().to_owned()))
                .collect())
        }
        ExitStatus::Timeout => Err(ConsoleError::GenericError("Timed Out".into()).into()),
        _ => Err(anyhow!("Impossible result: {:?}", result)),
    }
}
}