mavlink_core/async_connection/
tcp.rs

1//! Async TCP MAVLink connection
2
3use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connection::tcp::config::{TcpConfig, TcpMode};
8use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::lock::Mutex;
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_raw_versioned_msg_async, read_versioned_msg_async, write_versioned_msg_async};
18#[cfg(feature = "signing")]
19use crate::{
20    read_raw_versioned_msg_async_signed, read_versioned_msg_async_signed,
21    write_versioned_msg_async_signed, SigningConfig, SigningData,
22};
23
24pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
25    let addr = get_socket_addr(address)?;
26
27    let socket = TcpStream::connect(addr).await?;
28
29    let (reader, writer) = socket.into_split();
30
31    Ok(AsyncTcpConnection {
32        reader: Mutex::new(AsyncPeekReader::new(reader)),
33        writer: Mutex::new(TcpWrite {
34            socket: writer,
35            sequence: 0,
36        }),
37        protocol_version: MavlinkVersion::V2,
38        recv_any_version: false,
39        #[cfg(feature = "signing")]
40        signing_data: None,
41    })
42}
43
44pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
45    let addr = get_socket_addr(address)?;
46    let listener = TcpListener::bind(addr).await?;
47
48    //For now we only accept one incoming stream: this yields until we get one
49    match listener.accept().await {
50        Ok((socket, _)) => {
51            let (reader, writer) = socket.into_split();
52            return Ok(AsyncTcpConnection {
53                reader: Mutex::new(AsyncPeekReader::new(reader)),
54                writer: Mutex::new(TcpWrite {
55                    socket: writer,
56                    sequence: 0,
57                }),
58                protocol_version: MavlinkVersion::V2,
59                recv_any_version: false,
60                #[cfg(feature = "signing")]
61                signing_data: None,
62            });
63        }
64        Err(e) => {
65            //TODO don't println in lib
66            println!("listener err: {e}");
67        }
68    }
69    Err(io::Error::new(
70        io::ErrorKind::NotConnected,
71        "No incoming connections!",
72    ))
73}
74
75pub struct AsyncTcpConnection {
76    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
77    writer: Mutex<TcpWrite>,
78    protocol_version: MavlinkVersion,
79    recv_any_version: bool,
80    #[cfg(feature = "signing")]
81    signing_data: Option<SigningData>,
82}
83
84struct TcpWrite {
85    socket: OwnedWriteHalf,
86    sequence: u8,
87}
88
89#[async_trait::async_trait]
90impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
91    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92        let mut reader = self.reader.lock().await;
93        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
94        #[cfg(not(feature = "signing"))]
95        let result = read_versioned_msg_async(reader.deref_mut(), version).await;
96        #[cfg(feature = "signing")]
97        let result = read_versioned_msg_async_signed(
98            reader.deref_mut(),
99            version,
100            self.signing_data.as_ref(),
101        )
102        .await;
103        result
104    }
105
106    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
107        let mut reader = self.reader.lock().await;
108        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
109        #[cfg(not(feature = "signing"))]
110        let result = read_raw_versioned_msg_async::<M, _>(reader.deref_mut(), version).await;
111        #[cfg(feature = "signing")]
112        let result = read_raw_versioned_msg_async_signed::<M, _>(
113            reader.deref_mut(),
114            version,
115            self.signing_data.as_ref(),
116        )
117        .await;
118        result
119    }
120
121    async fn send(
122        &self,
123        header: &MavHeader,
124        data: &M,
125    ) -> Result<usize, crate::error::MessageWriteError> {
126        let mut lock = self.writer.lock().await;
127
128        let header = MavHeader {
129            sequence: lock.sequence,
130            system_id: header.system_id,
131            component_id: header.component_id,
132        };
133
134        lock.sequence = lock.sequence.wrapping_add(1);
135        #[cfg(not(feature = "signing"))]
136        let result =
137            write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
138        #[cfg(feature = "signing")]
139        let result = write_versioned_msg_async_signed(
140            &mut lock.socket,
141            self.protocol_version,
142            header,
143            data,
144            self.signing_data.as_ref(),
145        )
146        .await;
147        result
148    }
149
150    fn set_protocol_version(&mut self, version: MavlinkVersion) {
151        self.protocol_version = version;
152    }
153
154    fn protocol_version(&self) -> MavlinkVersion {
155        self.protocol_version
156    }
157
158    fn set_allow_recv_any_version(&mut self, allow: bool) {
159        self.recv_any_version = allow;
160    }
161
162    fn allow_recv_any_version(&self) -> bool {
163        self.recv_any_version
164    }
165
166    #[cfg(feature = "signing")]
167    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
168        self.signing_data = signing_data.map(SigningData::from_config);
169    }
170}
171
172#[async_trait]
173impl AsyncConnectable for TcpConfig {
174    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
175    where
176        M: Message + Sync + Send,
177    {
178        let conn = match self.mode {
179            TcpMode::TcpIn => tcpin(&self.address).await,
180            TcpMode::TcpOut => tcpout(&self.address).await,
181        };
182
183        Ok(Box::new(conn?))
184    }
185}