Skip to main content

mavlink_core/connection/tcp/
sync.rs

1//! TCP MAVLink connection
2
3use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8    ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9};
10use crate::peek_reader::PeekReader;
11use crate::{MavHeader, MavlinkVersion, Message};
12use core::ops::DerefMut;
13use std::io;
14use std::net::ToSocketAddrs;
15use std::net::{TcpListener, TcpStream};
16use std::sync::Mutex;
17use std::time::Duration;
18
19#[cfg(feature = "mav2-message-signing")]
20use crate::SigningConfig;
21
22use super::config::{TcpConfig, TcpMode};
23
24pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
25    let addr = get_socket_addr(&address)?;
26
27    let socket = TcpStream::connect(addr)?;
28    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
29
30    Ok(TcpConnection {
31        reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
32        writer: Mutex::new(TcpWrite {
33            socket,
34            sequence: 0,
35        }),
36        state: ConnectionState::new(),
37    })
38}
39
40pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
41    let addr = get_socket_addr(&address)?;
42    let listener = TcpListener::bind(addr)?;
43
44    //For now we only accept one incoming stream: this blocks until we get one
45    for incoming in listener.incoming() {
46        match incoming {
47            Ok(socket) => {
48                return Ok(TcpConnection {
49                    reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
50                    writer: Mutex::new(TcpWrite {
51                        socket,
52                        sequence: 0,
53                    }),
54                    state: ConnectionState::new(),
55                });
56            }
57            Err(e) => {
58                //TODO don't println in lib
59                println!("listener err: {e}");
60            }
61        }
62    }
63    Err(io::Error::new(
64        io::ErrorKind::NotConnected,
65        "No incoming connections!",
66    ))
67}
68
69pub struct TcpConnection {
70    reader: Mutex<PeekReader<TcpStream>>,
71    writer: Mutex<TcpWrite>,
72    state: ConnectionState,
73}
74
75struct TcpWrite {
76    socket: TcpStream,
77    sequence: u8,
78}
79
80impl<M: Message> MavConnection<M> for TcpConnection {
81    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
82        let mut reader = self.reader.lock().unwrap();
83        read_message::<M, _>(reader.deref_mut(), &self.state)
84    }
85
86    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
87        let mut reader = self.reader.lock().unwrap();
88        read_raw_message::<M, _>(reader.deref_mut(), &self.state)
89    }
90
91    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92        let mut reader = self.reader.lock().unwrap();
93        reader.reader_mut().set_nonblocking(true)?;
94
95        let result = read_message::<M, _>(reader.deref_mut(), &self.state);
96
97        reader.reader_mut().set_nonblocking(false)?;
98
99        result
100    }
101
102    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
103        let mut lock = self.writer.lock().unwrap();
104
105        let header = next_send_header(&mut lock.sequence, header);
106        write_message(&mut lock.socket, &self.state, header, data)
107    }
108
109    fn set_protocol_version(&mut self, version: MavlinkVersion) {
110        self.state.set_protocol_version(version);
111    }
112
113    fn protocol_version(&self) -> MavlinkVersion {
114        self.state.protocol_version()
115    }
116
117    fn set_allow_recv_any_version(&mut self, allow: bool) {
118        self.state.set_allow_recv_any_version(allow);
119    }
120
121    fn allow_recv_any_version(&self) -> bool {
122        self.state.allow_recv_any_version()
123    }
124
125    #[cfg(feature = "mav2-message-signing")]
126    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
127        self.state.setup_signing(signing_data);
128    }
129}
130
131impl Connectable for TcpConfig {
132    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
133        let conn = match self.mode {
134            TcpMode::TcpIn => tcpin(&self.address),
135            TcpMode::TcpOut => tcpout(&self.address),
136        };
137
138        Ok(conn?.into())
139    }
140}