mavlink_core/async_connection/
direct_serial.rs1use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
10
11use super::AsyncConnectable;
12use crate::connection::direct_serial::config::SerialConfig;
13use crate::MAVLinkMessageRaw;
14use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_raw_versioned_msg_async, read_versioned_msg_async, write_versioned_msg_async};
18#[cfg(feature = "signing")]
19use crate::{
20 read_raw_versioned_msg_async_signed, read_versioned_msg_async_signed,
21 write_versioned_msg_async_signed, SigningConfig, SigningData,
22};
23
24use super::AsyncMavConnection;
25
26pub struct AsyncSerialConnection {
27 port: Mutex<AsyncPeekReader<SerialStream>>,
28 sequence: AtomicU8,
29 protocol_version: MavlinkVersion,
30 recv_any_version: bool,
31 #[cfg(feature = "signing")]
32 signing_data: Option<SigningData>,
33}
34
35#[async_trait::async_trait]
36impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
37 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
38 let mut port = self.port.lock().await;
39 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
40 #[cfg(not(feature = "signing"))]
41 let result = read_versioned_msg_async(port.deref_mut(), version).await;
42 #[cfg(feature = "signing")]
43 let result =
44 read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
45 .await;
46 result
47 }
48
49 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
50 let mut port = self.port.lock().await;
51 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
52 #[cfg(not(feature = "signing"))]
53 let result = read_raw_versioned_msg_async::<M, _>(port.deref_mut(), version).await;
54 #[cfg(feature = "signing")]
55 let result = read_raw_versioned_msg_async_signed::<M, _>(
56 port.deref_mut(),
57 version,
58 self.signing_data.as_ref(),
59 )
60 .await;
61 result
62 }
63
64 async fn send(
65 &self,
66 header: &MavHeader,
67 data: &M,
68 ) -> Result<usize, crate::error::MessageWriteError> {
69 let mut port = self.port.lock().await;
70
71 let sequence = self.sequence.fetch_add(
72 1,
73 atomic::Ordering::Relaxed,
85 );
86
87 let header = MavHeader {
88 sequence,
89 system_id: header.system_id,
90 component_id: header.component_id,
91 };
92
93 #[cfg(not(feature = "signing"))]
94 let result =
95 write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
96 #[cfg(feature = "signing")]
97 let result = write_versioned_msg_async_signed(
98 port.reader_mut(),
99 self.protocol_version,
100 header,
101 data,
102 self.signing_data.as_ref(),
103 )
104 .await;
105 result
106 }
107
108 fn set_protocol_version(&mut self, version: MavlinkVersion) {
109 self.protocol_version = version;
110 }
111
112 fn protocol_version(&self) -> MavlinkVersion {
113 self.protocol_version
114 }
115
116 fn set_allow_recv_any_version(&mut self, allow: bool) {
117 self.recv_any_version = allow;
118 }
119
120 fn allow_recv_any_version(&self) -> bool {
121 self.recv_any_version
122 }
123
124 #[cfg(feature = "signing")]
125 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
126 self.signing_data = signing_data.map(SigningData::from_config);
127 }
128}
129
130#[async_trait]
131impl AsyncConnectable for SerialConfig {
132 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
133 where
134 M: Message + Sync + Send,
135 {
136 let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
137 port.set_data_bits(tokio_serial::DataBits::Eight)?;
138 port.set_parity(tokio_serial::Parity::None)?;
139 port.set_stop_bits(tokio_serial::StopBits::One)?;
140 port.set_flow_control(tokio_serial::FlowControl::None)?;
141
142 Ok(Box::new(AsyncSerialConnection {
143 port: Mutex::new(AsyncPeekReader::new(port)),
144 sequence: AtomicU8::new(0),
145 protocol_version: MavlinkVersion::V2,
146 recv_any_version: false,
147 #[cfg(feature = "signing")]
148 signing_data: None,
149 }))
150 }
151}