mavlink_core/connection/
tcp.rs

1//! TCP MAVLink connection
2
3use crate::connectable::TcpConnectable;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
7use core::ops::DerefMut;
8use std::io;
9use std::net::ToSocketAddrs;
10use std::net::{TcpListener, TcpStream};
11use std::sync::Mutex;
12use std::time::Duration;
13
14use super::{get_socket_addr, Connectable};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg, write_versioned_msg};
18
19#[cfg(feature = "signing")]
20use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
21
22pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
23    let addr = get_socket_addr(&address)?;
24
25    let socket = TcpStream::connect(addr)?;
26    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
27
28    Ok(TcpConnection {
29        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
30        writer: Mutex::new(TcpWrite {
31            socket,
32            sequence: 0,
33        }),
34        protocol_version: MavlinkVersion::V2,
35        recv_any_version: false,
36        #[cfg(feature = "signing")]
37        signing_data: None,
38    })
39}
40
41pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
42    let addr = get_socket_addr(&address)?;
43    let listener = TcpListener::bind(addr)?;
44
45    //For now we only accept one incoming stream: this blocks until we get one
46    for incoming in listener.incoming() {
47        match incoming {
48            Ok(socket) => {
49                return Ok(TcpConnection {
50                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
51                    writer: Mutex::new(TcpWrite {
52                        socket,
53                        sequence: 0,
54                    }),
55                    protocol_version: MavlinkVersion::V2,
56                    recv_any_version: false,
57                    #[cfg(feature = "signing")]
58                    signing_data: None,
59                })
60            }
61            Err(e) => {
62                //TODO don't println in lib
63                println!("listener err: {e}");
64            }
65        }
66    }
67    Err(io::Error::new(
68        io::ErrorKind::NotConnected,
69        "No incoming connections!",
70    ))
71}
72
73pub struct TcpConnection {
74    reader: Mutex<PeekReader<TcpStream>>,
75    writer: Mutex<TcpWrite>,
76    protocol_version: MavlinkVersion,
77    recv_any_version: bool,
78    #[cfg(feature = "signing")]
79    signing_data: Option<SigningData>,
80}
81
82struct TcpWrite {
83    socket: TcpStream,
84    sequence: u8,
85}
86
87impl<M: Message> MavConnection<M> for TcpConnection {
88    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
89        let mut reader = self.reader.lock().unwrap();
90        let version = ReadVersion::from_conn_cfg::<_, M>(self);
91        #[cfg(not(feature = "signing"))]
92        let result = read_versioned_msg(reader.deref_mut(), version);
93        #[cfg(feature = "signing")]
94        let result =
95            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
96        result
97    }
98
99    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
100        let mut lock = self.writer.lock().unwrap();
101
102        let header = MavHeader {
103            sequence: lock.sequence,
104            system_id: header.system_id,
105            component_id: header.component_id,
106        };
107
108        lock.sequence = lock.sequence.wrapping_add(1);
109        #[cfg(not(feature = "signing"))]
110        let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
111        #[cfg(feature = "signing")]
112        let result = write_versioned_msg_signed(
113            &mut lock.socket,
114            self.protocol_version,
115            header,
116            data,
117            self.signing_data.as_ref(),
118        );
119        result
120    }
121
122    fn set_protocol_version(&mut self, version: MavlinkVersion) {
123        self.protocol_version = version;
124    }
125
126    fn protocol_version(&self) -> MavlinkVersion {
127        self.protocol_version
128    }
129
130    fn set_allow_recv_any_version(&mut self, allow: bool) {
131        self.recv_any_version = allow
132    }
133
134    fn allow_recv_any_version(&self) -> bool {
135        self.recv_any_version
136    }
137
138    #[cfg(feature = "signing")]
139    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
140        self.signing_data = signing_data.map(SigningData::from_config)
141    }
142}
143
144impl Connectable for TcpConnectable {
145    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
146        let conn = if self.is_out {
147            tcpout(&self.address)
148        } else {
149            tcpin(&self.address)
150        };
151        Ok(Box::new(conn?))
152    }
153}