mavlink_core/connection/
tcp.rs1use crate::connectable::TcpConnectable;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6use crate::{MavHeader, MavlinkVersion, Message};
7use core::ops::DerefMut;
8use std::io;
9use std::net::ToSocketAddrs;
10use std::net::{TcpListener, TcpStream};
11use std::sync::Mutex;
12use std::time::Duration;
13
14use super::{get_socket_addr, Connectable};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg, write_versioned_msg};
18
19#[cfg(feature = "signing")]
20use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
21
22pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
23 let addr = get_socket_addr(&address)?;
24
25 let socket = TcpStream::connect(addr)?;
26 socket.set_read_timeout(Some(Duration::from_millis(100)))?;
27
28 Ok(TcpConnection {
29 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
30 writer: Mutex::new(TcpWrite {
31 socket,
32 sequence: 0,
33 }),
34 protocol_version: MavlinkVersion::V2,
35 #[cfg(feature = "signing")]
36 signing_data: None,
37 })
38}
39
40pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
41 let addr = get_socket_addr(&address)?;
42 let listener = TcpListener::bind(addr)?;
43
44 for incoming in listener.incoming() {
46 match incoming {
47 Ok(socket) => {
48 return Ok(TcpConnection {
49 reader: Mutex::new(PeekReader::new(socket.try_clone()?)),
50 writer: Mutex::new(TcpWrite {
51 socket,
52 sequence: 0,
53 }),
54 protocol_version: MavlinkVersion::V2,
55 #[cfg(feature = "signing")]
56 signing_data: None,
57 })
58 }
59 Err(e) => {
60 println!("listener err: {e}");
62 }
63 }
64 }
65 Err(io::Error::new(
66 io::ErrorKind::NotConnected,
67 "No incoming connections!",
68 ))
69}
70
71pub struct TcpConnection {
72 reader: Mutex<PeekReader<TcpStream>>,
73 writer: Mutex<TcpWrite>,
74 protocol_version: MavlinkVersion,
75 #[cfg(feature = "signing")]
76 signing_data: Option<SigningData>,
77}
78
79struct TcpWrite {
80 socket: TcpStream,
81 sequence: u8,
82}
83
84impl<M: Message> MavConnection<M> for TcpConnection {
85 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
86 let mut reader = self.reader.lock().unwrap();
87 #[cfg(not(feature = "signing"))]
88 let result = read_versioned_msg(reader.deref_mut(), self.protocol_version);
89 #[cfg(feature = "signing")]
90 let result = read_versioned_msg_signed(
91 reader.deref_mut(),
92 self.protocol_version,
93 self.signing_data.as_ref(),
94 );
95 result
96 }
97
98 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
99 let mut lock = self.writer.lock().unwrap();
100
101 let header = MavHeader {
102 sequence: lock.sequence,
103 system_id: header.system_id,
104 component_id: header.component_id,
105 };
106
107 lock.sequence = lock.sequence.wrapping_add(1);
108 #[cfg(not(feature = "signing"))]
109 let result = write_versioned_msg(&mut lock.socket, self.protocol_version, header, data);
110 #[cfg(feature = "signing")]
111 let result = write_versioned_msg_signed(
112 &mut lock.socket,
113 self.protocol_version,
114 header,
115 data,
116 self.signing_data.as_ref(),
117 );
118 result
119 }
120
121 fn set_protocol_version(&mut self, version: MavlinkVersion) {
122 self.protocol_version = version;
123 }
124
125 fn protocol_version(&self) -> MavlinkVersion {
126 self.protocol_version
127 }
128
129 #[cfg(feature = "signing")]
130 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
131 self.signing_data = signing_data.map(SigningData::from_config)
132 }
133}
134
135impl Connectable for TcpConnectable {
136 fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
137 let conn = if self.is_out {
138 tcpout(&self.address)
139 } else {
140 tcpin(&self.address)
141 };
142 Ok(Box::new(conn?))
143 }
144}