use super::Bit;
use serde::{Deserialize, Serialize};
use std::iter::Peekable;
use thiserror::Error;
/// The end of the SPI link whose point of view is used when decoding.
///
/// In single data mode the decoder reads the `D0` line for
/// [`SpiEndpoint::Device`] and the `D1` line for [`SpiEndpoint::Host`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SpiEndpoint {
    /// Decode as the host (reads `D1` in single data mode).
    Host,
    /// Decode as the device (reads `D0` in single data mode).
    Device,
}
/// Number of data lines that carry bits on each clock cycle.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SpiDataMode {
/// One bit per clock cycle; the encoder drives it on `D0`.
Single,
/// Two bits per clock cycle on `D1` and `D0`, with `D1` carrying the more significant bit.
Dual,
/// Four bits per clock cycle on `D3`..`D0`, with `D3` carrying the most significant bit.
Quad,
}
/// Bus parameters shared by the bitbang encoder and decoder.
#[derive(Clone, Debug)]
pub struct SpiBitbangConfig {
    /// Clock polarity; the idle clock level is derived from this flag.
    pub cpol: bool,
    /// Clock phase. When `false` the encoder emits each bit as an idle-level clock sample
    /// followed by an active-level one; when `true` the order is reversed.
    pub cpha: bool,
    /// Number of data lines carrying bits on each clock cycle.
    pub data_mode: SpiDataMode,
    /// Number of bits in every word.
    pub bits_per_word: u32,
}
/// Idle periods the encoder inserts around a transaction.
///
/// Every unit of delay produces two idle samples, the same number of samples the encoder
/// emits for one clocked bit.
#[derive(Clone, Debug)]
pub struct SpiEncodingDelays {
/// Delay inserted between consecutive words while chip select stays asserted.
pub inter_word_delay: u32,
/// Delay inserted after chip select is asserted, before the first word.
pub cs_hold_delay: u32,
/// Delay inserted after the last word, before chip select is de-asserted.
pub cs_release_delay: u32,
}
/// Errors that can occur while encoding a SPI transfer into bitbang samples.
#[derive(Error, Debug, PartialEq, Serialize, Deserialize)]
pub enum SpiTransferEncodeError {
/// A word was encoded while chip select was not asserted.
#[error("CS must be asserted before bitbanging a SPI transaction.")]
CsNotAsserted,
}
/// Encoder that turns SPI transfers into a stream of bitbang samples.
///
/// Every sample is a single byte. The const parameters `D0`..`D3`, `CLK` and `CS` give the
/// bit position of the corresponding line within that byte.
pub struct SpiBitbangEncoder<
const D0: u8,
const D1: u8,
const D2: u8,
const D3: u8,
const CLK: u8,
const CS: u8,
> {
/// Bus configuration used when encoding.
pub config: SpiBitbangConfig,
/// Idle periods inserted around chip-select changes and between words.
pub delays: SpiEncodingDelays,
// True until a word has been encoded since chip select was last asserted; the
// inter-word delay is skipped while this is set.
first_word: bool,
// Whether chip select is currently asserted.
cs_asserted: bool,
}
// Level driven on data lines that carry no data in the current sample.
const UNUSED: Bit = Bit::Low;
// Chip select is active low: this level means "asserted".
const CS_LOW: Bit = Bit::Low;
// Chip-select level emitted when the encoder de-asserts it.
const CS_HIGH: Bit = Bit::High;
impl<const D0: u8, const D1: u8, const D2: u8, const D3: u8, const CLK: u8, const CS: u8>
SpiBitbangEncoder<D0, D1, D2, D3, CLK, CS>
{
/// Creates an encoder for the given bus configuration and delays.
///
/// The encoder starts with chip select de-asserted and no word encoded yet.
pub fn new(config: SpiBitbangConfig, delays: SpiEncodingDelays) -> Self {
    let (first_word, cs_asserted) = (true, false);
    Self {
        cs_asserted,
        first_word,
        delays,
        config,
    }
}
/// Restores the initial encoder state: chip select de-asserted and no word encoded yet.
///
/// The configuration and delays are left untouched, and no samples are emitted.
pub fn reset(&mut self) {
    (self.first_word, self.cs_asserted) = (true, false);
}
/// Changes the data mode used for subsequently encoded words.
pub fn set_data_mode(&mut self, mode: SpiDataMode) {
self.config.data_mode = mode;
}
/// Packs the six line levels into one sample byte.
///
/// Each level is placed at the bit position given by the matching const parameter
/// (`D0`..`D3`, `CLK`, `CS`).
fn sample(&self, d0: Bit, d1: Bit, d2: Bit, d3: Bit, clk: Bit, cs: Bit) -> u8 {
    let mut packed = 0u8;
    for (level, position) in [(d0, D0), (d1, D1), (d2, D2), (d3, D3), (clk, CLK), (cs, CS)] {
        packed |= (level as u8) << position;
    }
    packed
}
/// Appends the two samples that make up one clock cycle carrying the given data levels.
///
/// Chip select is held low in both samples. With `cpha` unset the idle-level clock sample
/// comes first, followed by the active-level one; with `cpha` set the order is reversed.
fn encode_data(&self, d0: Bit, d1: Bit, d2: Bit, d3: Bit, samples: &mut Vec<u8>) {
    let at_idle = self.sample(d0, d1, d2, d3, Bit::from(self.config.cpol), CS_LOW);
    let at_active = self.sample(d0, d1, d2, d3, Bit::from(!self.config.cpol), CS_LOW);
    let (first, second) = if self.config.cpha {
        (at_active, at_idle)
    } else {
        (at_idle, at_active)
    };
    samples.push(first);
    samples.push(second);
}
/// Encodes one word, given as its bytes, into clocked samples.
///
/// Bits are sent most significant first, one, two or four per clock cycle depending on the
/// data mode. If `words` holds fewer bytes than the word needs, the remainder of a started
/// word is padded with low bits; an empty slice emits nothing.
///
/// # Errors
///
/// Returns [`SpiTransferEncodeError::CsNotAsserted`] if chip select is not asserted.
fn encode_word(
    &mut self,
    words: &[u8],
    samples: &mut Vec<u8>,
) -> Result<(), SpiTransferEncodeError> {
    if !self.cs_asserted {
        return Err(SpiTransferEncodeError::CsNotAsserted);
    }
    self.first_word = false;

    let word_bits = self.config.bits_per_word;
    let mut pending = words.iter().copied();
    let mut current = 0x00u8;
    let mut emitted = 0u32;

    while emitted < word_bits {
        let offset = emitted % 8;
        if offset == 0 {
            // A byte boundary was reached: fetch the next byte, or stop if none is left.
            match pending.next() {
                Some(next) => current = next,
                None => break,
            }
        }
        // Level of the bit `index` positions below the most significant bit of `current`.
        let bit_at = |index: u32| Bit::from((current >> (7 - index)) & 0x01);
        emitted += match self.config.data_mode {
            SpiDataMode::Single => {
                self.encode_data(bit_at(offset), UNUSED, UNUSED, UNUSED, samples);
                1
            }
            SpiDataMode::Dual => {
                // The more significant bit of each pair goes on D1.
                self.encode_data(bit_at(offset + 1), bit_at(offset), UNUSED, UNUSED, samples);
                2
            }
            SpiDataMode::Quad => {
                // The most significant bit of each nibble goes on D3.
                self.encode_data(
                    bit_at(offset + 3),
                    bit_at(offset + 2),
                    bit_at(offset + 1),
                    bit_at(offset),
                    samples,
                );
                4
            }
        };
    }

    // The input ran out in the middle of a word: clock out low bits until it is complete.
    while emitted != 0 && emitted < word_bits {
        emitted += match self.config.data_mode {
            SpiDataMode::Single => {
                self.encode_data(Bit::Low, UNUSED, UNUSED, UNUSED, samples);
                1
            }
            SpiDataMode::Dual => {
                self.encode_data(Bit::Low, Bit::Low, UNUSED, UNUSED, samples);
                2
            }
            SpiDataMode::Quad => {
                self.encode_data(Bit::Low, Bit::Low, Bit::Low, Bit::Low, samples);
                4
            }
        };
    }
    Ok(())
}
/// Splits `words` into word-sized byte groups and encodes each of them.
///
/// Every word occupies `ceil(bits_per_word / 8)` bytes of `words`. Before each word except
/// the first one since chip select was asserted, `inter_word_delay * 2` idle samples are
/// emitted. When `bits_per_word` is zero there is nothing to transmit and no samples are
/// produced.
///
/// # Errors
///
/// Returns [`SpiTransferEncodeError::CsNotAsserted`] if a word has to be encoded while chip
/// select is not asserted.
fn encode_words(
    &mut self,
    words: &[u8],
    samples: &mut Vec<u8>,
) -> Result<(), SpiTransferEncodeError> {
    let bytes_per_word = self.config.bits_per_word.div_ceil(8) as usize;
    if bytes_per_word == 0 {
        // `chunks(0)` panics, and a zero-bit word carries no data anyway.
        return Ok(());
    }
    // The idle sample is the same for every gap, so build it once.
    let idle = self.sample(
        UNUSED,
        UNUSED,
        UNUSED,
        UNUSED,
        Bit::from(self.config.cpol),
        CS_LOW,
    );
    for word in words.chunks(bytes_per_word) {
        if !self.first_word {
            let gap_samples = self.delays.inter_word_delay * 2;
            samples.extend(std::iter::repeat(idle).take(gap_samples as usize));
        }
        self.encode_word(word, samples)?;
    }
    Ok(())
}
/// Asserts (`assert == true`) or de-asserts chip select, emitting the configured delay.
///
/// Nothing happens if chip select is already in the requested state. Otherwise:
///
/// * on assertion, `cs_hold_delay * 2` idle samples with chip select low are emitted, plus
///   one more when `cpha` is set, and the next word is treated as the first of the
///   transaction;
/// * on de-assertion, `cs_release_delay * 2` idle samples with chip select low are emitted,
///   plus one more when `cpha` is not set, followed by one sample with chip select high.
pub fn assert_cs(&mut self, assert: bool, samples: &mut Vec<u8>) {
    if assert == self.cs_asserted {
        return;
    }
    self.cs_asserted = assert;

    let idle_clk = Bit::from(self.config.cpol);
    let cs_low = self.sample(UNUSED, UNUSED, UNUSED, UNUSED, idle_clk, CS_LOW);

    // Pick the delay for this transition and whether it needs the extra half-cycle sample.
    let (delay, extra_sample) = if assert {
        (self.delays.cs_hold_delay, self.config.cpha)
    } else {
        (self.delays.cs_release_delay, !self.config.cpha)
    };
    let wait_samples = delay * 2 + u32::from(extra_sample);
    samples.resize(samples.len() + wait_samples as usize, cs_low);

    if assert {
        self.first_word = true;
    } else {
        samples.push(self.sample(UNUSED, UNUSED, UNUSED, UNUSED, idle_clk, CS_HIGH));
    }
}
/// Encodes the clock cycles needed to read `words` words, driving all-zero data.
///
/// Each word occupies `ceil(bits_per_word / 8)` bytes, so the zero-filled buffer is sized
/// in whole words; sizing it in bytes would clock too few bits whenever a word is wider
/// than 8 bits.
///
/// # Errors
///
/// Returns [`SpiTransferEncodeError::CsNotAsserted`] if chip select is not asserted.
pub fn encode_read(
    &mut self,
    words: usize,
    samples: &mut Vec<u8>,
) -> Result<(), SpiTransferEncodeError> {
    let bytes_per_word = self.config.bits_per_word.div_ceil(8) as usize;
    self.encode_words(&vec![0; words * bytes_per_word], samples)
}
/// Encodes `data` as a sequence of words into `samples`.
///
/// Chip select is not changed by this call; it must already be asserted, otherwise
/// `SpiTransferEncodeError::CsNotAsserted` is returned when the first word is encoded.
pub fn encode_write(
&mut self,
data: &[u8],
samples: &mut Vec<u8>,
) -> Result<(), SpiTransferEncodeError> {
self.encode_words(data, samples)
}
/// Encodes a complete transaction: asserts chip select, encodes `data`, then de-asserts
/// chip select.
///
/// # Errors
///
/// Propagates any error from encoding the words; in that case chip select is left
/// asserted.
pub fn encode_transaction(
    &mut self,
    data: &[u8],
    samples: &mut Vec<u8>,
) -> Result<(), SpiTransferEncodeError> {
    self.assert_cs(true, samples);
    let outcome = self.encode_words(data, samples);
    if outcome.is_ok() {
        self.assert_cs(false, samples);
    }
    outcome
}
}
/// One raw bitbang sample; the const parameters give the bit position of each line in `raw`.
#[derive(Clone, Debug)]
struct Sample<const D0: u8, const D1: u8, const D2: u8, const D3: u8, const CLK: u8, const CS: u8> {
/// The sample byte holding the level of every line.
raw: u8,
}
impl<const D0: u8, const D1: u8, const D2: u8, const D3: u8, const CLK: u8, const CS: u8>
Sample<D0, D1, D2, D3, CLK, CS>
{
fn d0(&self) -> Bit {
((self.raw >> D0) & 0x01).into()
}
fn d1(&self) -> Bit {
((self.raw >> D1) & 0x01).into()
}
fn d2(&self) -> Bit {
((self.raw >> D2) & 0x01).into()
}
fn d3(&self) -> Bit {
((self.raw >> D3) & 0x01).into()
}
fn clk(&self) -> Bit {
((self.raw >> CLK) & 0x01).into()
}
fn cs(&self) -> Bit {
((self.raw >> CS) & 0x01).into()
}
}
/// Errors that can occur while decoding bitbang samples into SPI data.
#[derive(Error, Debug, PartialEq, Serialize, Deserialize)]
pub enum SpiTransferDecodeError {
/// The clock level observed when chip select was found asserted (first field) differs from
/// the idle level expected from `cpol` (second field).
#[error("Settings mismatch: Clock level when idle is {0:?}, but cpol expects {1:?}")]
ClockPolarityMismatch(Bit, Bit),
/// Chip select went high after at least one bit of a word had been decoded but before the
/// word was complete.
#[error("Chip select was de-asserted while a SPI transaction was in progress")]
ChipSelectDeassertedEarly,
/// The samples ran out after at least one bit of a word had been decoded but before the
/// word was complete.
#[error("Not enough samples were given to complete the SPI transaction")]
UnfinishedTransaction,
}
/// Decoder that recovers SPI data bytes from a stream of bitbang samples.
///
/// Every sample is a single byte. The const parameters `D0`..`D3`, `CLK` and `CS` give the
/// bit position of the corresponding line within that byte.
pub struct SpiBitbangDecoder<
const D0: u8,
const D1: u8,
const D2: u8,
const D3: u8,
const CLK: u8,
const CS: u8,
> {
/// Bus configuration the samples are expected to follow.
pub config: SpiBitbangConfig,
/// Endpoint whose data line is read in single data mode.
pub endpoint: SpiEndpoint,
}
impl<const D0: u8, const D1: u8, const D2: u8, const D3: u8, const CLK: u8, const CS: u8>
SpiBitbangDecoder<D0, D1, D2, D3, CLK, CS>
{
/// Creates a decoder for the given bus configuration and endpoint.
pub fn new(config: SpiBitbangConfig, endpoint: SpiEndpoint) -> Self {
Self { config, endpoint }
}
/// Changes the data mode used for subsequent decoding.
pub fn set_data_mode(&mut self, mode: SpiDataMode) {
self.config.data_mode = mode;
}
/// Skips samples until one with chip select low is at the front of `samples`.
///
/// The matching sample is only peeked, not consumed. Returns `Ok(true)` when such a sample
/// was found with the clock at its idle level, and `Ok(false)` when the samples ran out
/// first.
///
/// # Errors
///
/// Returns [`SpiTransferDecodeError::ClockPolarityMismatch`] if the clock is not at the
/// idle level implied by `cpol` in the first sample that has chip select low.
fn wait_cs<I>(&self, samples: &mut Peekable<I>) -> Result<bool, SpiTransferDecodeError>
where
    I: Iterator<Item = Sample<D0, D1, D2, D3, CLK, CS>>,
{
    let expected_idle = Bit::from(self.config.cpol);
    loop {
        let Some(front) = samples.peek() else {
            return Ok(false);
        };
        if front.cs() == Bit::Low {
            let observed = front.clk();
            return if observed == expected_idle {
                Ok(true)
            } else {
                Err(SpiTransferDecodeError::ClockPolarityMismatch(
                    observed,
                    expected_idle,
                ))
            };
        }
        // Chip select is still high: discard this sample and look at the next one.
        samples.next();
    }
}
/// Advances `samples` to the next sampling edge of the clock and returns the sample there.
///
/// The clock is first followed to the level preceding the sampling edge and then to the
/// level after it; which levels those are depends on whether `cpol` equals `cpha`.
/// `first_edge` states that no bit of the current word has been decoded yet. In that case
/// running out of samples, or finding chip select high, simply ends decoding and
/// `Ok(None)` is returned.
///
/// # Errors
///
/// When `first_edge` is `false`, returns
/// [`SpiTransferDecodeError::UnfinishedTransaction`] if the samples run out and
/// [`SpiTransferDecodeError::ChipSelectDeassertedEarly`] if chip select goes high.
fn sample_on_edge<I>(
    &self,
    samples: &mut I,
    first_edge: bool,
) -> Result<Option<Sample<D0, D1, D2, D3, CLK, CS>>, SpiTransferDecodeError>
where
    I: Iterator<Item = Sample<D0, D1, D2, D3, CLK, CS>>,
{
    let (wait_level, sample_level) = if self.config.cpol == self.config.cpha {
        (Bit::Low, Bit::High)
    } else {
        (Bit::High, Bit::Low)
    };

    let mut sampled = None;
    for level in [wait_level, sample_level] {
        // Stop at the first sample with the wanted clock level or with chip select high.
        let hit = samples
            .by_ref()
            .find(|candidate| candidate.cs() == Bit::High || candidate.clk() == level);
        match hit {
            Some(candidate) if candidate.cs() != Bit::High => sampled = Some(candidate),
            Some(_) => {
                // Chip select was released before the edge was reached.
                return if first_edge {
                    Ok(None)
                } else {
                    Err(SpiTransferDecodeError::ChipSelectDeassertedEarly)
                };
            }
            None => {
                // No samples are left.
                return if first_edge {
                    Ok(None)
                } else {
                    Err(SpiTransferDecodeError::UnfinishedTransaction)
                };
            }
        }
    }
    Ok(sampled)
}
/// Decodes one word from `samples` and returns its bytes.
///
/// Bits are shifted in most significant first, one, two or four per sampling edge
/// depending on the data mode. In single data mode the line that is read depends on the
/// endpoint (`D0` for the device, `D1` for the host). A final byte that is only partially
/// filled is left-aligned. An empty vector is returned when no edge could be sampled
/// before the first bit of the word.
///
/// # Errors
///
/// Propagates the errors of `sample_on_edge` when a started word cannot be completed.
fn decode_word<I>(&self, samples: &mut I) -> Result<Vec<u8>, SpiTransferDecodeError>
where
    I: Iterator<Item = Sample<D0, D1, D2, D3, CLK, CS>>,
{
    let word_bits = self.config.bits_per_word;
    let mut word: Vec<u8> = Vec::with_capacity(word_bits.div_ceil(8) as usize);
    let mut acc: u8 = 0x00;
    let mut decoded = 0u32;

    while decoded < word_bits {
        let Some(sample) = self.sample_on_edge(samples, decoded == 0)? else {
            break;
        };
        // Gather the bits carried by this sample and how many of them there are.
        let (bits, width) = match self.config.data_mode {
            SpiDataMode::Single => {
                let line = match self.endpoint {
                    SpiEndpoint::Device => sample.d0(),
                    SpiEndpoint::Host => sample.d1(),
                };
                (line as u8, 1)
            }
            SpiDataMode::Dual => (((sample.d1() as u8) << 1) | (sample.d0() as u8), 2),
            SpiDataMode::Quad => (
                ((sample.d3() as u8) << 3)
                    | ((sample.d2() as u8) << 2)
                    | ((sample.d1() as u8) << 1)
                    | (sample.d0() as u8),
                4,
            ),
        };
        acc = (acc << width) | bits;
        decoded += width;
        if decoded % 8 == 0 {
            word.push(acc);
            acc = 0x00;
        }
    }

    // Left-align whatever is left of a partially filled final byte.
    let leftover = decoded % 8;
    if leftover != 0 {
        word.push(acc << (8 - leftover));
    }
    Ok(word)
}
/// Decodes every word found in `samples` and returns the concatenated bytes.
///
/// Samples are skipped until chip select is low, then words are decoded one after the
/// other. When a word comes back empty, decoding resumes at the next sample with chip
/// select low, and stops once the samples are exhausted. If `bits_per_word` is zero no
/// data can be decoded and an empty vector is returned.
///
/// # Errors
///
/// Returns [`SpiTransferDecodeError::ClockPolarityMismatch`] if the clock is not at its
/// idle level when chip select is found low, and
/// [`SpiTransferDecodeError::ChipSelectDeassertedEarly`] or
/// [`SpiTransferDecodeError::UnfinishedTransaction`] if a started word cannot be completed.
pub fn run(&self, samples: Vec<u8>) -> Result<Vec<u8>, SpiTransferDecodeError> {
    let mut samples = samples
        .into_iter()
        .map(|raw| Sample::<D0, D1, D2, D3, CLK, CS> { raw })
        .peekable();
    let mut bytes = Vec::new();
    if !self.wait_cs(&mut samples)? {
        return Ok(bytes);
    }
    if self.config.bits_per_word == 0 {
        // A zero-bit word consumes no samples, so the loop below would never make progress
        // and never terminate.
        return Ok(bytes);
    }
    loop {
        let word = self.decode_word(&mut samples)?;
        // An empty word means the current transaction ended; look for the next one.
        if word.is_empty() && !self.wait_cs(&mut samples)? {
            break;
        }
        bytes.extend(word);
    }
    Ok(bytes)
}
}
#[cfg(test)]
mod test {
use super::*;
use anyhow::Result;
use std::cmp::Ordering;
// Encodes `data` (or a default message) as one transaction and checks that decoding the
// resulting samples as the device yields the same bytes. The payload is zero-padded to a
// whole number of words first.
fn spi_encode_decode(
    config: SpiBitbangConfig,
    delays: SpiEncodingDelays,
    data: Option<&[u8]>,
) -> Result<()> {
    let word_len = config.bits_per_word.div_ceil(8) as usize;
    let mut payload = data
        .unwrap_or(b"Hello, this is a simple SPI test message.")
        .to_vec();
    let padding = (word_len - payload.len() % word_len) % word_len;
    payload.resize(payload.len() + padding, 0);

    let mut encoder = SpiBitbangEncoder::<0, 1, 2, 3, 4, 5>::new(config.clone(), delays);
    let mut samples = Vec::new();
    encoder.encode_transaction(&payload, &mut samples)?;
    assert!(!samples.is_empty());

    let decoder = SpiBitbangDecoder::<0, 1, 2, 3, 4, 5>::new(config, SpiEndpoint::Device);
    let decoded = decoder
        .run(samples)
        .expect("Should have decoded the bitbanged message");
    assert_eq!(decoded, payload);
    Ok(())
}
// Round-trips a few messages with cpol and cpha unset, then pins the exact sample stream
// produced for a known 8-byte payload.
#[test]
fn smoke() -> Result<()> {
let config = SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode: SpiDataMode::Single,
bits_per_word: 8,
};
let delays = SpiEncodingDelays {
inter_word_delay: 0,
cs_hold_delay: 1,
cs_release_delay: 1,
};
spi_encode_decode(config.clone(), delays.clone(), None)?;
spi_encode_decode(config.clone(), delays.clone(), Some(b"abc def GHI JKL"))?;
spi_encode_decode(config.clone(), delays.clone(), Some(b"12345678"))?;
// With this pin layout CLK is bit 0, CS is bit 1 and D0 is bit 2 of every sample.
let mut encoder = SpiBitbangEncoder::<2, 3, 4, 5, 0, 1>::new(config, delays);
let bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF];
let mut samples = Vec::new();
encoder.encode_transaction(&bytes, &mut samples)?;
// Two idle samples for the CS hold delay, then two samples per data bit, then three idle
// samples for the CS release delay and a final sample with CS high.
let expected = [
0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 4, 5, 0, 1, 0, 1, 4, 5, 0, 1, 0, 1, 0,
1, 4, 5, 4, 5, 0, 1, 4, 5, 0, 1, 0, 1, 0, 1, 4, 5, 0, 1, 4, 5, 0, 1, 4, 5, 4, 5, 0, 1,
0, 1, 4, 5, 4, 5, 4, 5, 4, 5, 0, 1, 0, 1, 0, 1, 4, 5, 0, 1, 0, 1, 4, 5, 4, 5, 0, 1, 4,
5, 0, 1, 4, 5, 0, 1, 4, 5, 4, 5, 4, 5, 4, 5, 0, 1, 0, 1, 4, 5, 4, 5, 0, 1, 4, 5, 4, 5,
4, 5, 4, 5, 0, 1, 4, 5, 4, 5, 4, 5, 4, 5, 0, 0, 0, 2,
];
assert_eq!(samples, expected);
Ok(())
}
// Round-trips messages for every combination of cpol, cpha and 0/1 delays, then pins the
// first four samples produced for each cpol/cpha combination.
#[test]
fn communication_modes() -> Result<()> {
// The five low bits of `config_bitmap` select cpol, cpha and each of the three delays.
for config_bitmap in 0..32 {
let config = SpiBitbangConfig {
cpol: config_bitmap & 0x01 > 0,
cpha: (config_bitmap >> 1) & 0x01 > 0,
data_mode: SpiDataMode::Single,
bits_per_word: 8,
};
let delays = SpiEncodingDelays {
inter_word_delay: (config_bitmap >> 2) & 0x01,
cs_hold_delay: (config_bitmap >> 3) & 0x01,
cs_release_delay: (config_bitmap >> 4) & 0x01,
};
spi_encode_decode(config.clone(), delays.clone(), None)?;
spi_encode_decode(config, delays, Some(b"communication mode test message"))?;
}
// ((cpol, cpha), first four samples). In these samples CLK is bit 0 and D0 is bit 2.
let tests = [
((false, false), vec![4, 5, 0, 1]),
((false, true), vec![0, 5, 4, 1]),
((true, false), vec![5, 4, 1, 0]),
((true, true), vec![1, 4, 5, 0]),
];
let delays = SpiEncodingDelays {
inter_word_delay: 0,
cs_hold_delay: 0,
cs_release_delay: 0,
};
for ((cpol, cpha), expected) in tests {
let config = SpiBitbangConfig {
cpol,
cpha,
data_mode: SpiDataMode::Single,
bits_per_word: 8,
};
let mut samples = Vec::new();
SpiBitbangEncoder::<2, 3, 4, 5, 0, 1>::new(config, delays.clone())
.encode_transaction(&[0xA5], &mut samples)?;
assert_eq!(samples[..4], expected);
}
Ok(())
}
// Round-trips messages in every data mode, then checks how many samples an 8-byte payload
// produces in each mode.
#[test]
fn data_modes() -> Result<()> {
let delays = SpiEncodingDelays {
inter_word_delay: 0,
cs_hold_delay: 1,
cs_release_delay: 1,
};
for data_mode in [SpiDataMode::Single, SpiDataMode::Dual, SpiDataMode::Quad] {
let config = SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode,
bits_per_word: 8,
};
spi_encode_decode(config.clone(), delays.clone(), None)?;
spi_encode_decode(
config,
delays.clone(),
Some(b"A slightly longer message that can be used to test SPI data modes"),
)?;
}
let delays = SpiEncodingDelays {
inter_word_delay: 0,
cs_hold_delay: 0,
cs_release_delay: 0,
};
// 64 data bits at 1, 2 or 4 bits per clock cycle, two samples per cycle, plus the two
// samples emitted when chip select is released.
let tests = [
(SpiDataMode::Single, 64 * 2 + 2),
(SpiDataMode::Dual, (64 / 2) * 2 + 2),
(SpiDataMode::Quad, (64 / 4) * 2 + 2),
];
for (data_mode, expected_half_clocks) in tests {
let config = SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode,
bits_per_word: 8,
};
let mut samples = Vec::new();
SpiBitbangEncoder::<2, 3, 4, 5, 0, 1>::new(config, delays.clone()).encode_transaction(
&[0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF],
&mut samples,
)?;
assert_eq!(samples.len(), expected_half_clocks);
}
Ok(())
}
// Round-trips a 64-byte payload for every word size from 1 to 64 bits.
#[test]
fn bits_per_word() -> Result<()> {
let delays = SpiEncodingDelays {
inter_word_delay: 0,
cs_hold_delay: 1,
cs_release_delay: 1,
};
for bits_per_word in 1..=64 {
let config = SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode: SpiDataMode::Single,
bits_per_word,
};
let mut bytes = (0..64).map(|b: u8| b.reverse_bits()).collect::<Vec<u8>>();
// Only `bits_per_word` bits of each word are transmitted, so clear the trailing bits
// of every word's last byte; otherwise the decoded data could not match the input.
let mut bits_in_current_word = 0;
for byte in bytes.iter_mut() {
// Number of bits of the current word that are still to be covered.
let mask_size = bits_per_word - bits_in_current_word;
match mask_size.cmp(&8) {
Ordering::Greater => {
bits_in_current_word += 8;
}
Ordering::Equal => {
bits_in_current_word = 0;
}
Ordering::Less => {
// The word ends inside this byte: keep the top `mask_size` bits only.
let zero_mask = (0x01 << (8 - mask_size)) - 1;
*byte &= !zero_mask;
bits_in_current_word = 0;
}
}
}
spi_encode_decode(config, delays.clone(), Some(&bytes))?;
}
Ok(())
}
// Checks that each delay setting inserts the expected idle samples, and that messages
// still round-trip with those delays for both clock phases.
#[test]
fn encoding_delays() -> Result<()> {
// ((inter_word_delay, cs_hold_delay, cs_release_delay), expected samples) for the two
// 4-bit words 0xA and 0x5. In these samples CLK is bit 0, CS is bit 1 and D0 is bit 2.
let tests = [
(
(0, 0, 0),
vec![4, 5, 0, 1, 4, 5, 0, 1, 0, 1, 4, 5, 0, 1, 4, 5, 0, 2],
),
(
(3, 0, 0),
vec![
4, 5, 0, 1, 4, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 4, 5, 0, 1, 4, 5, 0, 2,
],
),
(
(0, 3, 0),
vec![
0, 0, 0, 0, 0, 0, 4, 5, 0, 1, 4, 5, 0, 1, 0, 1, 4, 5, 0, 1, 4, 5, 0, 2,
],
),
(
(0, 0, 3),
vec![
4, 5, 0, 1, 4, 5, 0, 1, 0, 1, 4, 5, 0, 1, 4, 5, 0, 0, 0, 0, 0, 0, 0, 2,
],
),
(
(1, 2, 3),
vec![
0, 0, 0, 0, 4, 5, 0, 1, 4, 5, 0, 1, 0, 0, 0, 1, 4, 5, 0, 1, 4, 5, 0, 0, 0, 0,
0, 0, 0, 2,
],
),
];
for ((inter_word_delay, cs_hold_delay, cs_release_delay), expected) in tests {
let delays = SpiEncodingDelays {
inter_word_delay,
cs_hold_delay,
cs_release_delay,
};
for cpha in [false, true] {
let config = SpiBitbangConfig {
cpol: false,
cpha,
data_mode: SpiDataMode::Single,
bits_per_word: 8,
};
spi_encode_decode(config, delays.clone(), None)?;
}
let config = SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode: SpiDataMode::Single,
bits_per_word: 4,
};
let mut samples = Vec::new();
// Each byte holds one 4-bit word in its upper nibble.
SpiBitbangEncoder::<2, 3, 4, 5, 0, 1>::new(config, delays)
.encode_transaction(&[0xA << 4, 0x5 << 4], &mut samples)?;
assert_eq!(samples, expected);
}
Ok(())
}
// Decodes the same samples as the device and as the host. The host decoder swaps the D0
// and D1 pin positions, so it reads the line the encoder drove the data on.
#[test]
fn decoding_endpoints() -> Result<()> {
    let config = SpiBitbangConfig {
        cpol: false,
        cpha: false,
        data_mode: SpiDataMode::Single,
        bits_per_word: 8,
    };
    let delays = SpiEncodingDelays {
        inter_word_delay: 0,
        cs_hold_delay: 1,
        cs_release_delay: 1,
    };
    let bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF];

    let mut samples = Vec::new();
    let mut encoder = SpiBitbangEncoder::<0, 1, 2, 3, 4, 5>::new(config.clone(), delays);
    encoder.encode_transaction(&bytes, &mut samples)?;

    let seen_by_device =
        SpiBitbangDecoder::<0, 1, 2, 3, 4, 5>::new(config.clone(), SpiEndpoint::Device)
            .run(samples.clone())
            .expect("Should have decoded the bitbanged message");
    assert_eq!(seen_by_device, &bytes);

    let seen_by_host = SpiBitbangDecoder::<1, 0, 2, 3, 4, 5>::new(config, SpiEndpoint::Host)
        .run(samples)
        .expect("Should have decoded the bitbanged message");
    assert_eq!(seen_by_host, &bytes);
    Ok(())
}
// Checks the samples produced for a five-word read: only the clock toggles while the data
// lines stay low.
#[test]
fn encode_host_reads() -> Result<()> {
    let config = SpiBitbangConfig {
        cpol: false,
        cpha: false,
        data_mode: SpiDataMode::Single,
        bits_per_word: 8,
    };
    let delays = SpiEncodingDelays {
        inter_word_delay: 0,
        cs_hold_delay: 0,
        cs_release_delay: 0,
    };
    let mut encoder = SpiBitbangEncoder::<2, 3, 4, 5, 0, 1>::new(config, delays);

    let mut samples = Vec::new();
    encoder.assert_cs(true, &mut samples);
    encoder.encode_read(5, &mut samples)?;
    encoder.assert_cs(false, &mut samples);

    // One leading idle sample, a clock-high/clock-low pair for each of the 8 * 5 bits, and
    // a final sample with chip select high.
    let mut expected = vec![0];
    expected.extend([1, 0].repeat(8 * 5));
    expected.push(2);
    assert_eq!(samples, expected);
    Ok(())
}
// Checks that encoding requires chip select to be asserted and that repeating a
// chip-select request emits no further samples.
#[test]
fn encode_cs_assertions() -> Result<()> {
let mut encoder = SpiBitbangEncoder::<2, 3, 4, 5, 0, 1>::new(
SpiBitbangConfig {
cpol: false,
cpha: true,
data_mode: SpiDataMode::Single,
bits_per_word: 8,
},
SpiEncodingDelays {
inter_word_delay: 0,
cs_hold_delay: 0,
cs_release_delay: 0,
},
);
let mut samples = vec![];
// De-asserting when chip select is already de-asserted emits nothing.
encoder.assert_cs(false, &mut samples);
assert_eq!(samples.len(), 0);
// Writes and reads fail, and emit nothing, while chip select is de-asserted.
assert_eq!(
encoder.encode_write(&[0], &mut samples),
Err(SpiTransferEncodeError::CsNotAsserted)
);
assert_eq!(
encoder.encode_read(1, &mut samples),
Err(SpiTransferEncodeError::CsNotAsserted)
);
assert_eq!(samples.len(), 0);
// The first assertion emits samples; asserting again does not.
encoder.assert_cs(true, &mut samples);
let mut sample_len = samples.len();
assert!(sample_len > 0);
encoder.assert_cs(true, &mut samples);
assert_eq!(samples.len(), sample_len);
// The first de-assertion emits samples; de-asserting again does not.
encoder.assert_cs(false, &mut samples);
assert!(samples.len() > sample_len);
sample_len = samples.len();
encoder.assert_cs(false, &mut samples);
assert_eq!(samples.len(), sample_len);
Ok(())
}
// Checks every error the decoder can report. In these samples CLK is bit 0, CS is bit 1
// and D0 is bit 2.
#[test]
fn spi_decoding_errors() -> Result<()> {
// The first sample with chip select low has the clock high although cpol is unset.
assert_eq!(
SpiBitbangDecoder::<2, 3, 4, 5, 0, 1>::new(
SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode: SpiDataMode::Single,
bits_per_word: 2,
},
SpiEndpoint::Device
)
.run(vec![3, 1, 0, 1, 0, 1, 0, 2]),
Err(SpiTransferDecodeError::ClockPolarityMismatch(
Bit::High,
Bit::Low
))
);
// The first sample with chip select low has the clock low although cpol is set.
assert_eq!(
SpiBitbangDecoder::<2, 3, 4, 5, 0, 1>::new(
SpiBitbangConfig {
cpol: true,
cpha: false,
data_mode: SpiDataMode::Single,
bits_per_word: 2,
},
SpiEndpoint::Device
)
.run(vec![2, 0, 1, 0, 1, 0, 1, 3]),
Err(SpiTransferDecodeError::ClockPolarityMismatch(
Bit::Low,
Bit::High
))
);
let valid_samples = vec![4, 5, 0, 1, 4, 5, 0, 1, 0, 1, 4, 5, 0, 1, 4, 5, 0, 2];
let decoder = SpiBitbangDecoder::<2, 3, 4, 5, 0, 1>::new(
SpiBitbangConfig {
cpol: false,
cpha: false,
data_mode: SpiDataMode::Single,
bits_per_word: 8,
},
SpiEndpoint::Device,
);
assert!(decoder.run(valid_samples.clone()).is_ok());
// Raising chip select in any single sample inside the word must be reported.
for index in 1..(valid_samples.len() - 2) {
let mut invalid_samples = valid_samples.clone();
invalid_samples[index] |= 0x01 << 1;
assert_eq!(
decoder.run(invalid_samples),
Err(SpiTransferDecodeError::ChipSelectDeassertedEarly)
);
}
// Cutting the samples off inside the word must be reported.
for index in 2..(valid_samples.len() - 2) {
let mut invalid_samples = valid_samples.clone();
invalid_samples.truncate(index);
assert_eq!(
decoder.run(invalid_samples),
Err(SpiTransferDecodeError::UnfinishedTransaction)
)
}
Ok(())
}
}