mavlink_core/async_connection/
tcp.rs1use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connection::tcp::config::{TcpConfig, TcpMode};
8use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::{lock::Mutex, FutureExt};
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "mav2-message-signing"))]
17use crate::{
18 read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
19};
20#[cfg(feature = "mav2-message-signing")]
21use crate::{
22 read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
23 write_versioned_msg_async_signed, SigningConfig, SigningData,
24};
25
26pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
27 let addr = get_socket_addr(address)?;
28
29 let socket = TcpStream::connect(addr).await?;
30
31 let (reader, writer) = socket.into_split();
32
33 Ok(AsyncTcpConnection {
34 reader: Mutex::new(AsyncPeekReader::new(reader)),
35 writer: Mutex::new(TcpWrite {
36 socket: writer,
37 sequence: 0,
38 }),
39 protocol_version: MavlinkVersion::V2,
40 recv_any_version: false,
41 #[cfg(feature = "mav2-message-signing")]
42 signing_data: None,
43 })
44}
45
46pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
47 let addr = get_socket_addr(address)?;
48 let listener = TcpListener::bind(addr).await?;
49
50 match listener.accept().await {
52 Ok((socket, _)) => {
53 let (reader, writer) = socket.into_split();
54 return Ok(AsyncTcpConnection {
55 reader: Mutex::new(AsyncPeekReader::new(reader)),
56 writer: Mutex::new(TcpWrite {
57 socket: writer,
58 sequence: 0,
59 }),
60 protocol_version: MavlinkVersion::V2,
61 recv_any_version: false,
62 #[cfg(feature = "mav2-message-signing")]
63 signing_data: None,
64 });
65 }
66 Err(e) => {
67 println!("listener err: {e}");
69 }
70 }
71 Err(io::Error::new(
72 io::ErrorKind::NotConnected,
73 "No incoming connections!",
74 ))
75}
76
77pub struct AsyncTcpConnection {
78 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
79 writer: Mutex<TcpWrite>,
80 protocol_version: MavlinkVersion,
81 recv_any_version: bool,
82 #[cfg(feature = "mav2-message-signing")]
83 signing_data: Option<SigningData>,
84}
85
86struct TcpWrite {
87 socket: OwnedWriteHalf,
88 sequence: u8,
89}
90
91#[async_trait::async_trait]
92impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
93 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
94 let mut reader = self.reader.lock().await;
95 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
96 #[cfg(not(feature = "mav2-message-signing"))]
97 let result = read_versioned_msg_async(reader.deref_mut(), version).await;
98 #[cfg(feature = "mav2-message-signing")]
99 let result = read_versioned_msg_async_signed(
100 reader.deref_mut(),
101 version,
102 self.signing_data.as_ref(),
103 )
104 .await;
105 result
106 }
107
108 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
109 let mut reader = self.reader.lock().await;
110 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
111 #[cfg(not(feature = "mav2-message-signing"))]
112 let result = read_versioned_raw_message_async::<M, _>(reader.deref_mut(), version).await;
113 #[cfg(feature = "mav2-message-signing")]
114 let result = read_versioned_raw_message_async_signed::<M, _>(
115 reader.deref_mut(),
116 version,
117 self.signing_data.as_ref(),
118 )
119 .await;
120 result
121 }
122
123 async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
124 match self.recv().now_or_never() {
125 Some(result) => result,
126 None => Err(crate::error::MessageReadError::Io(
127 io::ErrorKind::WouldBlock.into(),
128 )),
129 }
130 }
131
132 async fn send(
133 &self,
134 header: &MavHeader,
135 data: &M,
136 ) -> Result<usize, crate::error::MessageWriteError> {
137 let mut lock = self.writer.lock().await;
138
139 let header = MavHeader {
140 sequence: lock.sequence,
141 system_id: header.system_id,
142 component_id: header.component_id,
143 };
144
145 lock.sequence = lock.sequence.wrapping_add(1);
146 #[cfg(not(feature = "mav2-message-signing"))]
147 let result =
148 write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
149 #[cfg(feature = "mav2-message-signing")]
150 let result = write_versioned_msg_async_signed(
151 &mut lock.socket,
152 self.protocol_version,
153 header,
154 data,
155 self.signing_data.as_ref(),
156 )
157 .await;
158 result
159 }
160
161 fn set_protocol_version(&mut self, version: MavlinkVersion) {
162 self.protocol_version = version;
163 }
164
165 fn protocol_version(&self) -> MavlinkVersion {
166 self.protocol_version
167 }
168
169 fn set_allow_recv_any_version(&mut self, allow: bool) {
170 self.recv_any_version = allow;
171 }
172
173 fn allow_recv_any_version(&self) -> bool {
174 self.recv_any_version
175 }
176
177 #[cfg(feature = "mav2-message-signing")]
178 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
179 self.signing_data = signing_data.map(SigningData::from_config);
180 }
181}
182
183#[async_trait]
184impl AsyncConnectable for TcpConfig {
185 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
186 where
187 M: Message + Sync + Send,
188 {
189 let conn = match self.mode {
190 TcpMode::TcpIn => tcpin(&self.address).await,
191 TcpMode::TcpOut => tcpout(&self.address).await,
192 };
193
194 Ok(Box::new(conn?))
195 }
196}