mavlink_core/async_connection/
file.rs

1//! Async File MAVLINK connection
2use core::ops::DerefMut;
3use std::io;
4use std::path::PathBuf;
5
6use super::{AsyncConnectable, AsyncMavConnection};
7use crate::connection::file::config::FileConfig;
8use crate::error::{MessageReadError, MessageWriteError};
9use crate::{
10    async_peek_reader::AsyncPeekReader, MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message,
11    ReadVersion,
12};
13
14use async_trait::async_trait;
15use futures::lock::Mutex;
16use tokio::fs::File;
17
18#[cfg(not(feature = "signing"))]
19use crate::{read_raw_versioned_msg_async, read_versioned_msg_async};
20
21#[cfg(feature = "signing")]
22use crate::{
23    read_raw_versioned_msg_async_signed, read_versioned_msg_async_signed, SigningConfig,
24    SigningData,
25};
26
27pub async fn open(file_path: &PathBuf) -> io::Result<AsyncFileConnection> {
28    let file = File::open(file_path).await?;
29    Ok(AsyncFileConnection {
30        file: Mutex::new(AsyncPeekReader::new(file)),
31        protocol_version: MavlinkVersion::V2,
32        recv_any_version: false,
33        #[cfg(feature = "signing")]
34        signing_data: None,
35    })
36}
37
/// Async MAVLink connection backed by a [`File`] opened through tokio.
///
/// Instances are created by [`open`]; the fields are private and are only
/// reachable through the `AsyncMavConnection` implementation.
pub struct AsyncFileConnection {
    /// Peek reader over the opened file, guarded by an async mutex so that
    /// `recv` and `recv_raw` can read through a shared `&self`.
    file: Mutex<AsyncPeekReader<File>>,
    /// Value returned by `protocol_version` and replaced by `set_protocol_version`.
    protocol_version: MavlinkVersion,
    /// Value returned by `allow_recv_any_version` and replaced by
    /// `set_allow_recv_any_version`.
    recv_any_version: bool,
    /// Signing state handed to the signed read functions; `None` until
    /// `setup_signing` installs a configuration. Only present when the
    /// `signing` feature is enabled.
    #[cfg(feature = "signing")]
    signing_data: Option<SigningData>,
}
45
46#[async_trait::async_trait]
47impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncFileConnection {
48    async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
49        let mut file = self.file.lock().await;
50        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
51        loop {
52            #[cfg(not(feature = "signing"))]
53            let result = read_raw_versioned_msg_async::<M, _>(file.deref_mut(), version).await;
54            #[cfg(feature = "signing")]
55            let result = read_raw_versioned_msg_async_signed::<M, _>(
56                file.deref_mut(),
57                version,
58                self.signing_data.as_ref(),
59            )
60            .await;
61            match result {
62                ok @ Ok(..) => {
63                    return ok;
64                }
65                Err(MessageReadError::Io(e)) => {
66                    if e.kind() == io::ErrorKind::UnexpectedEof {
67                        return Err(MessageReadError::Io(e));
68                    }
69                }
70                _ => {}
71            }
72        }
73    }
74
75    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
76        let mut file = self.file.lock().await;
77        let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
78        loop {
79            #[cfg(not(feature = "signing"))]
80            let result = read_versioned_msg_async(file.deref_mut(), version).await;
81            #[cfg(feature = "signing")]
82            let result = read_versioned_msg_async_signed(
83                file.deref_mut(),
84                version,
85                self.signing_data.as_ref(),
86            )
87            .await;
88            match result {
89                ok @ Ok(..) => {
90                    return ok;
91                }
92                Err(MessageReadError::Io(e)) => {
93                    if e.kind() == io::ErrorKind::UnexpectedEof {
94                        return Err(MessageReadError::Io(e));
95                    }
96                }
97                _ => {}
98            }
99        }
100    }
101
102    async fn send(&self, _header: &MavHeader, _data: &M) -> Result<usize, MessageWriteError> {
103        Ok(0)
104    }
105
106    fn set_protocol_version(&mut self, version: MavlinkVersion) {
107        self.protocol_version = version;
108    }
109
110    fn protocol_version(&self) -> MavlinkVersion {
111        self.protocol_version
112    }
113
114    fn set_allow_recv_any_version(&mut self, allow: bool) {
115        self.recv_any_version = allow;
116    }
117
118    fn allow_recv_any_version(&self) -> bool {
119        self.recv_any_version
120    }
121
122    #[cfg(feature = "signing")]
123    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
124        self.signing_data = signing_data.map(SigningData::from_config);
125    }
126}
127
128#[async_trait]
129impl AsyncConnectable for FileConfig {
130    async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>>
131    where
132        M: Message + Sync + Send,
133    {
134        Ok(Box::new(open(&self.address).await?))
135    }
136}