mavlink_core/connection/direct_serial/
async.rs1use core::ops::DerefMut;
4use core::sync::atomic::AtomicU8;
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::{BufReader, ReadHalf, WriteHalf};
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use crate::MAVLinkMessageRaw;
13use crate::connection::AsyncConnectable;
14use crate::connection::direct_serial::config::SerialConfig;
15use crate::connection_shared::{
16 ConnectionState, next_atomic_send_header, read_message_async, read_raw_message_async,
17 write_message_async,
18};
19use crate::error::MessageReadError;
20use crate::{MavHeader, MavlinkVersion, Message, async_peek_reader::AsyncPeekReader};
21
22#[cfg(feature = "mav2-message-signing")]
23use crate::SigningConfig;
24
25use crate::connection::AsyncMavConnection;
26
27pub struct AsyncSerialConnection {
28 read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
29 write_port: Mutex<WriteHalf<SerialStream>>,
30 sequence: AtomicU8,
31 state: ConnectionState,
32}
33
34#[async_trait::async_trait]
35impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
36 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
37 let mut port = self.read_port.lock().await;
38 loop {
39 let result = read_message_async::<M, _>(port.deref_mut(), &self.state).await;
40 match result {
41 Ok(message) => return Ok(message),
42 Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
43 return Err(MessageReadError::Io(e));
44 }
45 _ => {}
46 }
47 }
48 }
49
50 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
51 let mut port = self.read_port.lock().await;
52 loop {
53 let result = read_raw_message_async::<M, _>(port.deref_mut(), &self.state).await;
54 match result {
55 Ok(message) => return Ok(message),
56 Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
57 return Err(MessageReadError::Io(e));
58 }
59 _ => {}
60 }
61 }
62 }
63
64 async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
65 let mut port = self.read_port.lock().await;
66 read_message_async::<M, _>(port.deref_mut(), &self.state).await
67 }
68
69 async fn send(
70 &self,
71 header: &MavHeader,
72 data: &M,
73 ) -> Result<usize, crate::error::MessageWriteError> {
74 let mut port = self.write_port.lock().await;
75
76 let header = next_atomic_send_header(&self.sequence, header);
77 write_message_async(&mut *port, &self.state, header, data).await
78 }
79
80 fn set_protocol_version(&mut self, version: MavlinkVersion) {
81 self.state.set_protocol_version(version);
82 }
83
84 fn protocol_version(&self) -> MavlinkVersion {
85 self.state.protocol_version()
86 }
87
88 fn set_allow_recv_any_version(&mut self, allow: bool) {
89 self.state.set_allow_recv_any_version(allow);
90 }
91
92 fn allow_recv_any_version(&self) -> bool {
93 self.state.allow_recv_any_version()
94 }
95
96 #[cfg(feature = "mav2-message-signing")]
97 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
98 self.state.setup_signing(signing_data);
99 }
100}
101
102#[async_trait]
103impl AsyncConnectable for SerialConfig {
104 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
105 where
106 M: Message + Sync + Send,
107 {
108 let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
109 port.set_data_bits(tokio_serial::DataBits::Eight)?;
110 port.set_parity(tokio_serial::Parity::None)?;
111 port.set_stop_bits(tokio_serial::StopBits::One)?;
112 port.set_flow_control(tokio_serial::FlowControl::None)?;
113
114 let (reader, writer) = tokio::io::split(port);
115 let read_buffer_capacity = self.buffer_capacity();
116 let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
117
118 Ok(Box::new(AsyncSerialConnection {
119 read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
120 write_port: Mutex::new(writer),
121 sequence: AtomicU8::new(0),
122 state: ConnectionState::new(),
123 }))
124 }
125}