mavlink_core/connection/
tcp.rs

1//! TCP MAVLink connection
2
3use crate::connectable::TcpConnectable;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6use crate::{MavHeader, MavlinkVersion, Message};
7use core::ops::DerefMut;
8use std::io;
9use std::net::ToSocketAddrs;
10use std::net::{TcpListener, TcpStream};
11use std::sync::Mutex;
12use std::time::Duration;
13
14use super::{get_socket_addr, Connectable};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg, write_versioned_msg};
18
19#[cfg(feature = "signing")]
20use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
21
22pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
23    let addr = get_socket_addr(&address)?;
24
25    let socket = TcpStream::connect(addr)?;
26    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
27
28    Ok(TcpConnection {
29        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
30        writer: Mutex::new(TcpWrite {
31            socket,
32            sequence: 0,
33        }),
34        protocol_version: MavlinkVersion::V2,
35        #[cfg(feature = "signing")]
36        signing_data: None,
37    })
38}
39
40pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
41    let addr = get_socket_addr(&address)?;
42    let listener = TcpListener::bind(addr)?;
43
44    //For now we only accept one incoming stream: this blocks until we get one
45    for incoming in listener.incoming() {
46        match incoming {
47            Ok(socket) => {
48                return Ok(TcpConnection {
49                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
50                    writer: Mutex::new(TcpWrite {
51                        socket,
52                        sequence: 0,
53                    }),
54                    protocol_version: MavlinkVersion::V2,
55                    #[cfg(feature = "signing")]
56                    signing_data: None,
57                })
58            }
59            Err(e) => {
60                //TODO don't println in lib
61                println!("listener err: {e}");
62            }
63        }
64    }
65    Err(io::Error::new(
66        io::ErrorKind::NotConnected,
67        "No incoming connections!",
68    ))
69}
70
71pub struct TcpConnection {
72    reader: Mutex<PeekReader<TcpStream>>,
73    writer: Mutex<TcpWrite>,
74    protocol_version: MavlinkVersion,
75    #[cfg(feature = "signing")]
76    signing_data: Option<SigningData>,
77}
78
79struct TcpWrite {
80    socket: TcpStream,
81    sequence: u8,
82}
83
84impl<M: Message> MavConnection<M> for TcpConnection {
85    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
86        let mut reader = self.reader.lock().unwrap();
87        #[cfg(not(feature = "signing"))]
88        let result = read_versioned_msg(reader.deref_mut(), self.protocol_version);
89        #[cfg(feature = "signing")]
90        let result = read_versioned_msg_signed(
91            reader.deref_mut(),
92            self.protocol_version,
93            self.signing_data.as_ref(),
94        );
95        result
96    }
97
98    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
99        let mut lock = self.writer.lock().unwrap();
100
101        let header = MavHeader {
102            sequence: lock.sequence,
103            system_id: header.system_id,
104            component_id: header.component_id,
105        };
106
107        lock.sequence = lock.sequence.wrapping_add(1);
108        #[cfg(not(feature = "signing"))]
109        let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
110        #[cfg(feature = "signing")]
111        let result = write_versioned_msg_signed(
112            &mut lock.socket,
113            self.protocol_version,
114            header,
115            data,
116            self.signing_data.as_ref(),
117        );
118        result
119    }
120
121    fn set_protocol_version(&mut self, version: MavlinkVersion) {
122        self.protocol_version = version;
123    }
124
125    fn protocol_version(&self) -> MavlinkVersion {
126        self.protocol_version
127    }
128
129    #[cfg(feature = "signing")]
130    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
131        self.signing_data = signing_data.map(SigningData::from_config)
132    }
133}
134
135impl Connectable for TcpConnectable {
136    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
137        let conn = if self.is_out {
138            tcpout(&self.address)
139        } else {
140            tcpin(&self.address)
141        };
142        Ok(Box::new(conn?))
143    }
144}