Skip to main content

mavlink_core/async_connection/
direct_serial.rs

1//! Async Serial MAVLink connection
2
3use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::{BufReader, ReadHalf, WriteHalf};
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use super::AsyncConnectable;
13use crate::connection::direct_serial::config::SerialConfig;
14use crate::error::MessageReadError;
15use crate::MAVLinkMessageRaw;
16use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
17
18#[cfg(not(feature = "mav2-message-signing"))]
19use crate::{
20    read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
21};
22#[cfg(feature = "mav2-message-signing")]
23use crate::{
24    read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
25    write_versioned_msg_async_signed, SigningConfig, SigningData,
26};
27
28use super::AsyncMavConnection;
29
30pub struct AsyncSerialConnection {
31    read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
32    write_port: Mutex<WriteHalf<SerialStream>>,
33    sequence: AtomicU8,
34    protocol_version: MavlinkVersion,
35    recv_any_version: bool,
36    #[cfg(feature = "mav2-message-signing")]
37    signing_data: Option<SigningData>,
38}
39
40#[async_trait::async_trait]
41impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
42    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
43        let mut port = self.read_port.lock().await;
44        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
45        loop {
46            #[cfg(not(feature = "mav2-message-signing"))]
47            let result = read_versioned_msg_async(port.deref_mut(), version).await;
48            #[cfg(feature = "mav2-message-signing")]
49            let result = read_versioned_msg_async_signed(
50                port.deref_mut(),
51                version,
52                self.signing_data.as_ref(),
53            )
54            .await;
55            match result {
56                Ok(message) => return Ok(message),
57                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
58                    return Err(MessageReadError::Io(e));
59                }
60                _ => {}
61            }
62        }
63    }
64
65    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
66        let mut port = self.read_port.lock().await;
67        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
68        loop {
69            #[cfg(not(feature = "mav2-message-signing"))]
70            let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
71            #[cfg(feature = "mav2-message-signing")]
72            let result = read_versioned_raw_message_async_signed::<M, _>(
73                port.deref_mut(),
74                version,
75                self.signing_data.as_ref(),
76            )
77            .await;
78            match result {
79                Ok(message) => return Ok(message),
80                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
81                    return Err(MessageReadError::Io(e));
82                }
83                _ => {}
84            }
85        }
86    }
87
88    async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
89        let mut port = self.read_port.lock().await;
90        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
91
92        #[cfg(not(feature = "mav2-message-signing"))]
93        let result = read_versioned_msg_async(port.deref_mut(), version).await;
94
95        #[cfg(feature = "mav2-message-signing")]
96        let result =
97            read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
98                .await;
99
100        result
101    }
102
103    async fn send(
104        &self,
105        header: &MavHeader,
106        data: &M,
107    ) -> Result<usize, crate::error::MessageWriteError> {
108        let mut port = self.write_port.lock().await;
109
110        let sequence = self.sequence.fetch_add(
111            1,
112            // Safety:
113            //
114            // We are using `Ordering::Relaxed` here because:
115            // - We only need a unique sequence number per message
116            // - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
117            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
118            //
119            // Warning:
120            //
121            // If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
122            // then we should upgrade this ordering to `Ordering::SeqCst`.
123            atomic::Ordering::Relaxed,
124        );
125
126        let header = MavHeader {
127            sequence,
128            system_id: header.system_id,
129            component_id: header.component_id,
130        };
131
132        #[cfg(not(feature = "mav2-message-signing"))]
133        let result =
134            write_versioned_msg_async(&mut *port, self.protocol_version, header, data).await;
135        #[cfg(feature = "mav2-message-signing")]
136        let result = write_versioned_msg_async_signed(
137            &mut *port,
138            self.protocol_version,
139            header,
140            data,
141            self.signing_data.as_ref(),
142        )
143        .await;
144        result
145    }
146
147    fn set_protocol_version(&mut self, version: MavlinkVersion) {
148        self.protocol_version = version;
149    }
150
151    fn protocol_version(&self) -> MavlinkVersion {
152        self.protocol_version
153    }
154
155    fn set_allow_recv_any_version(&mut self, allow: bool) {
156        self.recv_any_version = allow;
157    }
158
159    fn allow_recv_any_version(&self) -> bool {
160        self.recv_any_version
161    }
162
163    #[cfg(feature = "mav2-message-signing")]
164    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
165        self.signing_data = signing_data.map(SigningData::from_config);
166    }
167}
168
169#[async_trait]
170impl AsyncConnectable for SerialConfig {
171    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
172    where
173        M: Message + Sync + Send,
174    {
175        let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
176        port.set_data_bits(tokio_serial::DataBits::Eight)?;
177        port.set_parity(tokio_serial::Parity::None)?;
178        port.set_stop_bits(tokio_serial::StopBits::One)?;
179        port.set_flow_control(tokio_serial::FlowControl::None)?;
180
181        let (reader, writer) = tokio::io::split(port);
182        let read_buffer_capacity = self.buffer_capacity();
183        let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
184
185        Ok(Box::new(AsyncSerialConnection {
186            read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
187            write_port: Mutex::new(writer),
188            sequence: AtomicU8::new(0),
189            protocol_version: MavlinkVersion::V2,
190            recv_any_version: false,
191            #[cfg(feature = "mav2-message-signing")]
192            signing_data: None,
193        }))
194    }
195}