Skip to main content

mavlink_core/async_connection/
direct_serial.rs

1//! Async Serial MAVLink connection
2
3use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::{BufReader, ReadHalf, WriteHalf};
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use super::AsyncConnectable;
13use crate::connection::direct_serial::config::SerialConfig;
14use crate::MAVLinkMessageRaw;
15use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
16
17#[cfg(not(feature = "signing"))]
18use crate::{
19    read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
20};
21#[cfg(feature = "signing")]
22use crate::{
23    read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
24    write_versioned_msg_async_signed, SigningConfig, SigningData,
25};
26
27use super::AsyncMavConnection;
28
29pub struct AsyncSerialConnection {
30    read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
31    write_port: Mutex<WriteHalf<SerialStream>>,
32    sequence: AtomicU8,
33    protocol_version: MavlinkVersion,
34    recv_any_version: bool,
35    #[cfg(feature = "signing")]
36    signing_data: Option<SigningData>,
37}
38
39#[async_trait::async_trait]
40impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
41    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
42        let mut port = self.read_port.lock().await;
43        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
44        #[cfg(not(feature = "signing"))]
45        let result = read_versioned_msg_async(port.deref_mut(), version).await;
46        #[cfg(feature = "signing")]
47        let result =
48            read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
49                .await;
50        result
51    }
52
53    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
54        let mut port = self.read_port.lock().await;
55        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
56        #[cfg(not(feature = "signing"))]
57        let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
58        #[cfg(feature = "signing")]
59        let result = read_versioned_raw_message_async_signed::<M, _>(
60            port.deref_mut(),
61            version,
62            self.signing_data.as_ref(),
63        )
64        .await;
65        result
66    }
67
68    async fn send(
69        &self,
70        header: &MavHeader,
71        data: &M,
72    ) -> Result<usize, crate::error::MessageWriteError> {
73        let mut port = self.write_port.lock().await;
74
75        let sequence = self.sequence.fetch_add(
76            1,
77            // Safety:
78            //
79            // We are using `Ordering::Relaxed` here because:
80            // - We only need a unique sequence number per message
81            // - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
82            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
83            //
84            // Warning:
85            //
86            // If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
87            // then we should upgrade this ordering to `Ordering::SeqCst`.
88            atomic::Ordering::Relaxed,
89        );
90
91        let header = MavHeader {
92            sequence,
93            system_id: header.system_id,
94            component_id: header.component_id,
95        };
96
97        #[cfg(not(feature = "signing"))]
98        let result =
99            write_versioned_msg_async(&mut *port, self.protocol_version, header, data).await;
100        #[cfg(feature = "signing")]
101        let result = write_versioned_msg_async_signed(
102            &mut *port,
103            self.protocol_version,
104            header,
105            data,
106            self.signing_data.as_ref(),
107        )
108        .await;
109        result
110    }
111
112    fn set_protocol_version(&mut self, version: MavlinkVersion) {
113        self.protocol_version = version;
114    }
115
116    fn protocol_version(&self) -> MavlinkVersion {
117        self.protocol_version
118    }
119
120    fn set_allow_recv_any_version(&mut self, allow: bool) {
121        self.recv_any_version = allow;
122    }
123
124    fn allow_recv_any_version(&self) -> bool {
125        self.recv_any_version
126    }
127
128    #[cfg(feature = "signing")]
129    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
130        self.signing_data = signing_data.map(SigningData::from_config);
131    }
132}
133
134#[async_trait]
135impl AsyncConnectable for SerialConfig {
136    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
137    where
138        M: Message + Sync + Send,
139    {
140        let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
141        port.set_data_bits(tokio_serial::DataBits::Eight)?;
142        port.set_parity(tokio_serial::Parity::None)?;
143        port.set_stop_bits(tokio_serial::StopBits::One)?;
144        port.set_flow_control(tokio_serial::FlowControl::None)?;
145
146        let (reader, writer) = tokio::io::split(port);
147        let read_buffer_capacity = self.buffer_capacity();
148        let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
149
150        Ok(Box::new(AsyncSerialConnection {
151            read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
152            write_port: Mutex::new(writer),
153            sequence: AtomicU8::new(0),
154            protocol_version: MavlinkVersion::V2,
155            recv_any_version: false,
156            #[cfg(feature = "signing")]
157            signing_data: None,
158        }))
159    }
160}