Skip to main content

mavlink_core/connection/tcp/
sync.rs

1//! TCP MAVLink connection
2
3use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8    ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9    write_raw_message,
10};
11use crate::peek_reader::PeekReader;
12use crate::{MavHeader, MavlinkVersion, Message};
13use core::ops::DerefMut;
14use std::io;
15use std::net::ToSocketAddrs;
16use std::net::{TcpListener, TcpStream};
17use std::sync::Mutex;
18use std::time::Duration;
19
20#[cfg(feature = "mav2-message-signing")]
21use crate::SigningConfig;
22
23use super::config::{TcpConfig, TcpMode};
24
25pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
26    let addr = get_socket_addr(&address)?;
27
28    let socket = TcpStream::connect(addr)?;
29    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
30
31    Ok(TcpConnection {
32        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
33        writer: Mutex::new(TcpWrite {
34            socket,
35            sequence: 0,
36        }),
37        state: ConnectionState::new(),
38    })
39}
40
41pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
42    let addr = get_socket_addr(&address)?;
43    let listener = TcpListener::bind(addr)?;
44
45    //For now we only accept one incoming stream: this blocks until we get one
46    for incoming in listener.incoming() {
47        match incoming {
48            Ok(socket) => {
49                return Ok(TcpConnection {
50                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
51                    writer: Mutex::new(TcpWrite {
52                        socket,
53                        sequence: 0,
54                    }),
55                    state: ConnectionState::new(),
56                });
57            }
58            Err(e) => {
59                //TODO don't println in lib
60                println!("listener err: {e}");
61            }
62        }
63    }
64    Err(io::Error::new(
65        io::ErrorKind::NotConnected,
66        "No incoming connections!",
67    ))
68}
69
/// MAVLink connection over a single TCP stream.
///
/// Instances are built by `tcpout` (client) and `tcpin` (server). The read and
/// write sides each sit behind their own `Mutex`, so the `&self` receive and
/// send methods lock them independently.
70pub struct TcpConnection {
    /// Receive side: a `PeekReader` over a `try_clone` of the connection's socket.
71    reader: Mutex<PeekReader<TcpStream>>,
    /// Send side: the socket handle plus the outgoing sequence counter.
72    writer: Mutex<TcpWrite>,
    /// Shared connection state (protocol version, accepted versions and,
    /// when the feature is enabled, signing setup).
73    state: ConnectionState,
74}
75
/// Write half of a `TcpConnection`, kept together so a single lock covers
/// both the socket and the sequence counter.
76struct TcpWrite {
    /// Socket handle that outgoing messages are written to.
77    socket: TcpStream,
    /// Sequence counter handed to `next_send_header` on every `send`;
    /// initialised to 0 when the connection is created.
78    sequence: u8,
79}
80
81impl<M: Message> MavConnection<M> for TcpConnection {
82    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
83        let mut reader = self.reader.lock().unwrap();
84        read_message::<M, _>(reader.deref_mut(), &self.state)
85    }
86
87    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
88        let mut reader = self.reader.lock().unwrap();
89        read_raw_message::<M, _>(reader.deref_mut(), &self.state)
90    }
91
92    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
93        let mut reader = self.reader.lock().unwrap();
94        reader.reader_mut().set_nonblocking(true)?;
95
96        let result = read_message::<M, _>(reader.deref_mut(), &self.state);
97
98        reader.reader_mut().set_nonblocking(false)?;
99
100        result
101    }
102
103    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
104        let mut lock = self.writer.lock().unwrap();
105
106        let header = next_send_header(&mut lock.sequence, header);
107        write_message(&mut lock.socket, &self.state, header, data)
108    }
109
110    fn send_raw(&self, data: &MAVLinkMessageRaw) -> Result<usize, crate::error::MessageWriteError> {
111        let mut lock = self.writer.lock().unwrap();
112        write_raw_message(&mut lock.socket, data)
113    }
114
115    fn set_protocol_version(&mut self, version: MavlinkVersion) {
116        self.state.set_protocol_version(version);
117    }
118
119    fn protocol_version(&self) -> MavlinkVersion {
120        self.state.protocol_version()
121    }
122
123    fn set_allow_recv_any_version(&mut self, allow: bool) {
124        self.state.set_allow_recv_any_version(allow);
125    }
126
127    fn allow_recv_any_version(&self) -> bool {
128        self.state.allow_recv_any_version()
129    }
130
131    #[cfg(feature = "mav2-message-signing")]
132    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
133        self.state.setup_signing(signing_data);
134    }
135}
136
137impl Connectable for TcpConfig {
138    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
139        let conn = match self.mode {
140            TcpMode::TcpIn => tcpin(&self.address),
141            TcpMode::TcpOut => tcpout(&self.address),
142        };
143
144        Ok(conn?.into())
145    }
146}