mavlink_core/connection/
tcp.rs

1//! TCP MAVLink connection
2
3use crate::connectable::TcpConnectable;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
7use core::ops::DerefMut;
8use std::io;
9use std::net::ToSocketAddrs;
10use std::net::{TcpListener, TcpStream};
11use std::sync::Mutex;
12use std::time::Duration;
13
14use super::{get_socket_addr, Connectable};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg, write_versioned_msg};
18
19#[cfg(feature = "signing")]
20use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
21
22pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
23    let addr = get_socket_addr(&address)?;
24
25    let socket = TcpStream::connect(addr)?;
26    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
27
28    Ok(TcpConnection {
29        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
30        writer: Mutex::new(TcpWrite {
31            socket,
32            sequence: 0,
33        }),
34        protocol_version: MavlinkVersion::V2,
35        recv_any_version: false,
36        #[cfg(feature = "signing")]
37        signing_data: None,
38    })
39}
40
41pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
42    let addr = get_socket_addr(&address)?;
43    let listener = TcpListener::bind(addr)?;
44
45    //For now we only accept one incoming stream: this blocks until we get one
46    for incoming in listener.incoming() {
47        match incoming {
48            Ok(socket) => {
49                return Ok(TcpConnection {
50                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
51                    writer: Mutex::new(TcpWrite {
52                        socket,
53                        sequence: 0,
54                    }),
55                    protocol_version: MavlinkVersion::V2,
56                    recv_any_version: false,
57                    #[cfg(feature = "signing")]
58                    signing_data: None,
59                })
60            }
61            Err(e) => {
62                //TODO don't println in lib
63                println!("listener err: {e}");
64            }
65        }
66    }
67    Err(io::Error::new(
68        io::ErrorKind::NotConnected,
69        "No incoming connections!",
70    ))
71}
72
73pub struct TcpConnection {
74    reader: Mutex<PeekReader<TcpStream>>,
75    writer: Mutex<TcpWrite>,
76    protocol_version: MavlinkVersion,
77    recv_any_version: bool,
78    #[cfg(feature = "signing")]
79    signing_data: Option<SigningData>,
80}
81
82struct TcpWrite {
83    socket: TcpStream,
84    sequence: u8,
85}
86
87impl<M: Message> MavConnection<M> for TcpConnection {
88    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
89        let mut reader = self.reader.lock().unwrap();
90        let version = ReadVersion::from_conn_cfg::<_, M>(self);
91        #[cfg(not(feature = "signing"))]
92        let result = read_versioned_msg(reader.deref_mut(), version);
93        #[cfg(feature = "signing")]
94        let result =
95            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
96        result
97    }
98
99    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
100        self.recv()
101    }
102
103    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
104        let mut lock = self.writer.lock().unwrap();
105
106        let header = MavHeader {
107            sequence: lock.sequence,
108            system_id: header.system_id,
109            component_id: header.component_id,
110        };
111
112        lock.sequence = lock.sequence.wrapping_add(1);
113        #[cfg(not(feature = "signing"))]
114        let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
115        #[cfg(feature = "signing")]
116        let result = write_versioned_msg_signed(
117            &mut lock.socket,
118            self.protocol_version,
119            header,
120            data,
121            self.signing_data.as_ref(),
122        );
123        result
124    }
125
126    fn set_protocol_version(&mut self, version: MavlinkVersion) {
127        self.protocol_version = version;
128    }
129
130    fn protocol_version(&self) -> MavlinkVersion {
131        self.protocol_version
132    }
133
134    fn set_allow_recv_any_version(&mut self, allow: bool) {
135        self.recv_any_version = allow
136    }
137
138    fn allow_recv_any_version(&self) -> bool {
139        self.recv_any_version
140    }
141
142    #[cfg(feature = "signing")]
143    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
144        self.signing_data = signing_data.map(SigningData::from_config)
145    }
146}
147
148impl Connectable for TcpConnectable {
149    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
150        let conn = if self.is_out {
151            tcpout(&self.address)
152        } else {
153            tcpin(&self.address)
154        };
155        Ok(Box::new(conn?))
156    }
157}