mavlink_core/connection/
tcp.rs1use crate::connection::get_socket_addr;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "signing"))]
7use crate::read_raw_versioned_msg;
8#[cfg(feature = "signing")]
9use crate::read_raw_versioned_msg_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::io;
15use std::net::ToSocketAddrs;
16use std::net::{TcpListener, TcpStream};
17use std::sync::Mutex;
18use std::time::Duration;
19
20#[cfg(not(feature = "signing"))]
21use crate::{read_versioned_msg, write_versioned_msg};
22
23#[cfg(feature = "signing")]
24use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
25
26pub mod config;
27
28use config::{TcpConfig, TcpMode};
29
30pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
31 let addr = get_socket_addr(&address)?;
32
33 let socket = TcpStream::connect(addr)?;
34 socket.set_read_timeout(Some(Duration::from_millis(100)))?;
35
36 Ok(TcpConnection {
37 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
38 writer: Mutex::new(TcpWrite {
39 socket,
40 sequence: 0,
41 }),
42 protocol_version: MavlinkVersion::V2,
43 recv_any_version: false,
44 #[cfg(feature = "signing")]
45 signing_data: None,
46 })
47}
48
49pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
50 let addr = get_socket_addr(&address)?;
51 let listener = TcpListener::bind(addr)?;
52
53 for incoming in listener.incoming() {
55 match incoming {
56 Ok(socket) => {
57 return Ok(TcpConnection {
58 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
59 writer: Mutex::new(TcpWrite {
60 socket,
61 sequence: 0,
62 }),
63 protocol_version: MavlinkVersion::V2,
64 recv_any_version: false,
65 #[cfg(feature = "signing")]
66 signing_data: None,
67 })
68 }
69 Err(e) => {
70 println!("listener err: {e}");
72 }
73 }
74 }
75 Err(io::Error::new(
76 io::ErrorKind::NotConnected,
77 "No incoming connections!",
78 ))
79}
80
81pub struct TcpConnection {
82 reader: Mutex<PeekReader<TcpStream>>,
83 writer: Mutex<TcpWrite>,
84 protocol_version: MavlinkVersion,
85 recv_any_version: bool,
86 #[cfg(feature = "signing")]
87 signing_data: Option<SigningData>,
88}
89
90struct TcpWrite {
91 socket: TcpStream,
92 sequence: u8,
93}
94
95impl<M: Message> MavConnection<M> for TcpConnection {
96 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
97 let mut reader = self.reader.lock().unwrap();
98 let version = ReadVersion::from_conn_cfg::<_, M>(self);
99 #[cfg(not(feature = "signing"))]
100 let result = read_versioned_msg(reader.deref_mut(), version);
101 #[cfg(feature = "signing")]
102 let result =
103 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
104 result
105 }
106
107 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
108 let mut reader = self.reader.lock().unwrap();
109 let version = ReadVersion::from_conn_cfg::<_, M>(self);
110 #[cfg(not(feature = "signing"))]
111 let result = read_raw_versioned_msg::<M, _>(reader.deref_mut(), version);
112 #[cfg(feature = "signing")]
113 let result = read_raw_versioned_msg_signed::<M, _>(
114 reader.deref_mut(),
115 version,
116 self.signing_data.as_ref(),
117 );
118 result
119 }
120
121 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
122 self.recv()
123 }
124
125 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
126 let mut lock = self.writer.lock().unwrap();
127
128 let header = MavHeader {
129 sequence: lock.sequence,
130 system_id: header.system_id,
131 component_id: header.component_id,
132 };
133
134 lock.sequence = lock.sequence.wrapping_add(1);
135 #[cfg(not(feature = "signing"))]
136 let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
137 #[cfg(feature = "signing")]
138 let result = write_versioned_msg_signed(
139 &mut lock.socket,
140 self.protocol_version,
141 header,
142 data,
143 self.signing_data.as_ref(),
144 );
145 result
146 }
147
148 fn set_protocol_version(&mut self, version: MavlinkVersion) {
149 self.protocol_version = version;
150 }
151
152 fn protocol_version(&self) -> MavlinkVersion {
153 self.protocol_version
154 }
155
156 fn set_allow_recv_any_version(&mut self, allow: bool) {
157 self.recv_any_version = allow;
158 }
159
160 fn allow_recv_any_version(&self) -> bool {
161 self.recv_any_version
162 }
163
164 #[cfg(feature = "signing")]
165 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
166 self.signing_data = signing_data.map(SigningData::from_config);
167 }
168}
169
170impl Connectable for TcpConfig {
171 fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
172 let conn = match self.mode {
173 TcpMode::TcpIn => tcpin(&self.address),
174 TcpMode::TcpOut => tcpout(&self.address),
175 };
176
177 Ok(Box::new(conn?))
178 }
179}