mavlink_core/async_connection/
direct_serial.rs

1//! Async Serial MAVLink connection
2
3use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::BufReader;
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use super::AsyncConnectable;
13use crate::connection::direct_serial::config::SerialConfig;
14use crate::MAVLinkMessageRaw;
15use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
16
17#[cfg(not(feature = "signing"))]
18use crate::{
19    read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
20};
21#[cfg(feature = "signing")]
22use crate::{
23    read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
24    write_versioned_msg_async_signed, SigningConfig, SigningData,
25};
26
27use super::AsyncMavConnection;
28
29pub struct AsyncSerialConnection {
30    port: Mutex<AsyncPeekReader<BufReader<SerialStream>>>,
31    sequence: AtomicU8,
32    protocol_version: MavlinkVersion,
33    recv_any_version: bool,
34    #[cfg(feature = "signing")]
35    signing_data: Option<SigningData>,
36}
37
38#[async_trait::async_trait]
39impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
40    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
41        let mut port = self.port.lock().await;
42        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
43        #[cfg(not(feature = "signing"))]
44        let result = read_versioned_msg_async(port.deref_mut(), version).await;
45        #[cfg(feature = "signing")]
46        let result =
47            read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
48                .await;
49        result
50    }
51
52    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
53        let mut port = self.port.lock().await;
54        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
55        #[cfg(not(feature = "signing"))]
56        let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
57        #[cfg(feature = "signing")]
58        let result = read_versioned_raw_message_async_signed::<M, _>(
59            port.deref_mut(),
60            version,
61            self.signing_data.as_ref(),
62        )
63        .await;
64        result
65    }
66
67    async fn send(
68        &self,
69        header: &MavHeader,
70        data: &M,
71    ) -> Result<usize, crate::error::MessageWriteError> {
72        let mut port = self.port.lock().await;
73
74        let sequence = self.sequence.fetch_add(
75            1,
76            // Safety:
77            //
78            // We are using `Ordering::Relaxed` here because:
79            // - We only need a unique sequence number per message
80            // - `Mutex` on `self.port` already makes sure the rest of the code is synchronized
81            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
82            //
83            // Warning:
84            //
85            // If we later change this code to access `self.sequence` without locking `self.port` with the `Mutex`,
86            // then we should upgrade this ordering to `Ordering::SeqCst`.
87            atomic::Ordering::Relaxed,
88        );
89
90        let header = MavHeader {
91            sequence,
92            system_id: header.system_id,
93            component_id: header.component_id,
94        };
95
96        #[cfg(not(feature = "signing"))]
97        let result =
98            write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
99        #[cfg(feature = "signing")]
100        let result = write_versioned_msg_async_signed(
101            port.reader_mut(),
102            self.protocol_version,
103            header,
104            data,
105            self.signing_data.as_ref(),
106        )
107        .await;
108        result
109    }
110
111    fn set_protocol_version(&mut self, version: MavlinkVersion) {
112        self.protocol_version = version;
113    }
114
115    fn protocol_version(&self) -> MavlinkVersion {
116        self.protocol_version
117    }
118
119    fn set_allow_recv_any_version(&mut self, allow: bool) {
120        self.recv_any_version = allow;
121    }
122
123    fn allow_recv_any_version(&self) -> bool {
124        self.recv_any_version
125    }
126
127    #[cfg(feature = "signing")]
128    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
129        self.signing_data = signing_data.map(SigningData::from_config);
130    }
131}
132
133#[async_trait]
134impl AsyncConnectable for SerialConfig {
135    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
136    where
137        M: Message + Sync + Send,
138    {
139        let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
140        port.set_data_bits(tokio_serial::DataBits::Eight)?;
141        port.set_parity(tokio_serial::Parity::None)?;
142        port.set_stop_bits(tokio_serial::StopBits::One)?;
143        port.set_flow_control(tokio_serial::FlowControl::None)?;
144
145        let read_buffer_capacity = self.buffer_capacity();
146        let buf_reader = BufReader::with_capacity(read_buffer_capacity, port);
147
148        Ok(Box::new(AsyncSerialConnection {
149            port: Mutex::new(AsyncPeekReader::new(buf_reader)),
150            sequence: AtomicU8::new(0),
151            protocol_version: MavlinkVersion::V2,
152            recv_any_version: false,
153            #[cfg(feature = "signing")]
154            signing_data: None,
155        }))
156    }
157}