mavlink_core/async_connection/
tcp.rs

1//! Async TCP MAVLink connection
2
3use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
4use crate::async_peek_reader::AsyncPeekReader;
5use crate::connectable::TcpConnectable;
6use crate::{MavHeader, MavlinkVersion, Message};
7
8use async_trait::async_trait;
9use core::ops::DerefMut;
10use tokio::io;
11use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
12use tokio::net::{TcpListener, TcpStream};
13use tokio::sync::Mutex;
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_versioned_msg_async, write_versioned_msg_async};
17#[cfg(feature = "signing")]
18use crate::{
19    read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
20};
21
22pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
23    let addr = get_socket_addr(address)?;
24
25    let socket = TcpStream::connect(addr).await?;
26
27    let (reader, writer) = socket.into_split();
28
29    Ok(AsyncTcpConnection {
30        reader: Mutex::new(AsyncPeekReader::new(reader)),
31        writer: Mutex::new(TcpWrite {
32            socket: writer,
33            sequence: 0,
34        }),
35        protocol_version: MavlinkVersion::V2,
36        #[cfg(feature = "signing")]
37        signing_data: None,
38    })
39}
40
41pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
42    let addr = get_socket_addr(address)?;
43    let listener = TcpListener::bind(addr).await?;
44
45    //For now we only accept one incoming stream: this yields until we get one
46    match listener.accept().await {
47        Ok((socket, _)) => {
48            let (reader, writer) = socket.into_split();
49            return Ok(AsyncTcpConnection {
50                reader: Mutex::new(AsyncPeekReader::new(reader)),
51                writer: Mutex::new(TcpWrite {
52                    socket: writer,
53                    sequence: 0,
54                }),
55                protocol_version: MavlinkVersion::V2,
56                #[cfg(feature = "signing")]
57                signing_data: None,
58            });
59        }
60        Err(e) => {
61            //TODO don't println in lib
62            println!("listener err: {e}");
63        }
64    }
65    Err(io::Error::new(
66        io::ErrorKind::NotConnected,
67        "No incoming connections!",
68    ))
69}
70
71pub struct AsyncTcpConnection {
72    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
73    writer: Mutex<TcpWrite>,
74    protocol_version: MavlinkVersion,
75    #[cfg(feature = "signing")]
76    signing_data: Option<SigningData>,
77}
78
79struct TcpWrite {
80    socket: OwnedWriteHalf,
81    sequence: u8,
82}
83
84#[async_trait::async_trait]
85impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
86    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
87        let mut reader = self.reader.lock().await;
88        #[cfg(not(feature = "signing"))]
89        let result = read_versioned_msg_async(reader.deref_mut(), self.protocol_version).await;
90        #[cfg(feature = "signing")]
91        let result = read_versioned_msg_async_signed(
92            reader.deref_mut(),
93            self.protocol_version,
94            self.signing_data.as_ref(),
95        )
96        .await;
97        result
98    }
99
100    async fn send(
101        &self,
102        header: &MavHeader,
103        data: &M,
104    ) -> Result<usize, crate::error::MessageWriteError> {
105        let mut lock = self.writer.lock().await;
106
107        let header = MavHeader {
108            sequence: lock.sequence,
109            system_id: header.system_id,
110            component_id: header.component_id,
111        };
112
113        lock.sequence = lock.sequence.wrapping_add(1);
114        #[cfg(not(feature = "signing"))]
115        let result =
116            write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
117        #[cfg(feature = "signing")]
118        let result = write_versioned_msg_async_signed(
119            &mut lock.socket,
120            self.protocol_version,
121            header,
122            data,
123            self.signing_data.as_ref(),
124        )
125        .await;
126        result
127    }
128
129    fn set_protocol_version(&mut self, version: MavlinkVersion) {
130        self.protocol_version = version;
131    }
132
133    fn get_protocol_version(&self) -> MavlinkVersion {
134        self.protocol_version
135    }
136
137    #[cfg(feature = "signing")]
138    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
139        self.signing_data = signing_data.map(SigningData::from_config)
140    }
141}
142
143#[async_trait]
144impl AsyncConnectable for TcpConnectable {
145    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
146    where
147        M: Message + Sync + Send,
148    {
149        let conn = if self.is_out {
150            tcpout(&self.address).await
151        } else {
152            tcpin(&self.address).await
153        };
154        Ok(Box::new(conn?))
155    }
156}