use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use serialport::Parity;
use thiserror::Error;
/// Number of stop bits that terminate each UART frame.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UartStopBits {
/// One stop bit.
Stop1,
/// One and a half stop bits. `UartBitbangConfig::new` rejects this setting.
Stop1_5,
/// Two stop bits.
Stop2,
}
/// Frame layout shared by the bit-banging UART encoder and decoder.
#[derive(Clone, Debug)]
pub struct UartBitbangConfig {
// `data_bits`: number of data bits per frame (`new` only accepts 5..=8).
// `stop_bits`: stop bit setting (`new` rejects `UartStopBits::Stop1_5`).
// `break_char_cycles`: number of full frame times the line is held low for a
// break; `break_bit_time` multiplies the per-frame bit time by this value.
pub data_bits: u8, pub stop_bits: UartStopBits, pub break_char_cycles: u8,
/// Parity mode. Any value other than `Parity::None` adds one parity bit to
/// every frame.
pub parity: Parity,
}
impl UartBitbangConfig {
    /// Builds a validated bit-banging configuration.
    ///
    /// # Errors
    ///
    /// Returns an error when `data_bits` is outside `5..=8`, or when
    /// `stop_bits` is [`UartStopBits::Stop1_5`].
    pub fn new(
        data_bits: u8,
        stop_bits: UartStopBits,
        break_char_cycles: u8,
        parity: Parity,
    ) -> Result<UartBitbangConfig> {
        match data_bits {
            5..=8 => {}
            _ => bail!("UART bitbanging only supports between 5 and 8 bit data."),
        }
        if matches!(stop_bits, UartStopBits::Stop1_5) {
            bail!("UART bitbanging only supports 1 or 2 stop bits.");
        }
        Ok(Self {
            data_bits,
            stop_bits,
            break_char_cycles,
            parity,
        })
    }

    /// Number of bit times occupied by the stop condition: 1 for `Stop1`,
    /// 2 for every other setting.
    fn stop_bit_time(&self) -> u8 {
        if self.stop_bits == UartStopBits::Stop1 {
            1
        } else {
            2
        }
    }

    /// Total number of bit times in one frame: one start bit, the data bits,
    /// an optional parity bit, and the stop bits.
    pub fn bit_time_per_frame(&self) -> u32 {
        let parity_bit = u8::from(self.parity != Parity::None);
        // The leading `1` is the start bit.
        let frame_bits: u8 = 1 + self.data_bits + parity_bit + self.stop_bit_time();
        u32::from(frame_bits)
    }

    /// Number of bit times the line must stay low to count as a break:
    /// `break_char_cycles` whole frames.
    pub fn break_bit_time(&self) -> u32 {
        let frame_bits = self.bit_time_per_frame();
        frame_bits * u32::from(self.break_char_cycles)
    }
}
/// Computes the parity of `data`, optionally folding in a parity bit.
///
/// Returns `true` when the number of set bits in `data` (plus `parity_bit`,
/// if one is supplied and set) is odd, and `false` when it is even.
#[inline]
pub fn compute_parity(data: u8, parity_bit: Option<bool>) -> bool {
    let odd_ones = data.count_ones() & 1 == 1;
    match parity_bit {
        // XOR of two booleans is the same as testing them for inequality.
        Some(bit) => odd_ones != bit,
        None => odd_ones,
    }
}
/// A single UART transfer, as fed to the encoder or produced by the decoder.
#[derive(Debug, PartialEq)]
pub enum UartTransfer {
/// A well-formed character.
Byte {
/// The character's data bits.
data: u8,
},
/// A frame that the decoder received but that failed one of its checks.
/// The encoder refuses to encode this variant.
Broken {
/// The data bits as they were received.
data: u8,
/// The parity bit associated with the frame, or `None` when parity is
/// disabled in the configuration.
parity: Option<bool>,
/// The check that failed.
error: UartTransferDecodeError,
},
/// A break condition (line held low for at least the configured break time).
Break,
}
/// Encodes UART transfers into bit-banged samples.
///
/// `TX` is the bit position of the TX line within each emitted sample byte;
/// all other bits of the sample are left at zero.
pub struct UartBitbangEncoder<const TX: u8> {
/// Frame configuration used when encoding.
pub config: UartBitbangConfig,
}
impl<const TX: u8> UartBitbangEncoder<TX> {
    /// Creates an encoder that emits frames laid out according to `config`.
    pub fn new(config: UartBitbangConfig) -> Self {
        Self { config }
    }

    /// Appends a break condition to `samples`.
    ///
    /// The TX line is held low for `config.break_bit_time()` samples and is
    /// then driven high for the configured number of stop bits.
    pub fn encode_break(&self, samples: &mut Vec<u8>) {
        let low: u8 = 0x00 << TX;
        let high: u8 = 0x01 << TX;
        // `u32 -> usize` is lossless on the 32/64-bit targets this runs on.
        let low_count = self.config.break_bit_time() as usize;
        let high_count = usize::from(self.config.stop_bit_time());
        samples.resize(samples.len() + low_count, low);
        samples.resize(samples.len() + high_count, high);
    }

    /// Appends one character frame to `samples`: a start bit, the low
    /// `config.data_bits` bits of `data` (least significant bit first), an
    /// optional parity bit, and the stop bits.
    ///
    /// Bits of `data` above `config.data_bits` are not transmitted and do not
    /// contribute to the parity bit.
    pub fn encode_character(&self, data: u8, samples: &mut Vec<u8>) {
        // Start bit.
        samples.push(0x00 << TX);

        // Data bits, LSB first. Track exactly what goes on the wire so that
        // the parity bit matches the transmitted bits even when `data` has
        // bits set above `data_bits`.
        let mut sent: u8 = 0x00;
        for bit_index in 0..self.config.data_bits {
            let bit = (data >> bit_index) & 0x01;
            sent |= bit << bit_index;
            samples.push(bit << TX);
        }

        // Parity bit: even parity makes the total number of ones even, odd
        // parity makes it odd.
        let odd_ones = compute_parity(sent, None);
        match self.config.parity {
            Parity::Even => samples.push(u8::from(odd_ones) << TX),
            Parity::Odd => samples.push(u8::from(!odd_ones) << TX),
            Parity::None => (),
        }

        // Stop bits.
        for _ in 0..self.config.stop_bit_time() {
            samples.push(0x01 << TX);
        }
    }

    /// Appends one frame per byte of `chars` to `samples`.
    pub fn encode_characters(&self, chars: &[u8], samples: &mut Vec<u8>) {
        for &character in chars {
            self.encode_character(character, samples);
        }
    }

    /// Appends the encoding of a single `transfer` to `samples`.
    ///
    /// # Errors
    ///
    /// Returns an error for [`UartTransfer::Broken`], which has no valid
    /// encoding; `samples` is left untouched in that case.
    pub fn encode_transfer(&self, transfer: &UartTransfer, samples: &mut Vec<u8>) -> Result<()> {
        match *transfer {
            UartTransfer::Broken { .. } => bail!("Cannot encode a broken UART transfer"),
            UartTransfer::Break => self.encode_break(samples),
            UartTransfer::Byte { data } => self.encode_character(data, samples),
        }
        Ok(())
    }

    /// Appends the encoding of every transfer in `transfers` to `samples`.
    ///
    /// # Errors
    ///
    /// Stops at the first transfer that cannot be encoded. Samples for the
    /// transfers preceding it have already been appended at that point.
    pub fn encode_transfers(
        &self,
        transfers: &[UartTransfer],
        samples: &mut Vec<u8>,
    ) -> Result<()> {
        for transfer in transfers {
            self.encode_transfer(transfer, samples)?;
        }
        Ok(())
    }
}
/// Reasons the decoder reports a received frame as `UartTransfer::Broken`.
#[derive(Error, Debug, PartialEq, Serialize, Deserialize)]
pub enum UartTransferDecodeError {
/// The received parity bit is inconsistent with the received data bits
/// under the configured parity mode.
#[error("Computed parity does not match expected parity")]
ParityMismatch,
/// One or more stop bits were sampled low in a frame that was not entirely
/// low.
#[error("Stop was not held high for the full stop time")]
InvalidStop,
/// RX stayed low for an entire frame but returned high before the
/// configured break time had elapsed.
#[error(
"RX held low too long for a valid transmission, but not long enough for a break condition"
)]
InvalidBreak,
}
/// Internal state of the decoder's per-sample state machine.
#[derive(Debug, PartialEq)]
enum DecodingState {
/// Waiting for RX to go low (the start bit).
Idle,
/// Collecting data bits: `data` holds the bits read so far (LSB first) and
/// `bits` counts how many have been read.
Data {
data: u8, bits: u8, },
/// All data bits have been read; the next sample is the parity bit.
Parity {
data: u8, },
/// Collecting stop bits: `stop_data` accumulates the sampled stop levels
/// and `stop_bits` counts how many have been read.
Stop {
data: u8, parity: Option<bool>, stop_data: u8, stop_bits: u8, },
/// RX stayed low through an entire frame; `bits` counts the consecutive
/// low samples seen so far (starting from one full frame).
Break {
bits: u32, },
}
/// Decodes bit-banged samples back into UART transfers.
///
/// `RX` is the bit position of the RX line within each sample byte.
pub struct UartBitbangDecoder<const RX: u8> {
/// Frame configuration used when decoding.
pub config: UartBitbangConfig,
// Current position in the frame being decoded; persists across calls so
// that a frame may be fed in one sample at a time.
state: DecodingState,
}
impl<const RX: u8> UartBitbangDecoder<RX> {
pub fn new(config: UartBitbangConfig) -> Self {
Self {
config,
state: DecodingState::Idle,
}
}
fn get_decoded_break(&mut self) -> Result<UartTransfer> {
let DecodingState::Break { bits } = self.state else {
bail!("`get_decoded_break` called before decoding a break");
};
if bits < self.config.break_bit_time() {
let parity = if self.config.parity != Parity::None {
Some(false)
} else {
None
};
Ok(UartTransfer::Broken {
data: 0x00,
parity,
error: UartTransferDecodeError::InvalidBreak,
})
} else {
Ok(UartTransfer::Break)
}
}
fn get_decoded_character(&mut self) -> Result<UartTransfer> {
let DecodingState::Stop {
data,
parity,
stop_data,
stop_bits,
} = self.state
else {
bail!("`get_decoded_character` called before the end of a transmission");
};
if stop_bits != self.config.stop_bit_time() {
bail!("`get_decoded_character` called before reading all stop bits");
}
if stop_data.count_ones() as u8 != stop_bits {
return Ok(UartTransfer::Broken {
data,
parity,
error: UartTransferDecodeError::InvalidStop,
});
}
if self.config.parity != Parity::None {
let decoded_parity = compute_parity(data, parity);
let expected_parity = self.config.parity != Parity::Even;
if expected_parity != decoded_parity {
return Ok(UartTransfer::Broken {
data,
parity,
error: UartTransferDecodeError::ParityMismatch,
});
}
}
Ok(UartTransfer::Byte { data })
}
pub fn decode_sample(&mut self, sample: u8) -> Result<Option<UartTransfer>> {
let rx = (sample >> RX) & 0x1;
match self.state {
DecodingState::Idle => {
if rx == 0 {
self.state = DecodingState::Data {
data: 0x00,
bits: 0,
};
}
}
DecodingState::Data { mut data, mut bits } => {
data |= rx << bits;
bits += 1;
self.state = if bits >= self.config.data_bits {
if self.config.parity == Parity::None {
DecodingState::Stop {
data,
parity: None,
stop_data: 0x00,
stop_bits: 0,
}
} else {
DecodingState::Parity { data }
}
} else {
DecodingState::Data { data, bits }
}
}
DecodingState::Parity { data } => {
self.state = DecodingState::Stop {
data,
parity: Some(rx != 0),
stop_data: 0x00,
stop_bits: 0,
};
}
DecodingState::Stop {
data,
parity,
mut stop_data,
mut stop_bits,
} => {
stop_data |= rx << stop_bits;
stop_bits += 1;
self.state = DecodingState::Stop {
data,
parity,
stop_data,
stop_bits,
};
if stop_bits >= self.config.stop_bit_time() {
if data != 0x00 || parity == Some(true) || stop_data != 0x00 {
let decoded = self.get_decoded_character()?;
self.state = DecodingState::Idle;
return Ok(Some(decoded));
}
self.state = DecodingState::Break {
bits: self.config.bit_time_per_frame(),
}
}
}
DecodingState::Break { bits } => {
if rx != 0 {
let decoded = self.get_decoded_break()?;
self.state = DecodingState::Idle;
return Ok(Some(decoded));
}
self.state = DecodingState::Break { bits: bits + 1 };
}
}
Ok(None)
}
pub fn decode_samples(&mut self, samples: &Vec<u8>) -> Result<Vec<UartTransfer>> {
let mut transfers = vec![];
for sample in samples {
if let Some(transfer) = self.decode_sample(*sample)? {
transfers.push(transfer);
}
}
Ok(transfers)
}
pub fn is_idle(&self) -> bool {
self.state == DecodingState::Idle
}
pub fn reset(&mut self) {
self.state = DecodingState::Idle;
}
}
// Unit tests for the bit-banging UART encoder and decoder.
#[cfg(test)]
mod test {
use super::*;
// Asserts that `received` consists solely of `Byte` transfers whose data
// matches `expected` element by element; any other transfer kind is an error.
fn compare_decoded_result(received: &[UartTransfer], expected: &[u8]) -> Result<()> {
assert_eq!(received.len(), expected.len());
for (transfer, expected) in received.iter().zip(expected.iter()) {
match transfer {
UartTransfer::Byte { data } => {
assert_eq!(data, expected);
}
_ => bail!("Only expected to decode bytes"),
}
}
Ok(())
}
// Round-trips `message` (or a default message) through an encoder and a
// decoder that both use bit 0, and checks that the decoded bytes match and
// that the decoder ends up idle.
fn uart_encode_decode(config: UartBitbangConfig, message: Option<&[u8]>) -> Result<()> {
let encoder = UartBitbangEncoder::<0>::new(config.clone());
let mut decoder = UartBitbangDecoder::<0>::new(config);
let msg = message.unwrap_or(b"Hello, this is a simple UART test message.");
let mut samples = Vec::new();
encoder.encode_characters(msg, &mut samples);
assert!(!samples.is_empty());
let decoded = decoder
.decode_samples(&samples)
.expect("Should have decoded the bitbanged message");
assert!(decoder.is_idle());
compare_decoded_result(&decoded, msg)
}
// Basic round trips plus an exact check of the encoded waveform for a known
// byte sequence (8 data bits, 2 stop bits, no parity: 11 samples per byte).
#[test]
fn smoke() -> Result<()> {
let config = UartBitbangConfig::new(8, UartStopBits::Stop2, 1, Parity::None)?;
uart_encode_decode(config.clone(), None)?;
uart_encode_decode(config.clone(), Some(b"abc def ghi jkl"))?;
uart_encode_decode(config.clone(), Some(b"12345"))?;
let encoder = UartBitbangEncoder::<0>::new(config);
let bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF];
let mut samples = Vec::new();
encoder.encode_characters(&bytes, &mut samples);
let expected = [
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0,
1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1,
0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1,
1,
];
assert_eq!(samples, expected);
Ok(())
}
// Checks that the TX/RX const parameters select the sample bit: encoders on
// bit 1 and bit 5 produce the same logical levels, touch no other bits, and
// matching decoders recover the message.
#[test]
fn bit_indexes() -> Result<()> {
let config = UartBitbangConfig::new(8, UartStopBits::Stop2, 1, Parity::None)?;
let encoder_1 = UartBitbangEncoder::<1>::new(config.clone());
let encoder_5 = UartBitbangEncoder::<5>::new(config.clone());
let mut decoder_1 = UartBitbangDecoder::<1>::new(config.clone());
let mut decoder_5 = UartBitbangDecoder::<5>::new(config.clone());
let msg = b"UART bit test message";
let mut samples_1 = Vec::new();
let mut samples_5 = Vec::new();
encoder_1.encode_characters(msg, &mut samples_1);
encoder_5.encode_characters(msg, &mut samples_5);
assert_eq!(samples_1.len(), samples_5.len());
let samples = samples_1.iter().zip(samples_5.iter());
for (sample_1, sample_5) in samples {
// Same logical level on the selected bit, and every other bit is zero.
assert!((sample_1 >> 1) & 0x1 == (sample_5 >> 5) & 0x1);
assert_eq!(sample_1 & !(0x1 << 1), 0x00);
assert_eq!(sample_5 & !(0x1 << 5), 0x00);
}
let decoded_1 = decoder_1
.decode_samples(&samples_1)
.expect("Should have decoded the bitbanged message");
assert!(decoder_1.is_idle());
let decoded_5 = decoder_5
.decode_samples(&samples_5)
.expect("Should have decoded the bitbanged message");
assert!(decoder_5.is_idle());
compare_decoded_result(&decoded_1, msg)?;
compare_decoded_result(&decoded_5, msg)
}
// Checks that 4 and 9 data bits are rejected and that 5 to 8 data bits
// round-trip a message masked down to the configured width.
#[test]
fn data_bits() -> Result<()> {
assert!(UartBitbangConfig::new(4, UartStopBits::Stop2, 1, Parity::None).is_err());
assert!(UartBitbangConfig::new(9, UartStopBits::Stop2, 1, Parity::None).is_err());
let test_msg = b"data_bits TEST STRING";
for data_bits in 5..=8 {
// Only the low `data_bits` bits survive a round trip, so mask the input.
let data_mask = ((0x1u16 << data_bits) - 1) as u8;
let msg = test_msg.iter().map(|b| b & data_mask).collect::<Vec<_>>();
uart_encode_decode(
UartBitbangConfig::new(data_bits, UartStopBits::Stop2, 1, Parity::None)?,
Some(&msg),
)?;
}
Ok(())
}
// Checks stop-bit handling: 1.5 stop bits are rejected, 1 and 2 stop bits
// round-trip and produce the expected waveform, and a low second stop bit is
// reported as `InvalidStop`.
#[test]
fn stop_bits() -> Result<()> {
assert!(UartBitbangConfig::new(8, UartStopBits::Stop1_5, 1, Parity::None).is_err());
for stop_bits in [UartStopBits::Stop1, UartStopBits::Stop2] {
uart_encode_decode(
UartBitbangConfig::new(8, stop_bits, 1, Parity::None)?,
Some(b"hello from the `stop_bits()` test!"),
)?;
}
let mut samples = Vec::new();
UartBitbangEncoder::<0>::new(UartBitbangConfig::new(
8,
UartStopBits::Stop1,
1,
Parity::None,
)?)
.encode_character(0xA5, &mut samples);
assert_eq!(&samples, &[0, 1, 0, 1, 0, 0, 1, 0, 1, 1]);
samples.clear();
UartBitbangEncoder::<0>::new(UartBitbangConfig::new(
8,
UartStopBits::Stop2,
1,
Parity::None,
)?)
.encode_character(0xA5, &mut samples);
assert_eq!(&samples, &[0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1]);
// Same frame with the second stop bit pulled low.
assert_eq!(
UartBitbangDecoder::<0>::new(UartBitbangConfig::new(
8,
UartStopBits::Stop2,
1,
Parity::None
)?)
.decode_samples(&vec![0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0])?,
vec![UartTransfer::Broken {
data: 0xA5,
parity: None,
error: UartTransferDecodeError::InvalidStop,
}]
);
Ok(())
}
// Checks parity handling for each mode: round trip, exact waveform for 0xA5,
// and `ParityMismatch` when either the parity bit or any single data bit is
// flipped.
#[test]
fn parity_bits() -> Result<()> {
// Expected waveform for 0xA5 with 2 stop bits; index 9 (after the start bit
// and 8 data bits) is the parity bit when parity is enabled.
let tests = [
(Parity::None, vec![0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1]),
(Parity::Odd, vec![0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1]),
(Parity::Even, vec![0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1]),
];
for (parity, expected) in tests {
uart_encode_decode(
UartBitbangConfig::new(8, UartStopBits::Stop2, 1, parity)?,
Some(b"this string is used for testing parity"),
)?;
let config = UartBitbangConfig::new(8, UartStopBits::Stop2, 1, parity)?;
let mut samples = Vec::new();
UartBitbangEncoder::<0>::new(config.clone()).encode_character(0xA5, &mut samples);
assert_eq!(&samples, &expected);
if parity == Parity::None {
continue;
}
let mut decoder = UartBitbangDecoder::<0>::new(config.clone());
// Flip the parity bit only.
let mut invalid_samples = expected.clone();
invalid_samples[9] = !invalid_samples[9] & 0x01;
assert_eq!(
decoder.decode_samples(&invalid_samples)?,
vec![UartTransfer::Broken {
data: 0xA5,
parity: Some(invalid_samples[9] != 0),
error: UartTransferDecodeError::ParityMismatch,
}]
);
decoder.reset();
// Flip each data bit in turn, leaving the parity bit untouched.
for data_bit in 0..8 {
invalid_samples = expected.clone();
invalid_samples[1 + data_bit] = !expected[1 + data_bit] & 0x01;
assert_eq!(
decoder.decode_samples(&invalid_samples)?,
vec![UartTransfer::Broken {
data: 0xA5 ^ (0x1 << data_bit),
parity: Some(expected[9] != 0),
error: UartTransferDecodeError::ParityMismatch,
}]
);
decoder.reset();
}
}
Ok(())
}
// Checks break handling: a break between characters round-trips for 1 to 5
// break cycles, and a low period longer than one frame but shorter than the
// configured break time is reported as `InvalidBreak`.
#[test]
fn breaks() -> Result<()> {
let break_transfer = [
UartTransfer::Byte { data: 0x12 },
UartTransfer::Byte { data: 0x34 },
UartTransfer::Break,
UartTransfer::Byte { data: 0x56 },
UartTransfer::Byte { data: 0x78 },
];
for break_cycles in 1..=5 {
let config =
UartBitbangConfig::new(8, UartStopBits::Stop2, break_cycles, Parity::None)?;
let encoder = UartBitbangEncoder::<0>::new(config.clone());
let mut decoder = UartBitbangDecoder::<0>::new(config.clone());
let mut samples = Vec::new();
encoder.encode_transfers(&break_transfer, &mut samples)?;
assert!(
samples.len() > (config.bit_time_per_frame() as usize) * (break_cycles as usize)
);
let decoded = decoder
.decode_samples(&samples)
.expect("Should have decoded the bitbanged transmission");
assert!(decoder.is_idle());
assert_eq!(&break_transfer, &decoded[..]);
}
// 12 low samples: more than one 11-bit frame, fewer than the 22 required for
// a break with 2 break cycles.
assert_eq!(
UartBitbangDecoder::<0>::new(UartBitbangConfig::new(
8,
UartStopBits::Stop2,
2,
Parity::None
)?)
.decode_samples(&vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])?,
vec![UartTransfer::Broken {
data: 0x00,
parity: None,
error: UartTransferDecodeError::InvalidBreak,
}]
);
Ok(())
}
// Checks that the decoder keeps its state across calls: a frame fed one
// sample at a time yields nothing until the final stop bit arrives.
#[test]
fn partial_transfers() -> Result<()> {
let mut decoder = UartBitbangDecoder::<0>::new(UartBitbangConfig::new(
8,
UartStopBits::Stop2,
1,
Parity::None,
)?);
assert!(decoder.is_idle());
// Start bit, 8 data bits (0xA5, LSB first) and the first stop bit.
for sample in [0, 1, 0, 1, 0, 0, 1, 0, 1, 1] {
assert!(decoder.decode_samples(&vec![sample])?.is_empty());
assert!(!decoder.is_idle());
}
// The second stop bit completes the frame.
assert_eq!(
decoder.decode_samples(&vec![1])?,
vec![UartTransfer::Byte { data: 0xA5 }]
);
assert!(decoder.is_idle());
Ok(())
}
}