mavlink_core/async_connection/
tcp.rs

1//! Async TCP MAVLink connection
2
3use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connectable::TcpConnectable;
8use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::lock::Mutex;
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg_async, write_versioned_msg_async};
18#[cfg(feature = "signing")]
19use crate::{
20    read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
21};
22
23pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
24    let addr = get_socket_addr(address)?;
25
26    let socket = TcpStream::connect(addr).await?;
27
28    let (reader, writer) = socket.into_split();
29
30    Ok(AsyncTcpConnection {
31        reader: Mutex::new(AsyncPeekReader::new(reader)),
32        writer: Mutex::new(TcpWrite {
33            socket: writer,
34            sequence: 0,
35        }),
36        protocol_version: MavlinkVersion::V2,
37        recv_any_version: false,
38        #[cfg(feature = "signing")]
39        signing_data: None,
40    })
41}
42
43pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
44    let addr = get_socket_addr(address)?;
45    let listener = TcpListener::bind(addr).await?;
46
47    //For now we only accept one incoming stream: this yields until we get one
48    match listener.accept().await {
49        Ok((socket, _)) => {
50            let (reader, writer) = socket.into_split();
51            return Ok(AsyncTcpConnection {
52                reader: Mutex::new(AsyncPeekReader::new(reader)),
53                writer: Mutex::new(TcpWrite {
54                    socket: writer,
55                    sequence: 0,
56                }),
57                protocol_version: MavlinkVersion::V2,
58                recv_any_version: false,
59                #[cfg(feature = "signing")]
60                signing_data: None,
61            });
62        }
63        Err(e) => {
64            //TODO don't println in lib
65            println!("listener err: {e}");
66        }
67    }
68    Err(io::Error::new(
69        io::ErrorKind::NotConnected,
70        "No incoming connections!",
71    ))
72}
73
74pub struct AsyncTcpConnection {
75    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
76    writer: Mutex<TcpWrite>,
77    protocol_version: MavlinkVersion,
78    recv_any_version: bool,
79    #[cfg(feature = "signing")]
80    signing_data: Option<SigningData>,
81}
82
83struct TcpWrite {
84    socket: OwnedWriteHalf,
85    sequence: u8,
86}
87
88#[async_trait::async_trait]
89impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
90    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
91        let mut reader = self.reader.lock().await;
92        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
93        #[cfg(not(feature = "signing"))]
94        let result = read_versioned_msg_async(reader.deref_mut(), version).await;
95        #[cfg(feature = "signing")]
96        let result = read_versioned_msg_async_signed(
97            reader.deref_mut(),
98            version,
99            self.signing_data.as_ref(),
100        )
101        .await;
102        result
103    }
104
105    async fn send(
106        &self,
107        header: &MavHeader,
108        data: &M,
109    ) -> Result<usize, crate::error::MessageWriteError> {
110        let mut lock = self.writer.lock().await;
111
112        let header = MavHeader {
113            sequence: lock.sequence,
114            system_id: header.system_id,
115            component_id: header.component_id,
116        };
117
118        lock.sequence = lock.sequence.wrapping_add(1);
119        #[cfg(not(feature = "signing"))]
120        let result =
121            write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
122        #[cfg(feature = "signing")]
123        let result = write_versioned_msg_async_signed(
124            &mut lock.socket,
125            self.protocol_version,
126            header,
127            data,
128            self.signing_data.as_ref(),
129        )
130        .await;
131        result
132    }
133
134    fn set_protocol_version(&mut self, version: MavlinkVersion) {
135        self.protocol_version = version;
136    }
137
138    fn protocol_version(&self) -> MavlinkVersion {
139        self.protocol_version
140    }
141
142    fn set_allow_recv_any_version(&mut self, allow: bool) {
143        self.recv_any_version = allow
144    }
145
146    fn allow_recv_any_version(&self) -> bool {
147        self.recv_any_version
148    }
149
150    #[cfg(feature = "signing")]
151    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
152        self.signing_data = signing_data.map(SigningData::from_config)
153    }
154}
155
156#[async_trait]
157impl AsyncConnectable for TcpConnectable {
158    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
159    where
160        M: Message + Sync + Send,
161    {
162        let conn = if self.is_out {
163            tcpout(&self.address).await
164        } else {
165            tcpin(&self.address).await
166        };
167        Ok(Box::new(conn?))
168    }
169}