Skip to main content

mavlink_core/connection/udp/
sync.rs

1//! UDP MAVLink connection
2
3use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8    ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9    write_raw_message,
10};
11use crate::peek_reader::PeekReader;
12use crate::{MavHeader, MavlinkVersion, Message};
13use core::ops::DerefMut;
14use std::collections::VecDeque;
15use std::io::{self, Read, Write};
16use std::net::{SocketAddr, UdpSocket};
17use std::sync::Mutex;
18
19#[cfg(feature = "mav2-message-signing")]
20use crate::SigningConfig;
21
22use super::config::{UdpConfig, UdpMode};
23
/// Receiving half of a UDP connection, exposed as a byte stream through [`Read`].
///
/// A caller may ask for fewer bytes than a datagram holds, so the unread tail of the
/// most recent datagram is kept in `buffer` and served before the socket is read again.
struct UdpRead {
    /// Socket the datagrams are received from.
    socket: UdpSocket,
    /// Bytes of the last received datagram that have not been handed out yet.
    buffer: VecDeque<u8>,
    /// Source address of the most recently received datagram, if any.
    last_recv_address: Option<SocketAddr>,
}

/// Size in bytes of the scratch buffer a single datagram is received into.
// NOTE(review): presumably chosen after the common Ethernet MTU; a datagram longer
// than this cannot be received in full through this buffer.
const MTU_SIZE: usize = 1500;

impl Read for UdpRead {
    /// Copies pending bytes into `buf`, receiving a new datagram only when nothing is
    /// buffered.
    ///
    /// Bytes of a datagram that do not fit into `buf` are kept for later calls.
    /// Zero-length datagrams are skipped (their sender is still recorded): returning
    /// `Ok(0)` for a non-empty `buf` would signal end-of-stream per the [`Read`]
    /// contract, and a datagram socket has no end of stream.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `recv_from` on the socket.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Drain leftovers of the previous datagram before touching the socket.
        if !self.buffer.is_empty() {
            return self.buffer.read(buf);
        }

        let mut datagram = [0u8; MTU_SIZE];
        loop {
            let (len, sender) = self.socket.recv_from(&mut datagram)?;
            self.last_recv_address = Some(sender);

            // An empty datagram carries no data; wait for the next one instead of
            // reporting a spurious EOF. With an empty `buf`, `Ok(0)` is legitimate.
            if len == 0 && !buf.is_empty() {
                continue;
            }

            let n = len.min(buf.len());
            buf[..n].copy_from_slice(&datagram[..n]);
            self.buffer.extend(&datagram[n..len]);
            return Ok(n);
        }
    }
}
46
/// Sending half of a UDP connection, exposed through [`Write`].
struct UdpWrite {
    /// Socket the datagrams are sent from.
    socket: UdpSocket,
    /// Address datagrams are sent to; `None` while no destination is known.
    dest: Option<SocketAddr>,
    /// Counter handed to `next_send_header` when a message is sent.
    sequence: u8,
}

impl Write for UdpWrite {
    /// Sends `buf` as one datagram to `dest` and returns the number of bytes sent.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`io::ErrorKind::NotConnected`] when no destination
    /// is set (previously this panicked), or any error reported by `send_to`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let addr = self.dest.ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotConnected,
                "UDP destination address is not set",
            )
        })?;
        self.socket.send_to(buf, addr)
    }

    /// Sends `buf` as exactly one datagram.
    ///
    /// The default `write_all` would retry with the remaining bytes, which would
    /// split one buffer across several datagrams; a short send is reported as an
    /// error instead.
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        if self.write(buf)? != buf.len() {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "failed to send complete UDP datagram",
            ));
        }

        Ok(())
    }

    /// Nothing is buffered on the sending side, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
74
75pub struct UdpConnection {
76    reader: Mutex<PeekReader<UdpRead>>,
77    writer: Mutex<UdpWrite>,
78    state: ConnectionState,
79    server: bool,
80}
81
82impl UdpConnection {
83    fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
84        Ok(Self {
85            server,
86            reader: Mutex::new(PeekReader::new(UdpRead {
87                socket: socket.try_clone()?,
88                buffer: VecDeque::new(),
89                last_recv_address: None,
90            })),
91            writer: Mutex::new(UdpWrite {
92                socket,
93                dest,
94                sequence: 0,
95            }),
96            state: ConnectionState::new(),
97        })
98    }
99
100    fn update_reply_destination(&self, reader: &PeekReader<UdpRead>) {
101        if self.server {
102            if let addr @ Some(_) = reader.reader_ref().last_recv_address {
103                self.writer.lock().unwrap().dest = addr;
104            }
105        }
106    }
107}
108
109impl<M: Message> MavConnection<M> for UdpConnection {
110    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
111        let mut reader = self.reader.lock().unwrap();
112
113        let result = read_message::<M, _>(reader.deref_mut(), &self.state);
114        self.update_reply_destination(&reader);
115        result
116    }
117
118    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
119        let mut reader = self.reader.lock().unwrap();
120
121        let result = read_raw_message::<M, _>(reader.deref_mut(), &self.state);
122        self.update_reply_destination(&reader);
123        result
124    }
125
126    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
127        let mut reader = self.reader.lock().unwrap();
128        reader.reader_mut().socket.set_nonblocking(true)?;
129
130        let result = read_message::<M, _>(reader.deref_mut(), &self.state);
131        self.update_reply_destination(&reader);
132
133        reader.reader_mut().socket.set_nonblocking(false)?;
134
135        result
136    }
137
138    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
139        let mut guard = self.writer.lock().unwrap();
140        let writer = &mut *guard;
141
142        let header = next_send_header(&mut writer.sequence, header);
143
144        let len = if writer.dest.is_some() {
145            write_message(writer, &self.state, header, data)?
146        } else {
147            0
148        };
149
150        Ok(len)
151    }
152
153    fn send_raw(&self, data: &MAVLinkMessageRaw) -> Result<usize, crate::error::MessageWriteError> {
154        let mut guard = self.writer.lock().unwrap();
155        let writer = &mut *guard;
156
157        let len = if writer.dest.is_some() {
158            write_raw_message(writer, data)?
159        } else {
160            0
161        };
162
163        Ok(len)
164    }
165
166    fn set_protocol_version(&mut self, version: MavlinkVersion) {
167        self.state.set_protocol_version(version);
168    }
169
170    fn protocol_version(&self) -> MavlinkVersion {
171        self.state.protocol_version()
172    }
173
174    fn set_allow_recv_any_version(&mut self, allow: bool) {
175        self.state.set_allow_recv_any_version(allow);
176    }
177
178    fn allow_recv_any_version(&self) -> bool {
179        self.state.allow_recv_any_version()
180    }
181
182    #[cfg(feature = "mav2-message-signing")]
183    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
184        self.state.setup_signing(signing_data);
185    }
186}
187
188impl Connectable for UdpConfig {
189    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
190        let (addr, server, dest): (&str, _, _) = match self.mode {
191            UdpMode::Udpin => (&self.address, true, None),
192            _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
193        };
194        let socket = UdpSocket::bind(addr)?;
195        if let Some(timeout) = self.read_timeout {
196            socket.set_read_timeout(Some(timeout))?;
197        }
198        if matches!(self.mode, UdpMode::UdpBroadcast) {
199            socket.set_broadcast(true)?;
200        }
201        Ok(UdpConnection::new(socket, server, dest)?.into())
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// A datagram larger than the caller's buffer is served across several reads,
    /// and the next datagram is only fetched once the previous one is drained.
    #[test]
    fn test_datagram_buffering() {
        // Bind to port 0 so the OS picks a free port; a fixed port makes the test
        // fail whenever that port is already in use on the machine.
        let receiver_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let receiver_addr = receiver_socket.local_addr().unwrap();
        // Fail instead of hanging forever if a datagram never arrives.
        receiver_socket
            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
            .unwrap();

        let mut udp_reader = UdpRead {
            socket: receiver_socket.try_clone().unwrap(),
            buffer: VecDeque::new(),
            last_recv_address: None,
        };
        let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
        sender_socket.connect(receiver_addr).unwrap();

        let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();

        let mut n_sent = sender_socket.send(&datagram).unwrap();
        assert_eq!(n_sent, datagram.len());
        n_sent = sender_socket.send(&datagram).unwrap();
        assert_eq!(n_sent, datagram.len());

        let mut buf = [0u8; 30];

        // First datagram: 30 bytes now, the remaining 20 from the internal buffer.
        let mut n_read = udp_reader.read(&mut buf).unwrap();
        assert_eq!(n_read, 30);
        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());

        n_read = udp_reader.read(&mut buf).unwrap();
        assert_eq!(n_read, 20);
        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());

        // Second datagram behaves the same way.
        n_read = udp_reader.read(&mut buf).unwrap();
        assert_eq!(n_read, 30);
        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());

        n_read = udp_reader.read(&mut buf).unwrap();
        assert_eq!(n_read, 20);
        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
    }
}