Skip to main content

mavlink_core/connection/direct_serial/
async.rs

1//! Async Serial MAVLink connection
2
3use core::ops::DerefMut;
4use core::sync::atomic::AtomicU8;
5use std::io;
6
7use async_trait::async_trait;
8use futures::lock::Mutex;
9use tokio::io::{BufReader, ReadHalf, WriteHalf};
10use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
11
12use crate::MAVLinkMessageRaw;
13use crate::connection::AsyncConnectable;
14use crate::connection::direct_serial::config::SerialConfig;
15use crate::connection_shared::{
16    ConnectionState, next_atomic_send_header, read_message_async, read_raw_message_async,
17    write_message_async,
18};
19use crate::error::MessageReadError;
20use crate::{MavHeader, MavlinkVersion, Message, async_peek_reader::AsyncPeekReader};
21
22#[cfg(feature = "mav2-message-signing")]
23use crate::SigningConfig;
24
25use crate::connection::AsyncMavConnection;
26
27pub struct AsyncSerialConnection {
28    read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
29    write_port: Mutex<WriteHalf<SerialStream>>,
30    sequence: AtomicU8,
31    state: ConnectionState,
32}
33
34#[async_trait::async_trait]
35impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
36    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
37        let mut port = self.read_port.lock().await;
38        loop {
39            let result = read_message_async::<M, _>(port.deref_mut(), &self.state).await;
40            match result {
41                Ok(message) => return Ok(message),
42                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
43                    return Err(MessageReadError::Io(e));
44                }
45                _ => {}
46            }
47        }
48    }
49
50    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
51        let mut port = self.read_port.lock().await;
52        loop {
53            let result = read_raw_message_async::<M, _>(port.deref_mut(), &self.state).await;
54            match result {
55                Ok(message) => return Ok(message),
56                Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
57                    return Err(MessageReadError::Io(e));
58                }
59                _ => {}
60            }
61        }
62    }
63
64    async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
65        let mut port = self.read_port.lock().await;
66        read_message_async::<M, _>(port.deref_mut(), &self.state).await
67    }
68
69    async fn send(
70        &self,
71        header: &MavHeader,
72        data: &M,
73    ) -> Result<usize, crate::error::MessageWriteError> {
74        let mut port = self.write_port.lock().await;
75
76        let header = next_atomic_send_header(&self.sequence, header);
77        write_message_async(&mut *port, &self.state, header, data).await
78    }
79
80    fn set_protocol_version(&mut self, version: MavlinkVersion) {
81        self.state.set_protocol_version(version);
82    }
83
84    fn protocol_version(&self) -> MavlinkVersion {
85        self.state.protocol_version()
86    }
87
88    fn set_allow_recv_any_version(&mut self, allow: bool) {
89        self.state.set_allow_recv_any_version(allow);
90    }
91
92    fn allow_recv_any_version(&self) -> bool {
93        self.state.allow_recv_any_version()
94    }
95
96    #[cfg(feature = "mav2-message-signing")]
97    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
98        self.state.setup_signing(signing_data);
99    }
100}
101
102#[async_trait]
103impl AsyncConnectable for SerialConfig {
104    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
105    where
106        M: Message + Sync + Send,
107    {
108        let mut port = tokio_serial::new(&self.port_name, self.baud_rate).open_native_async()?;
109        port.set_data_bits(tokio_serial::DataBits::Eight)?;
110        port.set_parity(tokio_serial::Parity::None)?;
111        port.set_stop_bits(tokio_serial::StopBits::One)?;
112        port.set_flow_control(tokio_serial::FlowControl::None)?;
113
114        let (reader, writer) = tokio::io::split(port);
115        let read_buffer_capacity = self.buffer_capacity();
116        let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
117
118        Ok(Box::new(AsyncSerialConnection {
119            read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
120            write_port: Mutex::new(writer),
121            sequence: AtomicU8::new(0),
122            state: ConnectionState::new(),
123        }))
124    }
125}