mavlink_core/async_connection/
direct_serial.rs1use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
10
11use super::AsyncConnectable;
12use crate::connection::direct_serial::config::SerialConfig;
13use crate::MAVLinkMessageRaw;
14use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
15
16#[cfg(not(feature = "signing"))]
17use crate::{
18 read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
19};
20#[cfg(feature = "signing")]
21use crate::{
22 read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
23 write_versioned_msg_async_signed, SigningConfig, SigningData,
24};
25
26use super::AsyncMavConnection;
27
28pub struct AsyncSerialConnection {
29 port: Mutex<AsyncPeekReader<SerialStream>>,
30 sequence: AtomicU8,
31 protocol_version: MavlinkVersion,
32 recv_any_version: bool,
33 #[cfg(feature = "signing")]
34 signing_data: Option<SigningData>,
35}
36
37#[async_trait::async_trait]
38impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
39 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
40 let mut port = self.port.lock().await;
41 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
42 #[cfg(not(feature = "signing"))]
43 let result = read_versioned_msg_async(port.deref_mut(), version).await;
44 #[cfg(feature = "signing")]
45 let result =
46 read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
47 .await;
48 result
49 }
50
51 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
52 let mut port = self.port.lock().await;
53 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
54 #[cfg(not(feature = "signing"))]
55 let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
56 #[cfg(feature = "signing")]
57 let result = read_versioned_raw_message_async_signed::<M, _>(
58 port.deref_mut(),
59 version,
60 self.signing_data.as_ref(),
61 )
62 .await;
63 result
64 }
65
66 async fn send(
67 &self,
68 header: &MavHeader,
69 data: &M,
70 ) -> Result<usize, crate::error::MessageWriteError> {
71 let mut port = self.port.lock().await;
72
73 let sequence = self.sequence.fetch_add(
74 1,
75 atomic::Ordering::Relaxed,
87 );
88
89 let header = MavHeader {
90 sequence,
91 system_id: header.system_id,
92 component_id: header.component_id,
93 };
94
95 #[cfg(not(feature = "signing"))]
96 let result =
97 write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
98 #[cfg(feature = "signing")]
99 let result = write_versioned_msg_async_signed(
100 port.reader_mut(),
101 self.protocol_version,
102 header,
103 data,
104 self.signing_data.as_ref(),
105 )
106 .await;
107 result
108 }
109
110 fn set_protocol_version(&mut self, version: MavlinkVersion) {
111 self.protocol_version = version;
112 }
113
114 fn protocol_version(&self) -> MavlinkVersion {
115 self.protocol_version
116 }
117
118 fn set_allow_recv_any_version(&mut self, allow: bool) {
119 self.recv_any_version = allow;
120 }
121
122 fn allow_recv_any_version(&self) -> bool {
123 self.recv_any_version
124 }
125
126 #[cfg(feature = "signing")]
127 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
128 self.signing_data = signing_data.map(SigningData::from_config);
129 }
130}
131
132#[async_trait]
133impl AsyncConnectable for SerialConfig {
134 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
135 where
136 M: Message + Sync + Send,
137 {
138 let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
139 port.set_data_bits(tokio_serial::DataBits::Eight)?;
140 port.set_parity(tokio_serial::Parity::None)?;
141 port.set_stop_bits(tokio_serial::StopBits::One)?;
142 port.set_flow_control(tokio_serial::FlowControl::None)?;
143
144 Ok(Box::new(AsyncSerialConnection {
145 port: Mutex::new(AsyncPeekReader::new(port)),
146 sequence: AtomicU8::new(0),
147 protocol_version: MavlinkVersion::V2,
148 recv_any_version: false,
149 #[cfg(feature = "signing")]
150 signing_data: None,
151 }))
152 }
153}