mavlink_core/connection/
tcp.rs1use crate::connectable::TcpConnectable;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
7use core::ops::DerefMut;
8use std::io;
9use std::net::ToSocketAddrs;
10use std::net::{TcpListener, TcpStream};
11use std::sync::Mutex;
12use std::time::Duration;
13
14use super::{get_socket_addr, Connectable};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg, write_versioned_msg};
18
19#[cfg(feature = "signing")]
20use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
21
22pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
23 let addr = get_socket_addr(&address)?;
24
25 let socket = TcpStream::connect(addr)?;
26 socket.set_read_timeout(Some(Duration::from_millis(100)))?;
27
28 Ok(TcpConnection {
29 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
30 writer: Mutex::new(TcpWrite {
31 socket,
32 sequence: 0,
33 }),
34 protocol_version: MavlinkVersion::V2,
35 recv_any_version: false,
36 #[cfg(feature = "signing")]
37 signing_data: None,
38 })
39}
40
41pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
42 let addr = get_socket_addr(&address)?;
43 let listener = TcpListener::bind(addr)?;
44
45 for incoming in listener.incoming() {
47 match incoming {
48 Ok(socket) => {
49 return Ok(TcpConnection {
50 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
51 writer: Mutex::new(TcpWrite {
52 socket,
53 sequence: 0,
54 }),
55 protocol_version: MavlinkVersion::V2,
56 recv_any_version: false,
57 #[cfg(feature = "signing")]
58 signing_data: None,
59 })
60 }
61 Err(e) => {
62 println!("listener err: {e}");
64 }
65 }
66 }
67 Err(io::Error::new(
68 io::ErrorKind::NotConnected,
69 "No incoming connections!",
70 ))
71}
72
73pub struct TcpConnection {
74 reader: Mutex<PeekReader<TcpStream>>,
75 writer: Mutex<TcpWrite>,
76 protocol_version: MavlinkVersion,
77 recv_any_version: bool,
78 #[cfg(feature = "signing")]
79 signing_data: Option<SigningData>,
80}
81
82struct TcpWrite {
83 socket: TcpStream,
84 sequence: u8,
85}
86
87impl<M: Message> MavConnection<M> for TcpConnection {
88 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
89 let mut reader = self.reader.lock().unwrap();
90 let version = ReadVersion::from_conn_cfg::<_, M>(self);
91 #[cfg(not(feature = "signing"))]
92 let result = read_versioned_msg(reader.deref_mut(), version);
93 #[cfg(feature = "signing")]
94 let result =
95 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
96 result
97 }
98
99 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
100 let mut lock = self.writer.lock().unwrap();
101
102 let header = MavHeader {
103 sequence: lock.sequence,
104 system_id: header.system_id,
105 component_id: header.component_id,
106 };
107
108 lock.sequence = lock.sequence.wrapping_add(1);
109 #[cfg(not(feature = "signing"))]
110 let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
111 #[cfg(feature = "signing")]
112 let result = write_versioned_msg_signed(
113 &mut lock.socket,
114 self.protocol_version,
115 header,
116 data,
117 self.signing_data.as_ref(),
118 );
119 result
120 }
121
122 fn set_protocol_version(&mut self, version: MavlinkVersion) {
123 self.protocol_version = version;
124 }
125
126 fn protocol_version(&self) -> MavlinkVersion {
127 self.protocol_version
128 }
129
130 fn set_allow_recv_any_version(&mut self, allow: bool) {
131 self.recv_any_version = allow
132 }
133
134 fn allow_recv_any_version(&self) -> bool {
135 self.recv_any_version
136 }
137
138 #[cfg(feature = "signing")]
139 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
140 self.signing_data = signing_data.map(SigningData::from_config)
141 }
142}
143
144impl Connectable for TcpConnectable {
145 fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
146 let conn = if self.is_out {
147 tcpout(&self.address)
148 } else {
149 tcpin(&self.address)
150 };
151 Ok(Box::new(conn?))
152 }
153}