mavlink_core/connection/
tcp.rs1use crate::connection::get_socket_addr;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "signing"))]
7use crate::read_raw_versioned_msg;
8#[cfg(feature = "signing")]
9use crate::read_raw_versioned_msg_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::io;
15use std::net::ToSocketAddrs;
16use std::net::{TcpListener, TcpStream};
17use std::sync::Mutex;
18use std::time::Duration;
19
20#[cfg(not(feature = "signing"))]
21use crate::{read_versioned_msg, write_versioned_msg};
22
23#[cfg(feature = "signing")]
24use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
25
26pub mod config;
27
28use config::{TcpConfig, TcpMode};
29
30pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
31 let addr = get_socket_addr(&address)?;
32
33 let socket = TcpStream::connect(addr)?;
34 socket.set_read_timeout(Some(Duration::from_millis(100)))?;
35
36 Ok(TcpConnection {
37 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
38 writer: Mutex::new(TcpWrite {
39 socket,
40 sequence: 0,
41 }),
42 protocol_version: MavlinkVersion::V2,
43 recv_any_version: false,
44 #[cfg(feature = "signing")]
45 signing_data: None,
46 })
47}
48
49pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
50 let addr = get_socket_addr(&address)?;
51 let listener = TcpListener::bind(addr)?;
52
53 for incoming in listener.incoming() {
55 match incoming {
56 Ok(socket) => {
57 return Ok(TcpConnection {
58 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
59 writer: Mutex::new(TcpWrite {
60 socket,
61 sequence: 0,
62 }),
63 protocol_version: MavlinkVersion::V2,
64 recv_any_version: false,
65 #[cfg(feature = "signing")]
66 signing_data: None,
67 })
68 }
69 Err(e) => {
70 println!("listener err: {e}");
72 }
73 }
74 }
75 Err(io::Error::new(
76 io::ErrorKind::NotConnected,
77 "No incoming connections!",
78 ))
79}
80
81pub struct TcpConnection {
82 reader: Mutex<PeekReader<TcpStream>>,
83 writer: Mutex<TcpWrite>,
84 protocol_version: MavlinkVersion,
85 recv_any_version: bool,
86 #[cfg(feature = "signing")]
87 signing_data: Option<SigningData>,
88}
89
/// Write half of a `TcpConnection`: the stream plus the sequence number for
/// the next outgoing message.
struct TcpWrite {
    /// Stream that outgoing messages are written to.
    socket: TcpStream,
    /// Sequence number stamped into the next sent header; it is advanced
    /// with wrapping arithmetic on every send.
    sequence: u8,
}
94
95impl<M: Message> MavConnection<M> for TcpConnection {
96 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
97 let mut reader = self.reader.lock().unwrap();
98 let version = ReadVersion::from_conn_cfg::<_, M>(self);
99 #[cfg(not(feature = "signing"))]
100 let result = read_versioned_msg(reader.deref_mut(), version);
101 #[cfg(feature = "signing")]
102 let result =
103 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
104 result
105 }
106
107 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
108 let mut reader = self.reader.lock().unwrap();
109 let version = ReadVersion::from_conn_cfg::<_, M>(self);
110 #[cfg(not(feature = "signing"))]
111 let result = read_raw_versioned_msg::<M, _>(reader.deref_mut(), version);
112 #[cfg(feature = "signing")]
113 let result = read_raw_versioned_msg_signed::<M, _>(
114 reader.deref_mut(),
115 version,
116 self.signing_data.as_ref(),
117 );
118 result
119 }
120
121 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
122 let mut reader = self.reader.lock().unwrap();
123 reader.reader_mut().set_nonblocking(true)?;
124
125 let version = ReadVersion::from_conn_cfg::<_, M>(self);
126
127 #[cfg(not(feature = "signing"))]
128 let result = read_versioned_msg(reader.deref_mut(), version);
129
130 #[cfg(feature = "signing")]
131 let result =
132 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
133
134 reader.reader_mut().set_nonblocking(false)?;
135
136 result
137 }
138
139 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
140 let mut lock = self.writer.lock().unwrap();
141
142 let header = MavHeader {
143 sequence: lock.sequence,
144 system_id: header.system_id,
145 component_id: header.component_id,
146 };
147
148 lock.sequence = lock.sequence.wrapping_add(1);
149 #[cfg(not(feature = "signing"))]
150 let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
151 #[cfg(feature = "signing")]
152 let result = write_versioned_msg_signed(
153 &mut lock.socket,
154 self.protocol_version,
155 header,
156 data,
157 self.signing_data.as_ref(),
158 );
159 result
160 }
161
162 fn set_protocol_version(&mut self, version: MavlinkVersion) {
163 self.protocol_version = version;
164 }
165
166 fn protocol_version(&self) -> MavlinkVersion {
167 self.protocol_version
168 }
169
170 fn set_allow_recv_any_version(&mut self, allow: bool) {
171 self.recv_any_version = allow;
172 }
173
174 fn allow_recv_any_version(&self) -> bool {
175 self.recv_any_version
176 }
177
178 #[cfg(feature = "signing")]
179 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
180 self.signing_data = signing_data.map(SigningData::from_config);
181 }
182}
183
184impl Connectable for TcpConfig {
185 fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
186 let conn = match self.mode {
187 TcpMode::TcpIn => tcpin(&self.address),
188 TcpMode::TcpOut => tcpout(&self.address),
189 };
190
191 Ok(Box::new(conn?))
192 }
193}