mavlink_core/async_connection/
direct_serial.rs

1//! Async Serial MAVLink connection
2
3use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
10
11use super::AsyncConnectable;
12use crate::connection::direct_serial::config::SerialConfig;
13use crate::MAVLinkMessageRaw;
14use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
15
16#[cfg(not(feature = "signing"))]
17use crate::{
18    read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
19};
20#[cfg(feature = "signing")]
21use crate::{
22    read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
23    write_versioned_msg_async_signed, SigningConfig, SigningData,
24};
25
26use super::AsyncMavConnection;
27
28pub struct AsyncSerialConnection {
29    port: Mutex<AsyncPeekReader<SerialStream>>,
30    sequence: AtomicU8,
31    protocol_version: MavlinkVersion,
32    recv_any_version: bool,
33    #[cfg(feature = "signing")]
34    signing_data: Option<SigningData>,
35}
36
37#[async_trait::async_trait]
38impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
39    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
40        let mut port = self.port.lock().await;
41        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
42        #[cfg(not(feature = "signing"))]
43        let result = read_versioned_msg_async(port.deref_mut(), version).await;
44        #[cfg(feature = "signing")]
45        let result =
46            read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
47                .await;
48        result
49    }
50
51    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
52        let mut port = self.port.lock().await;
53        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
54        #[cfg(not(feature = "signing"))]
55        let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
56        #[cfg(feature = "signing")]
57        let result = read_versioned_raw_message_async_signed::<M, _>(
58            port.deref_mut(),
59            version,
60            self.signing_data.as_ref(),
61        )
62        .await;
63        result
64    }
65
66    async fn send(
67        &self,
68        header: &MavHeader,
69        data: &M,
70    ) -> Result<usize, crate::error::MessageWriteError> {
71        let mut port = self.port.lock().await;
72
73        let sequence = self.sequence.fetch_add(
74            1,
75            // Safety:
76            //
77            // We are using `Ordering::Relaxed` here because:
78            // - We only need a unique sequence number per message
79            // - `Mutex` on `self.port` already makes sure the rest of the code is synchronized
80            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
81            //
82            // Warning:
83            //
84            // If we later change this code to access `self.sequence` without locking `self.port` with the `Mutex`,
85            // then we should upgrade this ordering to `Ordering::SeqCst`.
86            atomic::Ordering::Relaxed,
87        );
88
89        let header = MavHeader {
90            sequence,
91            system_id: header.system_id,
92            component_id: header.component_id,
93        };
94
95        #[cfg(not(feature = "signing"))]
96        let result =
97            write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
98        #[cfg(feature = "signing")]
99        let result = write_versioned_msg_async_signed(
100            port.reader_mut(),
101            self.protocol_version,
102            header,
103            data,
104            self.signing_data.as_ref(),
105        )
106        .await;
107        result
108    }
109
110    fn set_protocol_version(&mut self, version: MavlinkVersion) {
111        self.protocol_version = version;
112    }
113
114    fn protocol_version(&self) -> MavlinkVersion {
115        self.protocol_version
116    }
117
118    fn set_allow_recv_any_version(&mut self, allow: bool) {
119        self.recv_any_version = allow;
120    }
121
122    fn allow_recv_any_version(&self) -> bool {
123        self.recv_any_version
124    }
125
126    #[cfg(feature = "signing")]
127    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
128        self.signing_data = signing_data.map(SigningData::from_config);
129    }
130}
131
132#[async_trait]
133impl AsyncConnectable for SerialConfig {
134    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
135    where
136        M: Message + Sync + Send,
137    {
138        let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
139        port.set_data_bits(tokio_serial::DataBits::Eight)?;
140        port.set_parity(tokio_serial::Parity::None)?;
141        port.set_stop_bits(tokio_serial::StopBits::One)?;
142        port.set_flow_control(tokio_serial::FlowControl::None)?;
143
144        Ok(Box::new(AsyncSerialConnection {
145            port: Mutex::new(AsyncPeekReader::new(port)),
146            sequence: AtomicU8::new(0),
147            protocol_version: MavlinkVersion::V2,
148            recv_any_version: false,
149            #[cfg(feature = "signing")]
150            signing_data: None,
151        }))
152    }
153}