Skip to main content

mavlink_core/connection/
udp.rs

1//! UDP MAVLink connection
2
3use crate::connection::get_socket_addr;
4use crate::connection::{Connection, MavConnection};
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "mav2-message-signing"))]
7use crate::read_versioned_raw_message;
8#[cfg(feature = "mav2-message-signing")]
9use crate::read_versioned_raw_message_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::collections::VecDeque;
15use std::io::{self, Read};
16use std::net::{SocketAddr, UdpSocket};
17use std::sync::Mutex;
18
19#[cfg(not(feature = "mav2-message-signing"))]
20use crate::{read_versioned_msg, write_versioned_msg};
21
22#[cfg(feature = "mav2-message-signing")]
23use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
24
25pub mod config;
26
27use config::{UdpConfig, UdpMode};
28
29struct UdpRead {
30    socket: UdpSocket,
31    buffer: VecDeque<u8>,
32    last_recv_address: Option<SocketAddr>,
33}
34
35const MTU_SIZE: usize = 1500;
36impl Read for UdpRead {
37    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
38        if !self.buffer.is_empty() {
39            self.buffer.read(buf)
40        } else {
41            let mut read_buffer = [0u8; MTU_SIZE];
42            let (n_buffer, address) = self.socket.recv_from(&mut read_buffer)?;
43            let n = (&read_buffer[0..n_buffer]).read(buf)?;
44            self.buffer.extend(&read_buffer[n..n_buffer]);
45
46            self.last_recv_address = Some(address);
47            Ok(n)
48        }
49    }
50}
51
52struct UdpWrite {
53    socket: UdpSocket,
54    dest: Option<SocketAddr>,
55    sequence: u8,
56}
57
58pub struct UdpConnection {
59    reader: Mutex<PeekReader<UdpRead>>,
60    writer: Mutex<UdpWrite>,
61    protocol_version: MavlinkVersion,
62    recv_any_version: bool,
63    server: bool,
64    #[cfg(feature = "mav2-message-signing")]
65    signing_data: Option<SigningData>,
66}
67
68impl UdpConnection {
69    fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
70        Ok(Self {
71            server,
72            reader: Mutex::new(PeekReader::new(UdpRead {
73                socket: socket.try_clone()?,
74                buffer: VecDeque::new(),
75                last_recv_address: None,
76            })),
77            writer: Mutex::new(UdpWrite {
78                socket,
79                dest,
80                sequence: 0,
81            }),
82            protocol_version: MavlinkVersion::V2,
83            recv_any_version: false,
84            #[cfg(feature = "mav2-message-signing")]
85            signing_data: None,
86        })
87    }
88}
89
90impl<M: Message> MavConnection<M> for UdpConnection {
91    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92        let mut reader = self.reader.lock().unwrap();
93        let version = ReadVersion::from_conn_cfg::<_, M>(self);
94
95        #[cfg(not(feature = "mav2-message-signing"))]
96        let result = read_versioned_msg(reader.deref_mut(), version);
97        #[cfg(feature = "mav2-message-signing")]
98        let result =
99            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
100        if self.server {
101            if let addr @ Some(_) = reader.reader_ref().last_recv_address {
102                self.writer.lock().unwrap().dest = addr;
103            }
104        }
105        result
106    }
107
108    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
109        let mut reader = self.reader.lock().unwrap();
110        let version = ReadVersion::from_conn_cfg::<_, M>(self);
111
112        #[cfg(not(feature = "mav2-message-signing"))]
113        let result = read_versioned_raw_message::<M, _>(reader.deref_mut(), version);
114        #[cfg(feature = "mav2-message-signing")]
115        let result = read_versioned_raw_message_signed::<M, _>(
116            reader.deref_mut(),
117            version,
118            self.signing_data.as_ref(),
119        );
120        if self.server {
121            if let addr @ Some(_) = reader.reader_ref().last_recv_address {
122                self.writer.lock().unwrap().dest = addr;
123            }
124        }
125        result
126    }
127
128    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
129        let mut reader = self.reader.lock().unwrap();
130        reader.reader_mut().socket.set_nonblocking(true)?;
131
132        let version = ReadVersion::from_conn_cfg::<_, M>(self);
133
134        #[cfg(not(feature = "mav2-message-signing"))]
135        let result = read_versioned_msg(reader.deref_mut(), version);
136        #[cfg(feature = "mav2-message-signing")]
137        let result =
138            read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
139
140        if self.server {
141            if let addr @ Some(_) = reader.reader_ref().last_recv_address {
142                self.writer.lock().unwrap().dest = addr;
143            }
144        }
145
146        reader.reader_mut().socket.set_nonblocking(false)?;
147
148        result
149    }
150
151    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
152        let mut guard = self.writer.lock().unwrap();
153        let state = &mut *guard;
154
155        let header = MavHeader {
156            sequence: state.sequence,
157            system_id: header.system_id,
158            component_id: header.component_id,
159        };
160
161        state.sequence = state.sequence.wrapping_add(1);
162
163        let len = if let Some(addr) = state.dest {
164            let mut buf = Vec::new();
165            #[cfg(not(feature = "mav2-message-signing"))]
166            write_versioned_msg(&mut buf, self.protocol_version, header, data)?;
167            #[cfg(feature = "mav2-message-signing")]
168            write_versioned_msg_signed(
169                &mut buf,
170                self.protocol_version,
171                header,
172                data,
173                self.signing_data.as_ref(),
174            )?;
175            state.socket.send_to(&buf, addr)?
176        } else {
177            0
178        };
179
180        Ok(len)
181    }
182
183    fn set_protocol_version(&mut self, version: MavlinkVersion) {
184        self.protocol_version = version;
185    }
186
187    fn protocol_version(&self) -> MavlinkVersion {
188        self.protocol_version
189    }
190
191    fn set_allow_recv_any_version(&mut self, allow: bool) {
192        self.recv_any_version = allow;
193    }
194
195    fn allow_recv_any_version(&self) -> bool {
196        self.recv_any_version
197    }
198
199    #[cfg(feature = "mav2-message-signing")]
200    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
201        self.signing_data = signing_data.map(SigningData::from_config);
202    }
203}
204
205impl Connectable for UdpConfig {
206    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
207        let (addr, server, dest): (&str, _, _) = match self.mode {
208            UdpMode::Udpin => (&self.address, true, None),
209            _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
210        };
211        let socket = UdpSocket::bind(addr)?;
212        if let Some(timeout) = self.read_timeout {
213            socket.set_read_timeout(Some(timeout))?;
214        }
215        if matches!(self.mode, UdpMode::UdpBroadcast) {
216            socket.set_broadcast(true)?;
217        }
218        Ok(UdpConnection::new(socket, server, dest)?.into())
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225
226    #[test]
227    fn test_datagram_buffering() {
228        let receiver_socket = UdpSocket::bind("127.0.0.1:5000").unwrap();
229        let mut udp_reader = UdpRead {
230            socket: receiver_socket.try_clone().unwrap(),
231            buffer: VecDeque::new(),
232            last_recv_address: None,
233        };
234        let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
235        sender_socket.connect("127.0.0.1:5000").unwrap();
236
237        let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();
238
239        let mut n_sent = sender_socket.send(&datagram).unwrap();
240        assert_eq!(n_sent, datagram.len());
241        n_sent = sender_socket.send(&datagram).unwrap();
242        assert_eq!(n_sent, datagram.len());
243
244        let mut buf = [0u8; 30];
245
246        let mut n_read = udp_reader.read(&mut buf).unwrap();
247        assert_eq!(n_read, 30);
248        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
249
250        n_read = udp_reader.read(&mut buf).unwrap();
251        assert_eq!(n_read, 20);
252        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
253
254        n_read = udp_reader.read(&mut buf).unwrap();
255        assert_eq!(n_read, 30);
256        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
257
258        n_read = udp_reader.read(&mut buf).unwrap();
259        assert_eq!(n_read, 20);
260        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
261    }
262}