mavlink_core/async_connection/
tcp.rs

1//! Async TCP MAVLink connection
2
3use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
4use crate::async_peek_reader::AsyncPeekReader;
5use crate::connectable::TcpConnectable;
6use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
7
8use async_trait::async_trait;
9use core::ops::DerefMut;
10use tokio::io;
11use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
12use tokio::net::{TcpListener, TcpStream};
13use tokio::sync::Mutex;
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_versioned_msg_async, write_versioned_msg_async};
17#[cfg(feature = "signing")]
18use crate::{
19    read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
20};
21
22pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
23    let addr = get_socket_addr(address)?;
24
25    let socket = TcpStream::connect(addr).await?;
26
27    let (reader, writer) = socket.into_split();
28
29    Ok(AsyncTcpConnection {
30        reader: Mutex::new(AsyncPeekReader::new(reader)),
31        writer: Mutex::new(TcpWrite {
32            socket: writer,
33            sequence: 0,
34        }),
35        protocol_version: MavlinkVersion::V2,
36        recv_any_version: false,
37        #[cfg(feature = "signing")]
38        signing_data: None,
39    })
40}
41
42pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
43    let addr = get_socket_addr(address)?;
44    let listener = TcpListener::bind(addr).await?;
45
46    //For now we only accept one incoming stream: this yields until we get one
47    match listener.accept().await {
48        Ok((socket, _)) => {
49            let (reader, writer) = socket.into_split();
50            return Ok(AsyncTcpConnection {
51                reader: Mutex::new(AsyncPeekReader::new(reader)),
52                writer: Mutex::new(TcpWrite {
53                    socket: writer,
54                    sequence: 0,
55                }),
56                protocol_version: MavlinkVersion::V2,
57                recv_any_version: false,
58                #[cfg(feature = "signing")]
59                signing_data: None,
60            });
61        }
62        Err(e) => {
63            //TODO don't println in lib
64            println!("listener err: {e}");
65        }
66    }
67    Err(io::Error::new(
68        io::ErrorKind::NotConnected,
69        "No incoming connections!",
70    ))
71}
72
73pub struct AsyncTcpConnection {
74    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
75    writer: Mutex<TcpWrite>,
76    protocol_version: MavlinkVersion,
77    recv_any_version: bool,
78    #[cfg(feature = "signing")]
79    signing_data: Option<SigningData>,
80}
81
82struct TcpWrite {
83    socket: OwnedWriteHalf,
84    sequence: u8,
85}
86
87#[async_trait::async_trait]
88impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
89    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
90        let mut reader = self.reader.lock().await;
91        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
92        #[cfg(not(feature = "signing"))]
93        let result = read_versioned_msg_async(reader.deref_mut(), version).await;
94        #[cfg(feature = "signing")]
95        let result = read_versioned_msg_async_signed(
96            reader.deref_mut(),
97            version,
98            self.signing_data.as_ref(),
99        )
100        .await;
101        result
102    }
103
104    async fn send(
105        &self,
106        header: &MavHeader,
107        data: &M,
108    ) -> Result<usize, crate::error::MessageWriteError> {
109        let mut lock = self.writer.lock().await;
110
111        let header = MavHeader {
112            sequence: lock.sequence,
113            system_id: header.system_id,
114            component_id: header.component_id,
115        };
116
117        lock.sequence = lock.sequence.wrapping_add(1);
118        #[cfg(not(feature = "signing"))]
119        let result =
120            write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
121        #[cfg(feature = "signing")]
122        let result = write_versioned_msg_async_signed(
123            &mut lock.socket,
124            self.protocol_version,
125            header,
126            data,
127            self.signing_data.as_ref(),
128        )
129        .await;
130        result
131    }
132
133    fn set_protocol_version(&mut self, version: MavlinkVersion) {
134        self.protocol_version = version;
135    }
136
137    fn protocol_version(&self) -> MavlinkVersion {
138        self.protocol_version
139    }
140
141    fn set_allow_recv_any_version(&mut self, allow: bool) {
142        self.recv_any_version = allow
143    }
144
145    fn allow_recv_any_version(&self) -> bool {
146        self.recv_any_version
147    }
148
149    #[cfg(feature = "signing")]
150    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
151        self.signing_data = signing_data.map(SigningData::from_config)
152    }
153}
154
155#[async_trait]
156impl AsyncConnectable for TcpConnectable {
157    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
158    where
159        M: Message + Sync + Send,
160    {
161        let conn = if self.is_out {
162            tcpout(&self.address).await
163        } else {
164            tcpin(&self.address).await
165        };
166        Ok(Box::new(conn?))
167    }
168}