mavlink_core/connection/
tcp.rs

1//! TCP MAVLink connection
2
3use crate::connection::get_socket_addr;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "signing"))]
7use crate::read_raw_versioned_msg;
8#[cfg(feature = "signing")]
9use crate::read_raw_versioned_msg_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::io;
15use std::net::ToSocketAddrs;
16use std::net::{TcpListener, TcpStream};
17use std::sync::Mutex;
18use std::time::Duration;
19
20#[cfg(not(feature = "signing"))]
21use crate::{read_versioned_msg, write_versioned_msg};
22
23#[cfg(feature = "signing")]
24use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
25
26pub mod config;
27
28use config::{TcpConfig, TcpMode};
29
30pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
31    let addr = get_socket_addr(&address)?;
32
33    let socket = TcpStream::connect(addr)?;
34    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
35
36    Ok(TcpConnection {
37        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
38        writer: Mutex::new(TcpWrite {
39            socket,
40            sequence: 0,
41        }),
42        protocol_version: MavlinkVersion::V2,
43        recv_any_version: false,
44        #[cfg(feature = "signing")]
45        signing_data: None,
46    })
47}
48
49pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
50    let addr = get_socket_addr(&address)?;
51    let listener = TcpListener::bind(addr)?;
52
53    //For now we only accept one incoming stream: this blocks until we get one
54    for incoming in listener.incoming() {
55        match incoming {
56            Ok(socket) => {
57                return Ok(TcpConnection {
58                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
59                    writer: Mutex::new(TcpWrite {
60                        socket,
61                        sequence: 0,
62                    }),
63                    protocol_version: MavlinkVersion::V2,
64                    recv_any_version: false,
65                    #[cfg(feature = "signing")]
66                    signing_data: None,
67                })
68            }
69            Err(e) => {
70                //TODO don't println in lib
71                println!("listener err: {e}");
72            }
73        }
74    }
75    Err(io::Error::new(
76        io::ErrorKind::NotConnected,
77        "No incoming connections!",
78    ))
79}
80
81pub struct TcpConnection {
82    reader: Mutex<PeekReader<TcpStream>>,
83    writer: Mutex<TcpWrite>,
84    protocol_version: MavlinkVersion,
85    recv_any_version: bool,
86    #[cfg(feature = "signing")]
87    signing_data: Option<SigningData>,
88}
89
90struct TcpWrite {
91    socket: TcpStream,
92    sequence: u8,
93}
94
95impl<M: Message> MavConnection<M> for TcpConnection {
96    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
97        let mut reader = self.reader.lock().unwrap();
98        let version = ReadVersion::from_conn_cfg::<_, M>(self);
99        #[cfg(not(feature = "signing"))]
100        let result = read_versioned_msg(reader.deref_mut(), version);
101        #[cfg(feature = "signing")]
102        let result =
103            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
104        result
105    }
106
107    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
108        let mut reader = self.reader.lock().unwrap();
109        let version = ReadVersion::from_conn_cfg::<_, M>(self);
110        #[cfg(not(feature = "signing"))]
111        let result = read_raw_versioned_msg::<M, _>(reader.deref_mut(), version);
112        #[cfg(feature = "signing")]
113        let result = read_raw_versioned_msg_signed::<M, _>(
114            reader.deref_mut(),
115            version,
116            self.signing_data.as_ref(),
117        );
118        result
119    }
120
121    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
122        self.recv()
123    }
124
125    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
126        let mut lock = self.writer.lock().unwrap();
127
128        let header = MavHeader {
129            sequence: lock.sequence,
130            system_id: header.system_id,
131            component_id: header.component_id,
132        };
133
134        lock.sequence = lock.sequence.wrapping_add(1);
135        #[cfg(not(feature = "signing"))]
136        let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
137        #[cfg(feature = "signing")]
138        let result = write_versioned_msg_signed(
139            &mut lock.socket,
140            self.protocol_version,
141            header,
142            data,
143            self.signing_data.as_ref(),
144        );
145        result
146    }
147
148    fn set_protocol_version(&mut self, version: MavlinkVersion) {
149        self.protocol_version = version;
150    }
151
152    fn protocol_version(&self) -> MavlinkVersion {
153        self.protocol_version
154    }
155
156    fn set_allow_recv_any_version(&mut self, allow: bool) {
157        self.recv_any_version = allow;
158    }
159
160    fn allow_recv_any_version(&self) -> bool {
161        self.recv_any_version
162    }
163
164    #[cfg(feature = "signing")]
165    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
166        self.signing_data = signing_data.map(SigningData::from_config);
167    }
168}
169
170impl Connectable for TcpConfig {
171    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
172        let conn = match self.mode {
173            TcpMode::TcpIn => tcpin(&self.address),
174            TcpMode::TcpOut => tcpout(&self.address),
175        };
176
177        Ok(Box::new(conn?))
178    }
179}