Skip to main content

mavlink_core/connection/direct_serial/
sync.rs

1//! Serial MAVLINK connection
2
3use crate::Connectable;
4use crate::connection::{Connection, MavConnection};
5use crate::connection_shared::{
6    ConnectionState, next_atomic_send_header, read_message, read_raw_message, write_message,
7    write_raw_message,
8};
9use crate::error::{MessageReadError, MessageWriteError};
10use crate::peek_reader::PeekReader;
11use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message};
12use core::ops::DerefMut;
13use core::sync::atomic::AtomicU8;
14use std::io::{self, BufReader};
15use std::sync::Mutex;
16use std::time::Duration;
17
18use serialport::{DataBits, FlowControl, Parity, SerialPort, StopBits};
19
20#[cfg(feature = "mav2-message-signing")]
21use crate::SigningConfig;
22
23use super::config::SerialConfig;
24
25pub struct SerialConnection {
26    // Separate ports for reading and writing as it's safe to use concurrently.
27    // See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
28    read_port: Mutex<PeekReader<BufReader<Box<dyn SerialPort>>>>,
29    write_port: Mutex<Box<dyn SerialPort>>,
30    sequence: AtomicU8,
31    state: ConnectionState,
32}
33
34impl<M: Message> MavConnection<M> for SerialConnection {
35    fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
36        let mut port = self.read_port.lock().unwrap();
37
38        loop {
39            let result = read_message::<M, _>(port.deref_mut(), &self.state);
40            match result {
41                ok @ Ok(..) => {
42                    return ok;
43                }
44                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
45                    return Err(MessageReadError::Io(e));
46                }
47                _ => {}
48            }
49        }
50    }
51
52    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, MessageReadError> {
53        let mut port = self.read_port.lock().unwrap();
54
55        loop {
56            let result = read_raw_message::<M, _>(port.deref_mut(), &self.state);
57            match result {
58                ok @ Ok(..) => {
59                    return ok;
60                }
61                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
62                    return Err(MessageReadError::Io(e));
63                }
64                _ => {}
65            }
66        }
67    }
68
69    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
70        let mut port = self.read_port.lock().unwrap();
71        read_message::<M, _>(port.deref_mut(), &self.state)
72    }
73
74    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
75        let mut port = self.write_port.lock().unwrap();
76
77        let header = next_atomic_send_header(&self.sequence, header);
78        write_message(port.deref_mut(), &self.state, header, data)
79    }
80
81    fn send_raw(&self, data: &MAVLinkMessageRaw) -> Result<usize, MessageWriteError> {
82        let mut port = self.write_port.lock().unwrap();
83        write_raw_message(port.deref_mut(), data)
84    }
85
86    fn set_protocol_version(&mut self, version: MavlinkVersion) {
87        self.state.set_protocol_version(version);
88    }
89
90    fn protocol_version(&self) -> MavlinkVersion {
91        self.state.protocol_version()
92    }
93
94    fn set_allow_recv_any_version(&mut self, allow: bool) {
95        self.state.set_allow_recv_any_version(allow);
96    }
97
98    fn allow_recv_any_version(&self) -> bool {
99        self.state.allow_recv_any_version()
100    }
101
102    #[cfg(feature = "mav2-message-signing")]
103    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
104        self.state.setup_signing(signing_data);
105    }
106}
107
108impl Connectable for SerialConfig {
109    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
110        let read_port = serialport::new(&self.port_name, self.baud_rate)
111            .data_bits(DataBits::Eight)
112            .parity(Parity::None)
113            .stop_bits(StopBits::One)
114            .flow_control(FlowControl::None)
115            .timeout(Duration::from_millis(1))
116            .open()?;
117
118        let write_port = read_port.try_clone()?;
119
120        let read_buffer_capacity = self.buffer_capacity();
121        let buf_reader = BufReader::with_capacity(read_buffer_capacity, read_port);
122
123        Ok(SerialConnection {
124            read_port: Mutex::new(PeekReader::new(buf_reader)),
125            write_port: Mutex::new(write_port),
126            sequence: AtomicU8::new(0),
127            state: ConnectionState::new(),
128        }
129        .into())
130    }
131}