mavlink_core/async_connection/
tcp.rs1use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connection::tcp::config::{TcpConfig, TcpMode};
8use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::lock::Mutex;
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_raw_versioned_msg_async, read_versioned_msg_async, write_versioned_msg_async};
18#[cfg(feature = "signing")]
19use crate::{
20 read_raw_versioned_msg_async_signed, read_versioned_msg_async_signed,
21 write_versioned_msg_async_signed, SigningConfig, SigningData,
22};
23
24pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
25 let addr = get_socket_addr(address)?;
26
27 let socket = TcpStream::connect(addr).await?;
28
29 let (reader, writer) = socket.into_split();
30
31 Ok(AsyncTcpConnection {
32 reader: Mutex::new(AsyncPeekReader::new(reader)),
33 writer: Mutex::new(TcpWrite {
34 socket: writer,
35 sequence: 0,
36 }),
37 protocol_version: MavlinkVersion::V2,
38 recv_any_version: false,
39 #[cfg(feature = "signing")]
40 signing_data: None,
41 })
42}
43
44pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
45 let addr = get_socket_addr(address)?;
46 let listener = TcpListener::bind(addr).await?;
47
48 match listener.accept().await {
50 Ok((socket, _)) => {
51 let (reader, writer) = socket.into_split();
52 return Ok(AsyncTcpConnection {
53 reader: Mutex::new(AsyncPeekReader::new(reader)),
54 writer: Mutex::new(TcpWrite {
55 socket: writer,
56 sequence: 0,
57 }),
58 protocol_version: MavlinkVersion::V2,
59 recv_any_version: false,
60 #[cfg(feature = "signing")]
61 signing_data: None,
62 });
63 }
64 Err(e) => {
65 println!("listener err: {e}");
67 }
68 }
69 Err(io::Error::new(
70 io::ErrorKind::NotConnected,
71 "No incoming connections!",
72 ))
73}
74
75pub struct AsyncTcpConnection {
76 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
77 writer: Mutex<TcpWrite>,
78 protocol_version: MavlinkVersion,
79 recv_any_version: bool,
80 #[cfg(feature = "signing")]
81 signing_data: Option<SigningData>,
82}
83
84struct TcpWrite {
85 socket: OwnedWriteHalf,
86 sequence: u8,
87}
88
89#[async_trait::async_trait]
90impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
91 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92 let mut reader = self.reader.lock().await;
93 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
94 #[cfg(not(feature = "signing"))]
95 let result = read_versioned_msg_async(reader.deref_mut(), version).await;
96 #[cfg(feature = "signing")]
97 let result = read_versioned_msg_async_signed(
98 reader.deref_mut(),
99 version,
100 self.signing_data.as_ref(),
101 )
102 .await;
103 result
104 }
105
106 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
107 let mut reader = self.reader.lock().await;
108 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
109 #[cfg(not(feature = "signing"))]
110 let result = read_raw_versioned_msg_async::<M, _>(reader.deref_mut(), version).await;
111 #[cfg(feature = "signing")]
112 let result = read_raw_versioned_msg_async_signed::<M, _>(
113 reader.deref_mut(),
114 version,
115 self.signing_data.as_ref(),
116 )
117 .await;
118 result
119 }
120
121 async fn send(
122 &self,
123 header: &MavHeader,
124 data: &M,
125 ) -> Result<usize, crate::error::MessageWriteError> {
126 let mut lock = self.writer.lock().await;
127
128 let header = MavHeader {
129 sequence: lock.sequence,
130 system_id: header.system_id,
131 component_id: header.component_id,
132 };
133
134 lock.sequence = lock.sequence.wrapping_add(1);
135 #[cfg(not(feature = "signing"))]
136 let result =
137 write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
138 #[cfg(feature = "signing")]
139 let result = write_versioned_msg_async_signed(
140 &mut lock.socket,
141 self.protocol_version,
142 header,
143 data,
144 self.signing_data.as_ref(),
145 )
146 .await;
147 result
148 }
149
150 fn set_protocol_version(&mut self, version: MavlinkVersion) {
151 self.protocol_version = version;
152 }
153
154 fn protocol_version(&self) -> MavlinkVersion {
155 self.protocol_version
156 }
157
158 fn set_allow_recv_any_version(&mut self, allow: bool) {
159 self.recv_any_version = allow;
160 }
161
162 fn allow_recv_any_version(&self) -> bool {
163 self.recv_any_version
164 }
165
166 #[cfg(feature = "signing")]
167 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
168 self.signing_data = signing_data.map(SigningData::from_config);
169 }
170}
171
172#[async_trait]
173impl AsyncConnectable for TcpConfig {
174 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
175 where
176 M: Message + Sync + Send,
177 {
178 let conn = match self.mode {
179 TcpMode::TcpIn => tcpin(&self.address).await,
180 TcpMode::TcpOut => tcpout(&self.address).await,
181 };
182
183 Ok(Box::new(conn?))
184 }
185}