mavlink_core/async_connection/
direct_serial.rs

1//! Async Serial MAVLink connection
2
3use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use tokio::sync::Mutex;
9use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
10
11use super::AsyncConnectable;
12use crate::{
13    async_peek_reader::AsyncPeekReader, connectable::SerialConnectable, MavHeader, MavlinkVersion,
14    Message, ReadVersion,
15};
16
17#[cfg(not(feature = "signing"))]
18use crate::{read_versioned_msg_async, write_versioned_msg_async};
19#[cfg(feature = "signing")]
20use crate::{
21    read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
22};
23
24use super::AsyncMavConnection;
25
26pub struct AsyncSerialConnection {
27    port: Mutex<AsyncPeekReader<SerialStream>>,
28    sequence: AtomicU8,
29    protocol_version: MavlinkVersion,
30    recv_any_version: bool,
31    #[cfg(feature = "signing")]
32    signing_data: Option<SigningData>,
33}
34
35#[async_trait::async_trait]
36impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
37    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
38        let mut port = self.port.lock().await;
39        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
40        #[cfg(not(feature = "signing"))]
41        let result = read_versioned_msg_async(port.deref_mut(), version).await;
42        #[cfg(feature = "signing")]
43        let result =
44            read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
45                .await;
46        result
47    }
48
49    async fn send(
50        &self,
51        header: &MavHeader,
52        data: &M,
53    ) -> Result<usize, crate::error::MessageWriteError> {
54        let mut port = self.port.lock().await;
55
56        let sequence = self.sequence.load(
57            // Safety:
58            //
59            // We are using `Ordering::Relaxed` here because:
60            // - We only need a unique sequence number per message
61            // - `Mutex` on `self.port` already makes sure the rest of the code is synchronized
62            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
63            //
64            // Warning:
65            //
66            // If we later change this code to access `self.sequence` without locking `self.port` with the `Mutex`,
67            // then we should upgrade this ordering to `Ordering::SeqCst`.
68            atomic::Ordering::Relaxed,
69        );
70
71        let header = MavHeader {
72            sequence,
73            system_id: header.system_id,
74            component_id: header.component_id,
75        };
76
77        self.sequence.store(
78            sequence.wrapping_add(1),
79            // Safety:
80            //
81            // We are using `Ordering::Relaxed` here because:
82            // - We only need a unique sequence number per message
83            // - `Mutex` on `self.port` already makes sure the rest of the code is synchronized
84            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
85            //
86            // Warning:
87            //
88            // If we later change this code to access `self.sequence` without locking `self.port` with the `Mutex`,
89            // then we should upgrade this ordering to `Ordering::SeqCst`.
90            atomic::Ordering::Relaxed,
91        );
92
93        #[cfg(not(feature = "signing"))]
94        let result =
95            write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
96        #[cfg(feature = "signing")]
97        let result = write_versioned_msg_async_signed(
98            port.reader_mut(),
99            self.protocol_version,
100            header,
101            data,
102            self.signing_data.as_ref(),
103        )
104        .await;
105        result
106    }
107
108    fn set_protocol_version(&mut self, version: MavlinkVersion) {
109        self.protocol_version = version;
110    }
111
112    fn protocol_version(&self) -> MavlinkVersion {
113        self.protocol_version
114    }
115
116    fn set_allow_recv_any_version(&mut self, allow: bool) {
117        self.recv_any_version = allow
118    }
119
120    fn allow_recv_any_version(&self) -> bool {
121        self.recv_any_version
122    }
123
124    #[cfg(feature = "signing")]
125    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
126        self.signing_data = signing_data.map(SigningData::from_config)
127    }
128}
129
130#[async_trait]
131impl AsyncConnectable for SerialConnectable {
132    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
133    where
134        M: Message + Sync + Send,
135    {
136        let mut port =
137            tokio_serial::new(&self.port_name, self.baud_rate as u32).open_native_async()?;
138        port.set_data_bits(tokio_serial::DataBits::Eight)?;
139        port.set_parity(tokio_serial::Parity::None)?;
140        port.set_stop_bits(tokio_serial::StopBits::One)?;
141        port.set_flow_control(tokio_serial::FlowControl::None)?;
142
143        Ok(Box::new(AsyncSerialConnection {
144            port: Mutex::new(AsyncPeekReader::new(port)),
145            sequence: AtomicU8::new(0),
146            protocol_version: MavlinkVersion::V2,
147            recv_any_version: false,
148            #[cfg(feature = "signing")]
149            signing_data: None,
150        }))
151    }
152}