mavlink_core/connection/
direct_serial.rs

1//! Serial MAVLINK connection
2
3use crate::connectable::SerialConnectable;
4use crate::connection::MavConnection;
5use crate::error::{MessageReadError, MessageWriteError};
6use crate::peek_reader::PeekReader;
7use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
8use core::ops::DerefMut;
9use core::sync::atomic::{self, AtomicU8};
10use std::io;
11use std::sync::Mutex;
12
13use serialport::{DataBits, FlowControl, Parity, SerialPort, StopBits};
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_versioned_msg, write_versioned_msg};
17#[cfg(feature = "signing")]
18use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
19
20use super::Connectable;
21
22pub struct SerialConnection {
23    // Separate ports for reading and writing as it's safe to use concurrently.
24    // See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
25    read_port: Mutex<PeekReader<Box<dyn SerialPort>>>,
26    write_port: Mutex<Box<dyn SerialPort>>,
27    sequence: AtomicU8,
28    protocol_version: MavlinkVersion,
29    recv_any_version: bool,
30    #[cfg(feature = "signing")]
31    signing_data: Option<SigningData>,
32}
33
34impl<M: Message> MavConnection<M> for SerialConnection {
35    fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
36        let mut port = self.read_port.lock().unwrap();
37        loop {
38            let version = ReadVersion::from_conn_cfg::<_, M>(self);
39            #[cfg(not(feature = "signing"))]
40            let result = read_versioned_msg(port.deref_mut(), version);
41            #[cfg(feature = "signing")]
42            let result =
43                read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
44            match result {
45                ok @ Ok(..) => {
46                    return ok;
47                }
48                Err(MessageReadError::Io(e)) => {
49                    if e.kind() == io::ErrorKind::UnexpectedEof {
50                        return Err(MessageReadError::Io(e));
51                    }
52                }
53                _ => {}
54            }
55        }
56    }
57
58    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
59        let mut port = self.read_port.lock().unwrap();
60        let version = ReadVersion::from_conn_cfg::<_, M>(self);
61
62        #[cfg(not(feature = "signing"))]
63        let result = read_versioned_msg(port.deref_mut(), version);
64
65        #[cfg(feature = "signing")]
66        let result =
67            read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
68
69        result
70    }
71
72    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
73        let mut port = self.write_port.lock().unwrap();
74
75        let sequence = self.sequence.fetch_add(
76            1,
77            // Safety:
78            //
79            // We are using `Ordering::Relaxed` here because:
80            // - We only need a unique sequence number per message
81            // - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
82            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
83            //
84            // Warning:
85            //
86            // If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
87            // then we should upgrade this ordering to `Ordering::SeqCst`.
88            atomic::Ordering::Relaxed,
89        );
90
91        let header = MavHeader {
92            sequence,
93            system_id: header.system_id,
94            component_id: header.component_id,
95        };
96
97        #[cfg(not(feature = "signing"))]
98        let result = write_versioned_msg(port.deref_mut(), self.protocol_version, header, data);
99        #[cfg(feature = "signing")]
100        let result = write_versioned_msg_signed(
101            port.deref_mut(),
102            self.protocol_version,
103            header,
104            data,
105            self.signing_data.as_ref(),
106        );
107        result
108    }
109
110    fn set_protocol_version(&mut self, version: MavlinkVersion) {
111        self.protocol_version = version;
112    }
113
114    fn protocol_version(&self) -> MavlinkVersion {
115        self.protocol_version
116    }
117
118    fn set_allow_recv_any_version(&mut self, allow: bool) {
119        self.recv_any_version = allow
120    }
121
122    fn allow_recv_any_version(&self) -> bool {
123        self.recv_any_version
124    }
125
126    #[cfg(feature = "signing")]
127    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
128        self.signing_data = signing_data.map(SigningData::from_config)
129    }
130}
131
132impl Connectable for SerialConnectable {
133    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
134        let read_port = serialport::new(&self.port_name, self.baud_rate)
135            .data_bits(DataBits::Eight)
136            .parity(Parity::None)
137            .stop_bits(StopBits::One)
138            .flow_control(FlowControl::None)
139            .open()?;
140
141        let write_port = read_port.try_clone()?;
142
143        Ok(Box::new(SerialConnection {
144            read_port: Mutex::new(PeekReader::new(read_port)),
145            write_port: Mutex::new(write_port),
146            sequence: AtomicU8::new(0),
147            protocol_version: MavlinkVersion::V2,
148            #[cfg(feature = "signing")]
149            signing_data: None,
150            recv_any_version: false,
151        }))
152    }
153}