mavlink_core/async_connection/
direct_serial.rs1use core::ops::DerefMut;
4use core::sync::atomic::{self, AtomicU8};
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::{BufReader, ReadHalf, WriteHalf};
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use super::AsyncConnectable;
13use crate::connection::direct_serial::config::SerialConfig;
14use crate::MAVLinkMessageRaw;
15use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message, ReadVersion};
16
17#[cfg(not(feature = "signing"))]
18use crate::{
19 read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
20};
21#[cfg(feature = "signing")]
22use crate::{
23 read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
24 write_versioned_msg_async_signed, SigningConfig, SigningData,
25};
26
27use super::AsyncMavConnection;
28
29pub struct AsyncSerialConnection {
30 read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
31 write_port: Mutex<WriteHalf<SerialStream>>,
32 sequence: AtomicU8,
33 protocol_version: MavlinkVersion,
34 recv_any_version: bool,
35 #[cfg(feature = "signing")]
36 signing_data: Option<SigningData>,
37}
38
39#[async_trait::async_trait]
40impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
41 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
42 let mut port = self.read_port.lock().await;
43 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
44 #[cfg(not(feature = "signing"))]
45 let result = read_versioned_msg_async(port.deref_mut(), version).await;
46 #[cfg(feature = "signing")]
47 let result =
48 read_versioned_msg_async_signed(port.deref_mut(), version, self.signing_data.as_ref())
49 .await;
50 result
51 }
52
53 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
54 let mut port = self.read_port.lock().await;
55 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
56 #[cfg(not(feature = "signing"))]
57 let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
58 #[cfg(feature = "signing")]
59 let result = read_versioned_raw_message_async_signed::<M, _>(
60 port.deref_mut(),
61 version,
62 self.signing_data.as_ref(),
63 )
64 .await;
65 result
66 }
67
68 async fn send(
69 &self,
70 header: &MavHeader,
71 data: &M,
72 ) -> Result<usize, crate::error::MessageWriteError> {
73 let mut port = self.write_port.lock().await;
74
75 let sequence = self.sequence.fetch_add(
76 1,
77 atomic::Ordering::Relaxed,
89 );
90
91 let header = MavHeader {
92 sequence,
93 system_id: header.system_id,
94 component_id: header.component_id,
95 };
96
97 #[cfg(not(feature = "signing"))]
98 let result =
99 write_versioned_msg_async(&mut *port, self.protocol_version, header, data).await;
100 #[cfg(feature = "signing")]
101 let result = write_versioned_msg_async_signed(
102 &mut *port,
103 self.protocol_version,
104 header,
105 data,
106 self.signing_data.as_ref(),
107 )
108 .await;
109 result
110 }
111
112 fn set_protocol_version(&mut self, version: MavlinkVersion) {
113 self.protocol_version = version;
114 }
115
116 fn protocol_version(&self) -> MavlinkVersion {
117 self.protocol_version
118 }
119
120 fn set_allow_recv_any_version(&mut self, allow: bool) {
121 self.recv_any_version = allow;
122 }
123
124 fn allow_recv_any_version(&self) -> bool {
125 self.recv_any_version
126 }
127
128 #[cfg(feature = "signing")]
129 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
130 self.signing_data = signing_data.map(SigningData::from_config);
131 }
132}
133
134#[async_trait]
135impl AsyncConnectable for SerialConfig {
136 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
137 where
138 M: Message + Sync + Send,
139 {
140 let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
141 port.set_data_bits(tokio_serial::DataBits::Eight)?;
142 port.set_parity(tokio_serial::Parity::None)?;
143 port.set_stop_bits(tokio_serial::StopBits::One)?;
144 port.set_flow_control(tokio_serial::FlowControl::None)?;
145
146 let (reader, writer) = tokio::io::split(port);
147 let read_buffer_capacity = self.buffer_capacity();
148 let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
149
150 Ok(Box::new(AsyncSerialConnection {
151 read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
152 write_port: Mutex::new(writer),
153 sequence: AtomicU8::new(0),
154 protocol_version: MavlinkVersion::V2,
155 recv_any_version: false,
156 #[cfg(feature = "signing")]
157 signing_data: None,
158 }))
159 }
160}