Skip to main content

mavlink_core/connection/direct_serial/
sync.rs

1//! Serial MAVLINK connection
2
3use crate::Connectable;
4use crate::connection::{Connection, MavConnection};
5use crate::connection_shared::{
6    ConnectionState, next_atomic_send_header, read_message, read_raw_message, write_message,
7};
8use crate::error::{MessageReadError, MessageWriteError};
9use crate::peek_reader::PeekReader;
10use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message};
11use core::ops::DerefMut;
12use core::sync::atomic::AtomicU8;
13use std::io::{self, BufReader};
14use std::sync::Mutex;
15
16use serialport::{DataBits, FlowControl, Parity, SerialPort, StopBits};
17
18#[cfg(feature = "mav2-message-signing")]
19use crate::SigningConfig;
20
21use super::config::SerialConfig;
22
23pub struct SerialConnection {
24    // Separate ports for reading and writing as it's safe to use concurrently.
25    // See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
26    read_port: Mutex<PeekReader<BufReader<Box<dyn SerialPort>>>>,
27    write_port: Mutex<Box<dyn SerialPort>>,
28    sequence: AtomicU8,
29    state: ConnectionState,
30}
31
32impl<M: Message> MavConnection<M> for SerialConnection {
33    fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
34        let mut port = self.read_port.lock().unwrap();
35
36        loop {
37            let result = read_message::<M, _>(port.deref_mut(), &self.state);
38            match result {
39                ok @ Ok(..) => {
40                    return ok;
41                }
42                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
43                    return Err(MessageReadError::Io(e));
44                }
45                _ => {}
46            }
47        }
48    }
49
50    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, MessageReadError> {
51        let mut port = self.read_port.lock().unwrap();
52
53        loop {
54            let result = read_raw_message::<M, _>(port.deref_mut(), &self.state);
55            match result {
56                ok @ Ok(..) => {
57                    return ok;
58                }
59                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
60                    return Err(MessageReadError::Io(e));
61                }
62                _ => {}
63            }
64        }
65    }
66
67    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
68        let mut port = self.read_port.lock().unwrap();
69        read_message::<M, _>(port.deref_mut(), &self.state)
70    }
71
72    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
73        let mut port = self.write_port.lock().unwrap();
74
75        let header = next_atomic_send_header(&self.sequence, header);
76        write_message(port.deref_mut(), &self.state, header, data)
77    }
78
79    fn set_protocol_version(&mut self, version: MavlinkVersion) {
80        self.state.set_protocol_version(version);
81    }
82
83    fn protocol_version(&self) -> MavlinkVersion {
84        self.state.protocol_version()
85    }
86
87    fn set_allow_recv_any_version(&mut self, allow: bool) {
88        self.state.set_allow_recv_any_version(allow);
89    }
90
91    fn allow_recv_any_version(&self) -> bool {
92        self.state.allow_recv_any_version()
93    }
94
95    #[cfg(feature = "mav2-message-signing")]
96    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
97        self.state.setup_signing(signing_data);
98    }
99}
100
101impl Connectable for SerialConfig {
102    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
103        let read_port = serialport::new(&self.port_name, self.baud_rate)
104            .data_bits(DataBits::Eight)
105            .parity(Parity::None)
106            .stop_bits(StopBits::One)
107            .flow_control(FlowControl::None)
108            .open()?;
109
110        let write_port = read_port.try_clone()?;
111
112        let read_buffer_capacity = self.buffer_capacity();
113        let buf_reader = BufReader::with_capacity(read_buffer_capacity, read_port);
114
115        Ok(SerialConnection {
116            read_port: Mutex::new(PeekReader::new(buf_reader)),
117            write_port: Mutex::new(write_port),
118            sequence: AtomicU8::new(0),
119            state: ConnectionState::new(),
120        }
121        .into())
122    }
123}