mavlink_core/async_connection/
tcp.rs1use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connection::tcp::config::{TcpConfig, TcpMode};
8use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::lock::Mutex;
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "signing"))]
17use crate::{
18 read_versioned_msg_async, read_versioned_raw_message_async, write_versioned_msg_async,
19};
20#[cfg(feature = "signing")]
21use crate::{
22 read_versioned_msg_async_signed, read_versioned_raw_message_async_signed,
23 write_versioned_msg_async_signed, SigningConfig, SigningData,
24};
25
26pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
27 let addr = get_socket_addr(address)?;
28
29 let socket = TcpStream::connect(addr).await?;
30
31 let (reader, writer) = socket.into_split();
32
33 Ok(AsyncTcpConnection {
34 reader: Mutex::new(AsyncPeekReader::new(reader)),
35 writer: Mutex::new(TcpWrite {
36 socket: writer,
37 sequence: 0,
38 }),
39 protocol_version: MavlinkVersion::V2,
40 recv_any_version: false,
41 #[cfg(feature = "signing")]
42 signing_data: None,
43 })
44}
45
46pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
47 let addr = get_socket_addr(address)?;
48 let listener = TcpListener::bind(addr).await?;
49
50 match listener.accept().await {
52 Ok((socket, _)) => {
53 let (reader, writer) = socket.into_split();
54 return Ok(AsyncTcpConnection {
55 reader: Mutex::new(AsyncPeekReader::new(reader)),
56 writer: Mutex::new(TcpWrite {
57 socket: writer,
58 sequence: 0,
59 }),
60 protocol_version: MavlinkVersion::V2,
61 recv_any_version: false,
62 #[cfg(feature = "signing")]
63 signing_data: None,
64 });
65 }
66 Err(e) => {
67 println!("listener err: {e}");
69 }
70 }
71 Err(io::Error::new(
72 io::ErrorKind::NotConnected,
73 "No incoming connections!",
74 ))
75}
76
77pub struct AsyncTcpConnection {
78 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
79 writer: Mutex<TcpWrite>,
80 protocol_version: MavlinkVersion,
81 recv_any_version: bool,
82 #[cfg(feature = "signing")]
83 signing_data: Option<SigningData>,
84}
85
86struct TcpWrite {
87 socket: OwnedWriteHalf,
88 sequence: u8,
89}
90
91#[async_trait::async_trait]
92impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
93 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
94 let mut reader = self.reader.lock().await;
95 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
96 #[cfg(not(feature = "signing"))]
97 let result = read_versioned_msg_async(reader.deref_mut(), version).await;
98 #[cfg(feature = "signing")]
99 let result = read_versioned_msg_async_signed(
100 reader.deref_mut(),
101 version,
102 self.signing_data.as_ref(),
103 )
104 .await;
105 result
106 }
107
108 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
109 let mut reader = self.reader.lock().await;
110 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
111 #[cfg(not(feature = "signing"))]
112 let result = read_versioned_raw_message_async::<M, _>(reader.deref_mut(), version).await;
113 #[cfg(feature = "signing")]
114 let result = read_versioned_raw_message_async_signed::<M, _>(
115 reader.deref_mut(),
116 version,
117 self.signing_data.as_ref(),
118 )
119 .await;
120 result
121 }
122
123 async fn send(
124 &self,
125 header: &MavHeader,
126 data: &M,
127 ) -> Result<usize, crate::error::MessageWriteError> {
128 let mut lock = self.writer.lock().await;
129
130 let header = MavHeader {
131 sequence: lock.sequence,
132 system_id: header.system_id,
133 component_id: header.component_id,
134 };
135
136 lock.sequence = lock.sequence.wrapping_add(1);
137 #[cfg(not(feature = "signing"))]
138 let result =
139 write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
140 #[cfg(feature = "signing")]
141 let result = write_versioned_msg_async_signed(
142 &mut lock.socket,
143 self.protocol_version,
144 header,
145 data,
146 self.signing_data.as_ref(),
147 )
148 .await;
149 result
150 }
151
152 fn set_protocol_version(&mut self, version: MavlinkVersion) {
153 self.protocol_version = version;
154 }
155
156 fn protocol_version(&self) -> MavlinkVersion {
157 self.protocol_version
158 }
159
160 fn set_allow_recv_any_version(&mut self, allow: bool) {
161 self.recv_any_version = allow;
162 }
163
164 fn allow_recv_any_version(&self) -> bool {
165 self.recv_any_version
166 }
167
168 #[cfg(feature = "signing")]
169 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
170 self.signing_data = signing_data.map(SigningData::from_config);
171 }
172}
173
174#[async_trait]
175impl AsyncConnectable for TcpConfig {
176 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
177 where
178 M: Message + Sync + Send,
179 {
180 let conn = match self.mode {
181 TcpMode::TcpIn => tcpin(&self.address).await,
182 TcpMode::TcpOut => tcpout(&self.address).await,
183 };
184
185 Ok(Box::new(conn?))
186 }
187}