mavlink_core/async_connection/
tcp.rs1use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection};
4use crate::async_peek_reader::AsyncPeekReader;
5use crate::connectable::TcpConnectable;
6use crate::{MavHeader, MavlinkVersion, Message};
7
8use async_trait::async_trait;
9use core::ops::DerefMut;
10use tokio::io;
11use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
12use tokio::net::{TcpListener, TcpStream};
13use tokio::sync::Mutex;
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_versioned_msg_async, write_versioned_msg_async};
17#[cfg(feature = "signing")]
18use crate::{
19 read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
20};
21
22pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
23 let addr = get_socket_addr(address)?;
24
25 let socket = TcpStream::connect(addr).await?;
26
27 let (reader, writer) = socket.into_split();
28
29 Ok(AsyncTcpConnection {
30 reader: Mutex::new(AsyncPeekReader::new(reader)),
31 writer: Mutex::new(TcpWrite {
32 socket: writer,
33 sequence: 0,
34 }),
35 protocol_version: MavlinkVersion::V2,
36 #[cfg(feature = "signing")]
37 signing_data: None,
38 })
39}
40
41pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
42 let addr = get_socket_addr(address)?;
43 let listener = TcpListener::bind(addr).await?;
44
45 match listener.accept().await {
47 Ok((socket, _)) => {
48 let (reader, writer) = socket.into_split();
49 return Ok(AsyncTcpConnection {
50 reader: Mutex::new(AsyncPeekReader::new(reader)),
51 writer: Mutex::new(TcpWrite {
52 socket: writer,
53 sequence: 0,
54 }),
55 protocol_version: MavlinkVersion::V2,
56 #[cfg(feature = "signing")]
57 signing_data: None,
58 });
59 }
60 Err(e) => {
61 println!("listener err: {e}");
63 }
64 }
65 Err(io::Error::new(
66 io::ErrorKind::NotConnected,
67 "No incoming connections!",
68 ))
69}
70
71pub struct AsyncTcpConnection {
72 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
73 writer: Mutex<TcpWrite>,
74 protocol_version: MavlinkVersion,
75 #[cfg(feature = "signing")]
76 signing_data: Option<SigningData>,
77}
78
79struct TcpWrite {
80 socket: OwnedWriteHalf,
81 sequence: u8,
82}
83
84#[async_trait::async_trait]
85impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
86 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
87 let mut reader = self.reader.lock().await;
88 #[cfg(not(feature = "signing"))]
89 let result = read_versioned_msg_async(reader.deref_mut(), self.protocol_version).await;
90 #[cfg(feature = "signing")]
91 let result = read_versioned_msg_async_signed(
92 reader.deref_mut(),
93 self.protocol_version,
94 self.signing_data.as_ref(),
95 )
96 .await;
97 result
98 }
99
100 async fn send(
101 &self,
102 header: &MavHeader,
103 data: &M,
104 ) -> Result<usize, crate::error::MessageWriteError> {
105 let mut lock = self.writer.lock().await;
106
107 let header = MavHeader {
108 sequence: lock.sequence,
109 system_id: header.system_id,
110 component_id: header.component_id,
111 };
112
113 lock.sequence = lock.sequence.wrapping_add(1);
114 #[cfg(not(feature = "signing"))]
115 let result =
116 write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
117 #[cfg(feature = "signing")]
118 let result = write_versioned_msg_async_signed(
119 &mut lock.socket,
120 self.protocol_version,
121 header,
122 data,
123 self.signing_data.as_ref(),
124 )
125 .await;
126 result
127 }
128
129 fn set_protocol_version(&mut self, version: MavlinkVersion) {
130 self.protocol_version = version;
131 }
132
133 fn get_protocol_version(&self) -> MavlinkVersion {
134 self.protocol_version
135 }
136
137 #[cfg(feature = "signing")]
138 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
139 self.signing_data = signing_data.map(SigningData::from_config)
140 }
141}
142
143#[async_trait]
144impl AsyncConnectable for TcpConnectable {
145 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
146 where
147 M: Message + Sync + Send,
148 {
149 let conn = if self.is_out {
150 tcpout(&self.address).await
151 } else {
152 tcpin(&self.address).await
153 };
154 Ok(Box::new(conn?))
155 }
156}