mavlink_core/async_connection/
tcp.rs

1//! Async TCP MAVLink connection
2
3use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connection::tcp::config::{TcpConfig, TcpMode};
8use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::lock::Mutex;
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "signing"))]
17use crate::{
18    read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
19};
20#[cfg(feature = "signing")]
21use crate::{
22    read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
23    write_versioned_msg_async_signed, SigningConfig, SigningData,
24};
25
26pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
27    let addr = get_socket_addr(address)?;
28
29    let socket = TcpStream::connect(addr).await?;
30
31    let (reader, writer) = socket.into_split();
32
33    Ok(AsyncTcpConnection {
34        reader: Mutex::new(AsyncPeekReader::new(reader)),
35        writer: Mutex::new(TcpWrite {
36            socket: writer,
37            sequence: 0,
38        }),
39        protocol_version: MavlinkVersion::V2,
40        recv_any_version: false,
41        #[cfg(feature = "signing")]
42        signing_data: None,
43    })
44}
45
46pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
47    let addr = get_socket_addr(address)?;
48    let listener = TcpListener::bind(addr).await?;
49
50    //For now we only accept one incoming stream: this yields until we get one
51    match listener.accept().await {
52        Ok((socket, _)) => {
53            let (reader, writer) = socket.into_split();
54            return Ok(AsyncTcpConnection {
55                reader: Mutex::new(AsyncPeekReader::new(reader)),
56                writer: Mutex::new(TcpWrite {
57                    socket: writer,
58                    sequence: 0,
59                }),
60                protocol_version: MavlinkVersion::V2,
61                recv_any_version: false,
62                #[cfg(feature = "signing")]
63                signing_data: None,
64            });
65        }
66        Err(e) => {
67            //TODO don't println in lib
68            println!("listener err: {e}");
69        }
70    }
71    Err(io::Error::new(
72        io::ErrorKind::NotConnected,
73        "No incoming connections!",
74    ))
75}
76
77pub struct AsyncTcpConnection {
78    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
79    writer: Mutex<TcpWrite>,
80    protocol_version: MavlinkVersion,
81    recv_any_version: bool,
82    #[cfg(feature = "signing")]
83    signing_data: Option<SigningData>,
84}
85
86struct TcpWrite {
87    socket: OwnedWriteHalf,
88    sequence: u8,
89}
90
91#[async_trait::async_trait]
92impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
93    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
94        let mut reader = self.reader.lock().await;
95        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
96        #[cfg(not(feature = "signing"))]
97        let result = read_versioned_msg_async(reader.deref_mut(), version).await;
98        #[cfg(feature = "signing")]
99        let result = read_versioned_msg_async_signed(
100            reader.deref_mut(),
101            version,
102            self.signing_data.as_ref(),
103        )
104        .await;
105        result
106    }
107
108    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
109        let mut reader = self.reader.lock().await;
110        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
111        #[cfg(not(feature = "signing"))]
112        let result = read_versioned_raw_message_async::<M, _>(reader.deref_mut(), version).await;
113        #[cfg(feature = "signing")]
114        let result = read_versioned_raw_message_async_signed::<M, _>(
115            reader.deref_mut(),
116            version,
117            self.signing_data.as_ref(),
118        )
119        .await;
120        result
121    }
122
123    async fn send(
124        &self,
125        header: &MavHeader,
126        data: &M,
127    ) -> Result<usize, crate::error::MessageWriteError> {
128        let mut lock = self.writer.lock().await;
129
130        let header = MavHeader {
131            sequence: lock.sequence,
132            system_id: header.system_id,
133            component_id: header.component_id,
134        };
135
136        lock.sequence = lock.sequence.wrapping_add(1);
137        #[cfg(not(feature = "signing"))]
138        let result =
139            write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
140        #[cfg(feature = "signing")]
141        let result = write_versioned_msg_async_signed(
142            &mut lock.socket,
143            self.protocol_version,
144            header,
145            data,
146            self.signing_data.as_ref(),
147        )
148        .await;
149        result
150    }
151
152    fn set_protocol_version(&mut self, version: MavlinkVersion) {
153        self.protocol_version = version;
154    }
155
156    fn protocol_version(&self) -> MavlinkVersion {
157        self.protocol_version
158    }
159
160    fn set_allow_recv_any_version(&mut self, allow: bool) {
161        self.recv_any_version = allow;
162    }
163
164    fn allow_recv_any_version(&self) -> bool {
165        self.recv_any_version
166    }
167
168    #[cfg(feature = "signing")]
169    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
170        self.signing_data = signing_data.map(SigningData::from_config);
171    }
172}
173
174#[async_trait]
175impl AsyncConnectable for TcpConfig {
176    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
177    where
178        M: Message + Sync + Send,
179    {
180        let conn = match self.mode {
181            TcpMode::TcpIn => tcpin(&self.address).await,
182            TcpMode::TcpOut => tcpout(&self.address).await,
183        };
184
185        Ok(Box::new(conn?))
186    }
187}