mavlink_core/async_connection/
tcp.rs1use std::io;
4
5use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
6use crate::async_peek_reader::AsyncPeekReader;
7use crate::connectable::TcpConnectable;
8use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
9
10use async_trait::async_trait;
11use core::ops::DerefMut;
12use futures::lock::Mutex;
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
14use tokio::net::{TcpListener, TcpStream};
15
16#[cfg(not(feature = "signing"))]
17use crate::{read_versioned_msg_async, write_versioned_msg_async};
18#[cfg(feature = "signing")]
19use crate::{
20 read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
21};
22
23pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
24 let addr = get_socket_addr(address)?;
25
26 let socket = TcpStream::connect(addr).await?;
27
28 let (reader, writer) = socket.into_split();
29
30 Ok(AsyncTcpConnection {
31 reader: Mutex::new(AsyncPeekReader::new(reader)),
32 writer: Mutex::new(TcpWrite {
33 socket: writer,
34 sequence: 0,
35 }),
36 protocol_version: MavlinkVersion::V2,
37 recv_any_version: false,
38 #[cfg(feature = "signing")]
39 signing_data: None,
40 })
41}
42
43pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
44 let addr = get_socket_addr(address)?;
45 let listener = TcpListener::bind(addr).await?;
46
47 match listener.accept().await {
49 Ok((socket, _)) => {
50 let (reader, writer) = socket.into_split();
51 return Ok(AsyncTcpConnection {
52 reader: Mutex::new(AsyncPeekReader::new(reader)),
53 writer: Mutex::new(TcpWrite {
54 socket: writer,
55 sequence: 0,
56 }),
57 protocol_version: MavlinkVersion::V2,
58 recv_any_version: false,
59 #[cfg(feature = "signing")]
60 signing_data: None,
61 });
62 }
63 Err(e) => {
64 println!("listener err: {e}");
66 }
67 }
68 Err(io::Error::new(
69 io::ErrorKind::NotConnected,
70 "No incoming connections!",
71 ))
72}
73
74pub struct AsyncTcpConnection {
75 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
76 writer: Mutex<TcpWrite>,
77 protocol_version: MavlinkVersion,
78 recv_any_version: bool,
79 #[cfg(feature = "signing")]
80 signing_data: Option<SigningData>,
81}
82
83struct TcpWrite {
84 socket: OwnedWriteHalf,
85 sequence: u8,
86}
87
88#[async_trait::async_trait]
89impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
90 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
91 let mut reader = self.reader.lock().await;
92 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
93 #[cfg(not(feature = "signing"))]
94 let result = read_versioned_msg_async(reader.deref_mut(), version).await;
95 #[cfg(feature = "signing")]
96 let result = read_versioned_msg_async_signed(
97 reader.deref_mut(),
98 version,
99 self.signing_data.as_ref(),
100 )
101 .await;
102 result
103 }
104
105 async fn send(
106 &self,
107 header: &MavHeader,
108 data: &M,
109 ) -> Result<usize, crate::error::MessageWriteError> {
110 let mut lock = self.writer.lock().await;
111
112 let header = MavHeader {
113 sequence: lock.sequence,
114 system_id: header.system_id,
115 component_id: header.component_id,
116 };
117
118 lock.sequence = lock.sequence.wrapping_add(1);
119 #[cfg(not(feature = "signing"))]
120 let result =
121 write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
122 #[cfg(feature = "signing")]
123 let result = write_versioned_msg_async_signed(
124 &mut lock.socket,
125 self.protocol_version,
126 header,
127 data,
128 self.signing_data.as_ref(),
129 )
130 .await;
131 result
132 }
133
134 fn set_protocol_version(&mut self, version: MavlinkVersion) {
135 self.protocol_version = version;
136 }
137
138 fn protocol_version(&self) -> MavlinkVersion {
139 self.protocol_version
140 }
141
142 fn set_allow_recv_any_version(&mut self, allow: bool) {
143 self.recv_any_version = allow
144 }
145
146 fn allow_recv_any_version(&self) -> bool {
147 self.recv_any_version
148 }
149
150 #[cfg(feature = "signing")]
151 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
152 self.signing_data = signing_data.map(SigningData::from_config)
153 }
154}
155
156#[async_trait]
157impl AsyncConnectable for TcpConnectable {
158 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
159 where
160 M: Message + Sync + Send,
161 {
162 let conn = if self.is_out {
163 tcpout(&self.address).await
164 } else {
165 tcpin(&self.address).await
166 };
167 Ok(Box::new(conn?))
168 }
169}