mavlink_core/connection/tcp/
sync.rs1use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8 ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9};
10use crate::peek_reader::PeekReader;
11use crate::{MavHeader, MavlinkVersion, Message};
12use core::ops::DerefMut;
13use std::io;
14use std::net::ToSocketAddrs;
15use std::net::{TcpListener, TcpStream};
16use std::sync::Mutex;
17use std::time::Duration;
18
19#[cfg(feature = "mav2-message-signing")]
20use crate::SigningConfig;
21
22use super::config::{TcpConfig, TcpMode};
23
24pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
25 let addr = get_socket_addr(&address)?;
26
27 let socket = TcpStream::connect(addr)?;
28 socket.set_read_timeout(Some(Duration::from_millis(100)))?;
29
30 Ok(TcpConnection {
31 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
32 writer: Mutex::new(TcpWrite {
33 socket,
34 sequence: 0,
35 }),
36 state: ConnectionState::new(),
37 })
38}
39
40pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
41 let addr = get_socket_addr(&address)?;
42 let listener = TcpListener::bind(addr)?;
43
44 for incoming in listener.incoming() {
46 match incoming {
47 Ok(socket) => {
48 return Ok(TcpConnection {
49 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
50 writer: Mutex::new(TcpWrite {
51 socket,
52 sequence: 0,
53 }),
54 state: ConnectionState::new(),
55 });
56 }
57 Err(e) => {
58 println!("listener err: {e}");
60 }
61 }
62 }
63 Err(io::Error::new(
64 io::ErrorKind::NotConnected,
65 "No incoming connections!",
66 ))
67}
68
69pub struct TcpConnection {
70 reader: Mutex<PeekReader<TcpStream>>,
71 writer: Mutex<TcpWrite>,
72 state: ConnectionState,
73}
74
75struct TcpWrite {
76 socket: TcpStream,
77 sequence: u8,
78}
79
80impl<M: Message> MavConnection<M> for TcpConnection {
81 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
82 let mut reader = self.reader.lock().unwrap();
83 read_message::<M, _>(reader.deref_mut(), &self.state)
84 }
85
86 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
87 let mut reader = self.reader.lock().unwrap();
88 read_raw_message::<M, _>(reader.deref_mut(), &self.state)
89 }
90
91 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92 let mut reader = self.reader.lock().unwrap();
93 reader.reader_mut().set_nonblocking(true)?;
94
95 let result = read_message::<M, _>(reader.deref_mut(), &self.state);
96
97 reader.reader_mut().set_nonblocking(false)?;
98
99 result
100 }
101
102 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
103 let mut lock = self.writer.lock().unwrap();
104
105 let header = next_send_header(&mut lock.sequence, header);
106 write_message(&mut lock.socket, &self.state, header, data)
107 }
108
109 fn set_protocol_version(&mut self, version: MavlinkVersion) {
110 self.state.set_protocol_version(version);
111 }
112
113 fn protocol_version(&self) -> MavlinkVersion {
114 self.state.protocol_version()
115 }
116
117 fn set_allow_recv_any_version(&mut self, allow: bool) {
118 self.state.set_allow_recv_any_version(allow);
119 }
120
121 fn allow_recv_any_version(&self) -> bool {
122 self.state.allow_recv_any_version()
123 }
124
125 #[cfg(feature = "mav2-message-signing")]
126 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
127 self.state.setup_signing(signing_data);
128 }
129}
130
131impl Connectable for TcpConfig {
132 fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
133 let conn = match self.mode {
134 TcpMode::TcpIn => tcpin(&self.address),
135 TcpMode::TcpOut => tcpout(&self.address),
136 };
137
138 Ok(conn?.into())
139 }
140}