mavlink_core/async_connection/
file.rs1use core::ops::DerefMut;
4
5use super::{AsyncConnectable, AsyncMavConnection};
6use crate::connectable::FileConnectable;
7use crate::error::{MessageReadError, MessageWriteError};
8
9use crate::ReadVersion;
10use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message};
11
12use async_trait::async_trait;
13use tokio::fs::File;
14use tokio::io;
15use tokio::sync::Mutex;
16
17#[cfg(not(feature = "signing"))]
18use crate::read_versioned_msg_async;
19
20#[cfg(feature = "signing")]
21use crate::{read_versioned_msg_async_signed, SigningConfig, SigningData};
22
23pub async fn open(file_path: &str) -> io::Result<AsyncFileConnection> {
24 let file = File::open(file_path).await?;
25 Ok(AsyncFileConnection {
26 file: Mutex::new(AsyncPeekReader::new(file)),
27 protocol_version: MavlinkVersion::V2,
28 recv_any_version: false,
29 #[cfg(feature = "signing")]
30 signing_data: None,
31 })
32}
33
34pub struct AsyncFileConnection {
35 file: Mutex<AsyncPeekReader<File>>,
36 protocol_version: MavlinkVersion,
37 recv_any_version: bool,
38 #[cfg(feature = "signing")]
39 signing_data: Option<SigningData>,
40}
41
42#[async_trait::async_trait]
43impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncFileConnection {
44 async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
45 let mut file = self.file.lock().await;
46 let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
47 loop {
48 #[cfg(not(feature = "signing"))]
49 let result = read_versioned_msg_async(file.deref_mut(), version).await;
50 #[cfg(feature = "signing")]
51 let result = read_versioned_msg_async_signed(
52 file.deref_mut(),
53 version,
54 self.signing_data.as_ref(),
55 )
56 .await;
57 match result {
58 ok @ Ok(..) => {
59 return ok;
60 }
61 Err(MessageReadError::Io(e)) => {
62 if e.kind() == io::ErrorKind::UnexpectedEof {
63 return Err(MessageReadError::Io(e));
64 }
65 }
66 _ => {}
67 }
68 }
69 }
70
71 async fn send(&self, _header: &MavHeader, _data: &M) -> Result<usize, MessageWriteError> {
72 Ok(0)
73 }
74
75 fn set_protocol_version(&mut self, version: MavlinkVersion) {
76 self.protocol_version = version;
77 }
78
79 fn protocol_version(&self) -> MavlinkVersion {
80 self.protocol_version
81 }
82
83 fn set_allow_recv_any_version(&mut self, allow: bool) {
84 self.recv_any_version = allow
85 }
86
87 fn allow_recv_any_version(&self) -> bool {
88 self.recv_any_version
89 }
90
91 #[cfg(feature = "signing")]
92 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
93 self.signing_data = signing_data.map(SigningData::from_config)
94 }
95}
96
97#[async_trait]
98impl AsyncConnectable for FileConnectable {
99 async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
100 where
101 M: Message + Sync + Send,
102 {
103 Ok(Box::new(open(&self.address).await?))
104 }
105}