mavlink_core/async_connection/
direct_serial.rs1use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::{BufReader, ReadHalf, WriteHalf};
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use super::AsyncConnectable;
13use crate::connection::direct_serial::config::SerialConfig;
14use crate::error::MessageReadError;
15use crate::MAVLinkMessageRaw;
16use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
17
18#[cfg(not(feature = "mav2-message-signing"))]
19use crate::{
20 read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
21};
22#[cfg(feature = "mav2-message-signing")]
23use crate::{
24 read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
25 write_versioned_msg_async_signed, SigningConfig, SigningData,
26};
27
28use super::AsyncMavConnection;
29
30pub struct AsyncSerialConnection {
31 read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
32 write_port: Mutex<WriteHalf<SerialStream>>,
33 sequence: AtomicU8,
34 protocol_version: MavlinkVersion,
35 recv_any_version: bool,
36 #[cfg(feature = "mav2-message-signing")]
37 signing_data: Option<SigningData>,
38}
39
40#[async_trait::async_trait]
41impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
42 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
43 let mut port = self.read_port.lock().await;
44 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
45 loop {
46 #[cfg(not(feature = "mav2-message-signing"))]
47 let result = read_versioned_msg_async(port.deref_mut(), version).await;
48 #[cfg(feature = "mav2-message-signing")]
49 let result = read_versioned_msg_async_signed(
50 port.deref_mut(),
51 version,
52 self.signing_data.as_ref(),
53 )
54 .await;
55 match result {
56 Ok(message) => return Ok(message),
57 Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
58 return Err(MessageReadError::Io(e));
59 }
60 _ => {}
61 }
62 }
63 }
64
65 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
66 let mut port = self.read_port.lock().await;
67 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
68 loop {
69 #[cfg(not(feature = "mav2-message-signing"))]
70 let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
71 #[cfg(feature = "mav2-message-signing")]
72 let result = read_versioned_raw_message_async_signed::<M, _>(
73 port.deref_mut(),
74 version,
75 self.signing_data.as_ref(),
76 )
77 .await;
78 match result {
79 Ok(message) => return Ok(message),
80 Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
81 return Err(MessageReadError::Io(e));
82 }
83 _ => {}
84 }
85 }
86 }
87
88 async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
89 let mut port = self.read_port.lock().await;
90 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
91
92 #[cfg(not(feature = "mav2-message-signing"))]
93 let result = read_versioned_msg_async(port.deref_mut(), version).await;
94
95 #[cfg(feature = "mav2-message-signing")]
96 let result =
97 read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
98 .await;
99
100 result
101 }
102
103 async fn send(
104 &self,
105 header: &MavHeader,
106 data: &M,
107 ) -> Result<usize, crate::error::MessageWriteError> {
108 let mut port = self.write_port.lock().await;
109
110 let sequence = self.sequence.fetch_add(
111 1,
112 atomic::Ordering::Relaxed,
124 );
125
126 let header = MavHeader {
127 sequence,
128 system_id: header.system_id,
129 component_id: header.component_id,
130 };
131
132 #[cfg(not(feature = "mav2-message-signing"))]
133 let result =
134 write_versioned_msg_async(&mut *port, self.protocol_version, header, data).await;
135 #[cfg(feature = "mav2-message-signing")]
136 let result = write_versioned_msg_async_signed(
137 &mut *port,
138 self.protocol_version,
139 header,
140 data,
141 self.signing_data.as_ref(),
142 )
143 .await;
144 result
145 }
146
147 fn set_protocol_version(&mut self, version: MavlinkVersion) {
148 self.protocol_version = version;
149 }
150
151 fn protocol_version(&self) -> MavlinkVersion {
152 self.protocol_version
153 }
154
155 fn set_allow_recv_any_version(&mut self, allow: bool) {
156 self.recv_any_version = allow;
157 }
158
159 fn allow_recv_any_version(&self) -> bool {
160 self.recv_any_version
161 }
162
163 #[cfg(feature = "mav2-message-signing")]
164 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
165 self.signing_data = signing_data.map(SigningData::from_config);
166 }
167}
168
169#[async_trait]
170impl AsyncConnectable for SerialConfig {
171 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
172 where
173 M: Message + Sync + Send,
174 {
175 let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
176 port.set_data_bits(tokio_serial::DataBits::Eight)?;
177 port.set_parity(tokio_serial::Parity::None)?;
178 port.set_stop_bits(tokio_serial::StopBits::One)?;
179 port.set_flow_control(tokio_serial::FlowControl::None)?;
180
181 let (reader, writer) = tokio::io::split(port);
182 let read_buffer_capacity = self.buffer_capacity();
183 let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
184
185 Ok(Box::new(AsyncSerialConnection {
186 read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
187 write_port: Mutex::new(writer),
188 sequence: AtomicU8::new(0),
189 protocol_version: MavlinkVersion::V2,
190 recv_any_version: false,
191 #[cfg(feature = "mav2-message-signing")]
192 signing_data: None,
193 }))
194 }
195}