mavlink_core/connection/tcp/
async.rs1use std::io;
4
5use crate::async_peek_reader::AsyncPeekReader;
6use crate::connection::tcp::config::{TcpConfig, TcpMode};
7use crate::connection::{AsyncConnectable, AsyncMavConnection, get_socket_addr};
8use crate::connection_shared::{
9 ConnectionState, next_send_header, read_message_async, read_raw_message_async,
10 write_message_async,
11};
12use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message};
13
14use async_trait::async_trait;
15use core::ops::DerefMut;
16use futures::{FutureExt, lock::Mutex};
17use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
18use tokio::net::{TcpListener, TcpStream};
19
20#[cfg(feature = "mav2-message-signing")]
21use crate::SigningConfig;
22
23pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
24 let addr = get_socket_addr(&address)?;
25
26 let socket = TcpStream::connect(addr).await?;
27
28 let (reader, writer) = socket.into_split();
29
30 Ok(AsyncTcpConnection {
31 reader: Mutex::new(AsyncPeekReader::new(reader)),
32 writer: Mutex::new(TcpWrite {
33 socket: writer,
34 sequence: 0,
35 }),
36 state: ConnectionState::new(),
37 })
38}
39
40pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
41 let addr = get_socket_addr(&address)?;
42 let listener = TcpListener::bind(addr).await?;
43
44 match listener.accept().await {
46 Ok((socket, _)) => {
47 let (reader, writer) = socket.into_split();
48 return Ok(AsyncTcpConnection {
49 reader: Mutex::new(AsyncPeekReader::new(reader)),
50 writer: Mutex::new(TcpWrite {
51 socket: writer,
52 sequence: 0,
53 }),
54 state: ConnectionState::new(),
55 });
56 }
57 Err(e) => {
58 println!("listener err: {e}");
60 }
61 }
62 Err(io::Error::new(
63 io::ErrorKind::NotConnected,
64 "No incoming connections!",
65 ))
66}
67
68pub struct AsyncTcpConnection {
69 reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
70 writer: Mutex<TcpWrite>,
71 state: ConnectionState,
72}
73
74struct TcpWrite {
75 socket: OwnedWriteHalf,
76 sequence: u8,
77}
78
79#[async_trait::async_trait]
80impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
81 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
82 let mut reader = self.reader.lock().await;
83 read_message_async::<M, _>(reader.deref_mut(), &self.state).await
84 }
85
86 async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
87 let mut reader = self.reader.lock().await;
88 read_raw_message_async::<M, _>(reader.deref_mut(), &self.state).await
89 }
90
91 async fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92 match self.recv().now_or_never() {
93 Some(result) => result,
94 None => Err(crate::error::MessageReadError::Io(
95 io::ErrorKind::WouldBlock.into(),
96 )),
97 }
98 }
99
100 async fn send(
101 &self,
102 header: &MavHeader,
103 data: &M,
104 ) -> Result<usize, crate::error::MessageWriteError> {
105 let mut lock = self.writer.lock().await;
106
107 let header = next_send_header(&mut lock.sequence, header);
108 write_message_async(&mut lock.socket, &self.state, header, data).await
109 }
110
111 fn set_protocol_version(&mut self, version: MavlinkVersion) {
112 self.state.set_protocol_version(version);
113 }
114
115 fn protocol_version(&self) -> MavlinkVersion {
116 self.state.protocol_version()
117 }
118
119 fn set_allow_recv_any_version(&mut self, allow: bool) {
120 self.state.set_allow_recv_any_version(allow);
121 }
122
123 fn allow_recv_any_version(&self) -> bool {
124 self.state.allow_recv_any_version()
125 }
126
127 #[cfg(feature = "mav2-message-signing")]
128 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
129 self.state.setup_signing(signing_data);
130 }
131}
132
133#[async_trait]
134impl AsyncConnectable for TcpConfig {
135 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
136 where
137 M: Message + Sync + Send,
138 {
139 let conn = match self.mode {
140 TcpMode::TcpIn => tcpin(&self.address).await,
141 TcpMode::TcpOut => tcpout(&self.address).await,
142 };
143
144 Ok(Box::new(conn?))
145 }
146}