mavlink_core/connection/
direct_serial.rs

1//! Serial MAVLINK connection
2
3use crate::connection::MavConnection;
4use crate::error::{MessageReadError, MessageWriteError};
5use crate::peek_reader::PeekReader;
6use crate::Connectable;
7use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
8use core::ops::DerefMut;
9use core::sync::atomic::{self, AtomicU8};
10use std::io;
11use std::sync::Mutex;
12
13use serialport::{DataBits, FlowControl, Parity, SerialPort, StopBits};
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_raw_versioned_msg, read_versioned_msg, write_versioned_msg};
17#[cfg(feature = "signing")]
18use crate::{
19    read_raw_versioned_msg_signed, read_versioned_msg_signed, write_versioned_msg_signed,
20    SigningConfig, SigningData,
21};
22
23pub mod config;
24
25use config::SerialConfig;
26
27pub struct SerialConnection {
28    // Separate ports for reading and writing as it's safe to use concurrently.
29    // See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
30    read_port: Mutex<PeekReader<Box<dyn SerialPort>>>,
31    write_port: Mutex<Box<dyn SerialPort>>,
32    sequence: AtomicU8,
33    protocol_version: MavlinkVersion,
34    recv_any_version: bool,
35    #[cfg(feature = "signing")]
36    signing_data: Option<SigningData>,
37}
38
39impl<M: Message> MavConnection<M> for SerialConnection {
40    fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
41        let mut port = self.read_port.lock().unwrap();
42        loop {
43            let version = ReadVersion::from_conn_cfg::<_, M>(self);
44            #[cfg(not(feature = "signing"))]
45            let result = read_versioned_msg(port.deref_mut(), version);
46            #[cfg(feature = "signing")]
47            let result =
48                read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
49            match result {
50                ok @ Ok(..) => {
51                    return ok;
52                }
53                Err(MessageReadError::Io(e)) => {
54                    if e.kind() == io::ErrorKind::UnexpectedEof {
55                        return Err(MessageReadError::Io(e));
56                    }
57                }
58                _ => {}
59            }
60        }
61    }
62
63    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, MessageReadError> {
64        let mut port = self.read_port.lock().unwrap();
65        loop {
66            let version = ReadVersion::from_conn_cfg::<_, M>(self);
67            #[cfg(not(feature = "signing"))]
68            let result = read_raw_versioned_msg::<M, _>(port.deref_mut(), version);
69            #[cfg(feature = "signing")]
70            let result = read_raw_versioned_msg_signed::<M, _>(
71                port.deref_mut(),
72                version,
73                self.signing_data.as_ref(),
74            );
75            match result {
76                ok @ Ok(..) => {
77                    return ok;
78                }
79                Err(MessageReadError::Io(e)) => {
80                    if e.kind() == io::ErrorKind::UnexpectedEof {
81                        return Err(MessageReadError::Io(e));
82                    }
83                }
84                _ => {}
85            }
86        }
87    }
88
89    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
90        let mut port = self.read_port.lock().unwrap();
91        let version = ReadVersion::from_conn_cfg::<_, M>(self);
92
93        #[cfg(not(feature = "signing"))]
94        let result = read_versioned_msg(port.deref_mut(), version);
95
96        #[cfg(feature = "signing")]
97        let result =
98            read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
99
100        result
101    }
102
103    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
104        let mut port = self.write_port.lock().unwrap();
105
106        let sequence = self.sequence.fetch_add(
107            1,
108            // Safety:
109            //
110            // We are using `Ordering::Relaxed` here because:
111            // - We only need a unique sequence number per message
112            // - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
113            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
114            //
115            // Warning:
116            //
117            // If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
118            // then we should upgrade this ordering to `Ordering::SeqCst`.
119            atomic::Ordering::Relaxed,
120        );
121
122        let header = MavHeader {
123            sequence,
124            system_id: header.system_id,
125            component_id: header.component_id,
126        };
127
128        #[cfg(not(feature = "signing"))]
129        let result = write_versioned_msg(port.deref_mut(), self.protocol_version, header, data);
130        #[cfg(feature = "signing")]
131        let result = write_versioned_msg_signed(
132            port.deref_mut(),
133            self.protocol_version,
134            header,
135            data,
136            self.signing_data.as_ref(),
137        );
138        result
139    }
140
141    fn set_protocol_version(&mut self, version: MavlinkVersion) {
142        self.protocol_version = version;
143    }
144
145    fn protocol_version(&self) -> MavlinkVersion {
146        self.protocol_version
147    }
148
149    fn set_allow_recv_any_version(&mut self, allow: bool) {
150        self.recv_any_version = allow;
151    }
152
153    fn allow_recv_any_version(&self) -> bool {
154        self.recv_any_version
155    }
156
157    #[cfg(feature = "signing")]
158    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
159        self.signing_data = signing_data.map(SigningData::from_config);
160    }
161}
162
163impl Connectable for SerialConfig {
164    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
165        let read_port = serialport::new(&self.port_name, self.baud_rate)
166            .data_bits(DataBits::Eight)
167            .parity(Parity::None)
168            .stop_bits(StopBits::One)
169            .flow_control(FlowControl::None)
170            .open()?;
171
172        let write_port = read_port.try_clone()?;
173
174        Ok(Box::new(SerialConnection {
175            read_port: Mutex::new(PeekReader::new(read_port)),
176            write_port: Mutex::new(write_port),
177            sequence: AtomicU8::new(0),
178            protocol_version: MavlinkVersion::V2,
179            #[cfg(feature = "signing")]
180            signing_data: None,
181            recv_any_version: false,
182        }))
183    }
184}