Skip to main content

mavlink_core/connection/tcp/
async.rs

1//! Async TCP MAVLink connection
2
3use std::io;
4
5use crate::async_peek_reader::AsyncPeekReader;
6use crate::connection::tcp::config::{TcpConfig, TcpMode};
7use crate::connection::{AsyncConnectable, AsyncMavConnection, get_socket_addr};
8use crate::connection_shared::{
9    ConnectionState, next_send_header, read_message_async, read_raw_message_async,
10    write_message_async,
11};
12use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message};
13
14use async_trait::async_trait;
15use core::ops::DerefMut;
16use futures::{FutureExt, lock::Mutex};
17use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
18use tokio::net::{TcpListener, TcpStream};
19
20#[cfg(feature = "mav2-message-signing")]
21use crate::SigningConfig;
22
23pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
24    let addr = get_socket_addr(&address)?;
25
26    let socket = TcpStream::connect(addr).await?;
27
28    let (reader, writer) = socket.into_split();
29
30    Ok(AsyncTcpConnection {
31        reader: Mutex::new(AsyncPeekReader::new(reader)),
32        writer: Mutex::new(TcpWrite {
33            socket: writer,
34            sequence: 0,
35        }),
36        state: ConnectionState::new(),
37    })
38}
39
40pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
41    let addr = get_socket_addr(&address)?;
42    let listener = TcpListener::bind(addr).await?;
43
44    //For now we only accept one incoming stream: this yields until we get one
45    match listener.accept().await {
46        Ok((socket, _)) => {
47            let (reader, writer) = socket.into_split();
48            return Ok(AsyncTcpConnection {
49                reader: Mutex::new(AsyncPeekReader::new(reader)),
50                writer: Mutex::new(TcpWrite {
51                    socket: writer,
52                    sequence: 0,
53                }),
54                state: ConnectionState::new(),
55            });
56        }
57        Err(e) => {
58            //TODO don't println in lib
59            println!("listener err: {e}");
60        }
61    }
62    Err(io::Error::new(
63        io::ErrorKind::NotConnected,
64        "No incoming connections!",
65    ))
66}
67
68pub struct AsyncTcpConnection {
69    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
70    writer: Mutex<TcpWrite>,
71    state: ConnectionState,
72}
73
74struct TcpWrite {
75    socket: OwnedWriteHalf,
76    sequence: u8,
77}
78
79#[async_trait::async_trait]
80impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
81    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
82        let mut reader = self.reader.lock().await;
83        read_message_async::<M, _>(reader.deref_mut(), &self.state).await
84    }
85
86    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
87        let mut reader = self.reader.lock().await;
88        read_raw_message_async::<M, _>(reader.deref_mut(), &self.state).await
89    }
90
91    async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92        match self.recv().now_or_never() {
93            Some(result) => result,
94            None => Err(crate::error::MessageReadError::Io(
95                io::ErrorKind::WouldBlock.into(),
96            )),
97        }
98    }
99
100    async fn send(
101        &self,
102        header: &MavHeader,
103        data: &M,
104    ) -> Result<usize, crate::error::MessageWriteError> {
105        let mut lock = self.writer.lock().await;
106
107        let header = next_send_header(&mut lock.sequence, header);
108        write_message_async(&mut lock.socket, &self.state, header, data).await
109    }
110
111    fn set_protocol_version(&mut self, version: MavlinkVersion) {
112        self.state.set_protocol_version(version);
113    }
114
115    fn protocol_version(&self) -> MavlinkVersion {
116        self.state.protocol_version()
117    }
118
119    fn set_allow_recv_any_version(&mut self, allow: bool) {
120        self.state.set_allow_recv_any_version(allow);
121    }
122
123    fn allow_recv_any_version(&self) -> bool {
124        self.state.allow_recv_any_version()
125    }
126
127    #[cfg(feature = "mav2-message-signing")]
128    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
129        self.state.setup_signing(signing_data);
130    }
131}
132
133#[async_trait]
134impl AsyncConnectable for TcpConfig {
135    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
136    where
137        M: Message + Sync + Send,
138    {
139        let conn = match self.mode {
140            TcpMode::TcpIn => tcpin(&self.address).await,
141            TcpMode::TcpOut => tcpout(&self.address).await,
142        };
143
144        Ok(Box::new(conn?))
145    }
146}