Skip to main content

mavlink_core/connection/
direct_serial.rs

1//! Serial MAVLINK connection
2
3use crate::Connectable;
4use crate::connection::{Connection, MavConnection};
5use crate::error::{MessageReadError, MessageWriteError};
6use crate::peek_reader::PeekReader;
7use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
8use core::ops::DerefMut;
9use core::sync::atomic::{self, AtomicU8};
10use std::io::{self, BufReader};
11use std::sync::Mutex;
12
13use serialport::{DataBits, FlowControl, Parity, SerialPort, StopBits};
14
15#[cfg(feature = "mav2-message-signing")]
16use crate::{
17    SigningConfig, SigningData, read_versioned_msg_signed, read_versioned_raw_message_signed,
18    write_versioned_msg_signed,
19};
20#[cfg(not(feature = "mav2-message-signing"))]
21use crate::{read_versioned_msg, read_versioned_raw_message, write_versioned_msg};
22
23pub mod config;
24
25use config::SerialConfig;
26
27pub struct SerialConnection {
28    // Separate ports for reading and writing as it's safe to use concurrently.
29    // See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
30    read_port: Mutex<PeekReader<BufReader<Box<dyn SerialPort>>>>,
31    write_port: Mutex<Box<dyn SerialPort>>,
32    sequence: AtomicU8,
33    protocol_version: MavlinkVersion,
34    recv_any_version: bool,
35    #[cfg(feature = "mav2-message-signing")]
36    signing_data: Option<SigningData>,
37}
38
39impl<M: Message> MavConnection<M> for SerialConnection {
40    fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
41        let mut port = self.read_port.lock().unwrap();
42        let version = ReadVersion::from_conn_cfg::<_, M>(self);
43
44        loop {
45            #[cfg(not(feature = "mav2-message-signing"))]
46            let result = read_versioned_msg(port.deref_mut(), version);
47            #[cfg(feature = "mav2-message-signing")]
48            let result =
49                read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
50            match result {
51                ok @ Ok(..) => {
52                    return ok;
53                }
54                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
55                    return Err(MessageReadError::Io(e));
56                }
57                _ => {}
58            }
59        }
60    }
61
62    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, MessageReadError> {
63        let mut port = self.read_port.lock().unwrap();
64        let version = ReadVersion::from_conn_cfg::<_, M>(self);
65
66        loop {
67            #[cfg(not(feature = "mav2-message-signing"))]
68            let result = read_versioned_raw_message::<M, _>(port.deref_mut(), version);
69            #[cfg(feature = "mav2-message-signing")]
70            let result = read_versioned_raw_message_signed::<M, _>(
71                port.deref_mut(),
72                version,
73                self.signing_data.as_ref(),
74            );
75            match result {
76                ok @ Ok(..) => {
77                    return ok;
78                }
79                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
80                    return Err(MessageReadError::Io(e));
81                }
82                _ => {}
83            }
84        }
85    }
86
87    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
88        let mut port = self.read_port.lock().unwrap();
89        let version = ReadVersion::from_conn_cfg::<_, M>(self);
90
91        #[cfg(not(feature = "mav2-message-signing"))]
92        let result = read_versioned_msg(port.deref_mut(), version);
93
94        #[cfg(feature = "mav2-message-signing")]
95        let result =
96            read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
97
98        result
99    }
100
101    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
102        let mut port = self.write_port.lock().unwrap();
103
104        let sequence = self.sequence.fetch_add(
105            1,
106            // Safety:
107            //
108            // We are using `Ordering::Relaxed` here because:
109            // - We only need a unique sequence number per message
110            // - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
111            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
112            //
113            // Warning:
114            //
115            // If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
116            // then we should upgrade this ordering to `Ordering::SeqCst`.
117            atomic::Ordering::Relaxed,
118        );
119
120        let header = MavHeader {
121            sequence,
122            system_id: header.system_id,
123            component_id: header.component_id,
124        };
125
126        #[cfg(not(feature = "mav2-message-signing"))]
127        let result = write_versioned_msg(port.deref_mut(), self.protocol_version, header, data);
128        #[cfg(feature = "mav2-message-signing")]
129        let result = write_versioned_msg_signed(
130            port.deref_mut(),
131            self.protocol_version,
132            header,
133            data,
134            self.signing_data.as_ref(),
135        );
136        result
137    }
138
139    fn set_protocol_version(&mut self, version: MavlinkVersion) {
140        self.protocol_version = version;
141    }
142
143    fn protocol_version(&self) -> MavlinkVersion {
144        self.protocol_version
145    }
146
147    fn set_allow_recv_any_version(&mut self, allow: bool) {
148        self.recv_any_version = allow;
149    }
150
151    fn allow_recv_any_version(&self) -> bool {
152        self.recv_any_version
153    }
154
155    #[cfg(feature = "mav2-message-signing")]
156    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
157        self.signing_data = signing_data.map(SigningData::from_config);
158    }
159}
160
161impl Connectable for SerialConfig {
162    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
163        let read_port = serialport::new(&self.port_name, self.baud_rate)
164            .data_bits(DataBits::Eight)
165            .parity(Parity::None)
166            .stop_bits(StopBits::One)
167            .flow_control(FlowControl::None)
168            .open()?;
169
170        let write_port = read_port.try_clone()?;
171
172        let read_buffer_capacity = self.buffer_capacity();
173        let buf_reader = BufReader::with_capacity(read_buffer_capacity, read_port);
174
175        Ok(SerialConnection {
176            read_port: Mutex::new(PeekReader::new(buf_reader)),
177            write_port: Mutex::new(write_port),
178            sequence: AtomicU8::new(0),
179            protocol_version: MavlinkVersion::V2,
180            #[cfg(feature = "mav2-message-signing")]
181            signing_data: None,
182            recv_any_version: false,
183        }
184        .into())
185    }
186}