mavlink_core/async_connection/
direct_serial.rs

1//! Async Serial MAVLINK connection
2
3use core::ops::DerefMut;
4use std::io;
5
6use async_trait::async_trait;
7use tokio::sync::Mutex;
8use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
9
10use super::AsyncConnectable;
11use crate::{
12    async_peek_reader::AsyncPeekReader, connectable::SerialConnectable, MavHeader, MavlinkVersion,
13    Message,
14};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg_async, write_versioned_msg_async};
18#[cfg(feature = "signing")]
19use crate::{
20    read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
21};
22
23use super::AsyncMavConnection;
24
25pub struct AsyncSerialConnection {
26    port: Mutex<AsyncPeekReader<SerialStream>>,
27    sequence: Mutex<u8>,
28    protocol_version: MavlinkVersion,
29    #[cfg(feature = "signing")]
30    signing_data: Option<SigningData>,
31}
32
33#[async_trait::async_trait]
34impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
35    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
36        let mut port = self.port.lock().await;
37
38        #[cfg(not(feature = "signing"))]
39        let result = read_versioned_msg_async(port.deref_mut(), self.protocol_version).await;
40        #[cfg(feature = "signing")]
41        let result = read_versioned_msg_async_signed(
42            port.deref_mut(),
43            self.protocol_version,
44            self.signing_data.as_ref(),
45        )
46        .await;
47        result
48    }
49
50    async fn send(
51        &self,
52        header: &MavHeader,
53        data: &M,
54    ) -> Result<usize, crate::error::MessageWriteError> {
55        let mut port = self.port.lock().await;
56        let mut sequence = self.sequence.lock().await;
57
58        let header = MavHeader {
59            sequence: *sequence,
60            system_id: header.system_id,
61            component_id: header.component_id,
62        };
63
64        *sequence = sequence.wrapping_add(1);
65
66        #[cfg(not(feature = "signing"))]
67        let result =
68            write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
69        #[cfg(feature = "signing")]
70        let result = write_versioned_msg_async_signed(
71            port.reader_mut(),
72            self.protocol_version,
73            header,
74            data,
75            self.signing_data.as_ref(),
76        )
77        .await;
78        result
79    }
80
81    fn set_protocol_version(&mut self, version: MavlinkVersion) {
82        self.protocol_version = version;
83    }
84
85    fn get_protocol_version(&self) -> MavlinkVersion {
86        self.protocol_version
87    }
88
89    #[cfg(feature = "signing")]
90    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
91        self.signing_data = signing_data.map(SigningData::from_config)
92    }
93}
94
95#[async_trait]
96impl AsyncConnectable for SerialConnectable {
97    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
98    where
99        M: Message + Sync + Send,
100    {
101        let mut port =
102            tokio_serial::new(&self.port_name, self.baud_rate as u32).open_native_async()?;
103        port.set_data_bits(tokio_serial::DataBits::Eight)?;
104        port.set_parity(tokio_serial::Parity::None)?;
105        port.set_stop_bits(tokio_serial::StopBits::One)?;
106        port.set_flow_control(tokio_serial::FlowControl::None)?;
107
108        Ok(Box::new(AsyncSerialConnection {
109            port: Mutex::new(AsyncPeekReader::new(port)),
110            sequence: Mutex::new(0),
111            protocol_version: MavlinkVersion::V2,
112            #[cfg(feature = "signing")]
113            signing_data: None,
114        }))
115    }
116}