mavlink_core/connection/
tcp.rs

1//! TCP MAVLink connection
2
3use crate::connection::get_socket_addr;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "signing"))]
7use crate::read_raw_versioned_msg;
8#[cfg(feature = "signing")]
9use crate::read_raw_versioned_msg_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::io;
15use std::net::ToSocketAddrs;
16use std::net::{TcpListener, TcpStream};
17use std::sync::Mutex;
18use std::time::Duration;
19
20#[cfg(not(feature = "signing"))]
21use crate::{read_versioned_msg, write_versioned_msg};
22
23#[cfg(feature = "signing")]
24use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
25
26pub mod config;
27
28use config::{TcpConfig, TcpMode};
29
30pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
31    let addr = get_socket_addr(&address)?;
32
33    let socket = TcpStream::connect(addr)?;
34    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
35
36    Ok(TcpConnection {
37        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
38        writer: Mutex::new(TcpWrite {
39            socket,
40            sequence: 0,
41        }),
42        protocol_version: MavlinkVersion::V2,
43        recv_any_version: false,
44        #[cfg(feature = "signing")]
45        signing_data: None,
46    })
47}
48
49pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
50    let addr = get_socket_addr(&address)?;
51    let listener = TcpListener::bind(addr)?;
52
53    //For now we only accept one incoming stream: this blocks until we get one
54    for incoming in listener.incoming() {
55        match incoming {
56            Ok(socket) => {
57                return Ok(TcpConnection {
58                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
59                    writer: Mutex::new(TcpWrite {
60                        socket,
61                        sequence: 0,
62                    }),
63                    protocol_version: MavlinkVersion::V2,
64                    recv_any_version: false,
65                    #[cfg(feature = "signing")]
66                    signing_data: None,
67                })
68            }
69            Err(e) => {
70                //TODO don't println in lib
71                println!("listener err: {e}");
72            }
73        }
74    }
75    Err(io::Error::new(
76        io::ErrorKind::NotConnected,
77        "No incoming connections!",
78    ))
79}
80
81pub struct TcpConnection {
82    reader: Mutex<PeekReader<TcpStream>>,
83    writer: Mutex<TcpWrite>,
84    protocol_version: MavlinkVersion,
85    recv_any_version: bool,
86    #[cfg(feature = "signing")]
87    signing_data: Option<SigningData>,
88}
89
90struct TcpWrite {
91    socket: TcpStream,
92    sequence: u8,
93}
94
95impl<M: Message> MavConnection<M> for TcpConnection {
96    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
97        let mut reader = self.reader.lock().unwrap();
98        let version = ReadVersion::from_conn_cfg::<_, M>(self);
99        #[cfg(not(feature = "signing"))]
100        let result = read_versioned_msg(reader.deref_mut(), version);
101        #[cfg(feature = "signing")]
102        let result =
103            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
104        result
105    }
106
107    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
108        let mut reader = self.reader.lock().unwrap();
109        let version = ReadVersion::from_conn_cfg::<_, M>(self);
110        #[cfg(not(feature = "signing"))]
111        let result = read_raw_versioned_msg::<M, _>(reader.deref_mut(), version);
112        #[cfg(feature = "signing")]
113        let result = read_raw_versioned_msg_signed::<M, _>(
114            reader.deref_mut(),
115            version,
116            self.signing_data.as_ref(),
117        );
118        result
119    }
120
121    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
122        let mut reader = self.reader.lock().unwrap();
123        reader.reader_mut().set_nonblocking(true)?;
124
125        let version = ReadVersion::from_conn_cfg::<_, M>(self);
126
127        #[cfg(not(feature = "signing"))]
128        let result = read_versioned_msg(reader.deref_mut(), version);
129
130        #[cfg(feature = "signing")]
131        let result =
132            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
133
134        reader.reader_mut().set_nonblocking(false)?;
135
136        result
137    }
138
139    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
140        let mut lock = self.writer.lock().unwrap();
141
142        let header = MavHeader {
143            sequence: lock.sequence,
144            system_id: header.system_id,
145            component_id: header.component_id,
146        };
147
148        lock.sequence = lock.sequence.wrapping_add(1);
149        #[cfg(not(feature = "signing"))]
150        let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
151        #[cfg(feature = "signing")]
152        let result = write_versioned_msg_signed(
153            &mut lock.socket,
154            self.protocol_version,
155            header,
156            data,
157            self.signing_data.as_ref(),
158        );
159        result
160    }
161
162    fn set_protocol_version(&mut self, version: MavlinkVersion) {
163        self.protocol_version = version;
164    }
165
166    fn protocol_version(&self) -> MavlinkVersion {
167        self.protocol_version
168    }
169
170    fn set_allow_recv_any_version(&mut self, allow: bool) {
171        self.recv_any_version = allow;
172    }
173
174    fn allow_recv_any_version(&self) -> bool {
175        self.recv_any_version
176    }
177
178    #[cfg(feature = "signing")]
179    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
180        self.signing_data = signing_data.map(SigningData::from_config);
181    }
182}
183
184impl Connectable for TcpConfig {
185    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
186        let conn = match self.mode {
187            TcpMode::TcpIn => tcpin(&self.address),
188            TcpMode::TcpOut => tcpout(&self.address),
189        };
190
191        Ok(Box::new(conn?))
192    }
193}