mavlink_core/async_connection/
tcp.rs1use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
4use crate::async_peek_reader::AsyncPeekReader;
5use crate::connectable::TcpConnectable;
6use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
7
8use async_trait::async_trait;
9use core::ops::DerefMut;
10use tokio::io;
11use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
12use tokio::net::{TcpListener, TcpStream};
13use tokio::sync::Mutex;
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_versioned_msg_async, write_versioned_msg_async};
17#[cfg(feature = "signing")]
18use crate::{
19 read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
20};
21
22pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
23 let addr = get_socket_addr(address)?;
24
25 let socket = TcpStream::connect(addr).await?;
26
27 let (reader, writer) = socket.into_split();
28
29 Ok(AsyncTcpConnection {
30 reader: Mutex::new(AsyncPeekReader::new(reader)),
31 writer: Mutex::new(TcpWrite {
32 socket: writer,
33 sequence: 0,
34 }),
35 protocol_version: MavlinkVersion::V2,
36 recv_any_version: false,
37 #[cfg(feature = "signing")]
38 signing_data: None,
39 })
40}
41
42pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
43 let addr = get_socket_addr(address)?;
44 let listener = TcpListener::bind(addr).await?;
45
46 match listener.accept().await {
48 Ok((socket, _)) => {
49 let (reader, writer) = socket.into_split();
50 return Ok(AsyncTcpConnection {
51 reader: Mutex::new(AsyncPeekReader::new(reader)),
52 writer: Mutex::new(TcpWrite {
53 socket: writer,
54 sequence: 0,
55 }),
56 protocol_version: MavlinkVersion::V2,
57 recv_any_version: false,
58 #[cfg(feature = "signing")]
59 signing_data: None,
60 });
61 }
62 Err(e) => {
63 println!("listener err: {e}");
65 }
66 }
67 Err(io::Error::new(
68 io::ErrorKind::NotConnected,
69 "No incoming connections!",
70 ))
71}
72
73pub struct AsyncTcpConnection {
74 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
75 writer: Mutex<TcpWrite>,
76 protocol_version: MavlinkVersion,
77 recv_any_version: bool,
78 #[cfg(feature = "signing")]
79 signing_data: Option<SigningData>,
80}
81
82struct TcpWrite {
83 socket: OwnedWriteHalf,
84 sequence: u8,
85}
86
87#[async_trait::async_trait]
88impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
89 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
90 let mut reader = self.reader.lock().await;
91 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
92 #[cfg(not(feature = "signing"))]
93 let result = read_versioned_msg_async(reader.deref_mut(), version).await;
94 #[cfg(feature = "signing")]
95 let result = read_versioned_msg_async_signed(
96 reader.deref_mut(),
97 version,
98 self.signing_data.as_ref(),
99 )
100 .await;
101 result
102 }
103
104 async fn send(
105 &self,
106 header: &MavHeader,
107 data: &M,
108 ) -> Result<usize, crate::error::MessageWriteError> {
109 let mut lock = self.writer.lock().await;
110
111 let header = MavHeader {
112 sequence: lock.sequence,
113 system_id: header.system_id,
114 component_id: header.component_id,
115 };
116
117 lock.sequence = lock.sequence.wrapping_add(1);
118 #[cfg(not(feature = "signing"))]
119 let result =
120 write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
121 #[cfg(feature = "signing")]
122 let result = write_versioned_msg_async_signed(
123 &mut lock.socket,
124 self.protocol_version,
125 header,
126 data,
127 self.signing_data.as_ref(),
128 )
129 .await;
130 result
131 }
132
133 fn set_protocol_version(&mut self, version: MavlinkVersion) {
134 self.protocol_version = version;
135 }
136
137 fn protocol_version(&self) -> MavlinkVersion {
138 self.protocol_version
139 }
140
141 fn set_allow_recv_any_version(&mut self, allow: bool) {
142 self.recv_any_version = allow
143 }
144
145 fn allow_recv_any_version(&self) -> bool {
146 self.recv_any_version
147 }
148
149 #[cfg(feature = "signing")]
150 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
151 self.signing_data = signing_data.map(SigningData::from_config)
152 }
153}
154
155#[async_trait]
156impl AsyncConnectable for TcpConnectable {
157 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
158 where
159 M: Message + Sync + Send,
160 {
161 let conn = if self.is_out {
162 tcpout(&self.address).await
163 } else {
164 tcpin(&self.address).await
165 };
166 Ok(Box::new(conn?))
167 }
168}