Skip to main content

mavlink_core/connection/udp/
sync.rs

1//! UDP MAVLink connection
2
3use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8    ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9};
10use crate::peek_reader::PeekReader;
11use crate::{MavHeader, MavlinkVersion, Message};
12use core::ops::DerefMut;
13use std::collections::VecDeque;
14use std::io::{self, Read};
15use std::net::{SocketAddr, UdpSocket};
16use std::sync::Mutex;
17
18#[cfg(feature = "mav2-message-signing")]
19use crate::SigningConfig;
20
21use super::config::{UdpConfig, UdpMode};
22
23struct UdpRead {
24    socket: UdpSocket,
25    buffer: VecDeque<u8>,
26    last_recv_address: Option<SocketAddr>,
27}
28
29const MTU_SIZE: usize = 1500;
30impl Read for UdpRead {
31    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
32        if !self.buffer.is_empty() {
33            self.buffer.read(buf)
34        } else {
35            let mut read_buffer = [0u8; MTU_SIZE];
36            let (n_buffer, address) = self.socket.recv_from(&mut read_buffer)?;
37            let n = (&read_buffer[0..n_buffer]).read(buf)?;
38            self.buffer.extend(&read_buffer[n..n_buffer]);
39
40            self.last_recv_address = Some(address);
41            Ok(n)
42        }
43    }
44}
45
46struct UdpWrite {
47    socket: UdpSocket,
48    dest: Option<SocketAddr>,
49    sequence: u8,
50}
51
52pub struct UdpConnection {
53    reader: Mutex<PeekReader<UdpRead>>,
54    writer: Mutex<UdpWrite>,
55    state: ConnectionState,
56    server: bool,
57}
58
59impl UdpConnection {
60    fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
61        Ok(Self {
62            server,
63            reader: Mutex::new(PeekReader::new(UdpRead {
64                socket: socket.try_clone()?,
65                buffer: VecDeque::new(),
66                last_recv_address: None,
67            })),
68            writer: Mutex::new(UdpWrite {
69                socket,
70                dest,
71                sequence: 0,
72            }),
73            state: ConnectionState::new(),
74        })
75    }
76
77    fn update_reply_destination(&self, reader: &PeekReader<UdpRead>) {
78        if self.server {
79            if let addr @ Some(_) = reader.reader_ref().last_recv_address {
80                self.writer.lock().unwrap().dest = addr;
81            }
82        }
83    }
84}
85
86impl<M: Message> MavConnection<M> for UdpConnection {
87    fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
88        let mut reader = self.reader.lock().unwrap();
89
90        let result = read_message::<M, _>(reader.deref_mut(), &self.state);
91        self.update_reply_destination(&reader);
92        result
93    }
94
95    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
96        let mut reader = self.reader.lock().unwrap();
97
98        let result = read_raw_message::<M, _>(reader.deref_mut(), &self.state);
99        self.update_reply_destination(&reader);
100        result
101    }
102
103    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
104        let mut reader = self.reader.lock().unwrap();
105        reader.reader_mut().socket.set_nonblocking(true)?;
106
107        let result = read_message::<M, _>(reader.deref_mut(), &self.state);
108        self.update_reply_destination(&reader);
109
110        reader.reader_mut().socket.set_nonblocking(false)?;
111
112        result
113    }
114
115    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
116        let mut guard = self.writer.lock().unwrap();
117        let state = &mut *guard;
118
119        let header = next_send_header(&mut state.sequence, header);
120
121        let len = if let Some(addr) = state.dest {
122            let mut buf = Vec::new();
123            write_message(&mut buf, &self.state, header, data)?;
124            state.socket.send_to(&buf, addr)?
125        } else {
126            0
127        };
128
129        Ok(len)
130    }
131
132    fn set_protocol_version(&mut self, version: MavlinkVersion) {
133        self.state.set_protocol_version(version);
134    }
135
136    fn protocol_version(&self) -> MavlinkVersion {
137        self.state.protocol_version()
138    }
139
140    fn set_allow_recv_any_version(&mut self, allow: bool) {
141        self.state.set_allow_recv_any_version(allow);
142    }
143
144    fn allow_recv_any_version(&self) -> bool {
145        self.state.allow_recv_any_version()
146    }
147
148    #[cfg(feature = "mav2-message-signing")]
149    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
150        self.state.setup_signing(signing_data);
151    }
152}
153
154impl Connectable for UdpConfig {
155    fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
156        let (addr, server, dest): (&str, _, _) = match self.mode {
157            UdpMode::Udpin => (&self.address, true, None),
158            _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
159        };
160        let socket = UdpSocket::bind(addr)?;
161        if let Some(timeout) = self.read_timeout {
162            socket.set_read_timeout(Some(timeout))?;
163        }
164        if matches!(self.mode, UdpMode::UdpBroadcast) {
165            socket.set_broadcast(true)?;
166        }
167        Ok(UdpConnection::new(socket, server, dest)?.into())
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174
175    #[test]
176    fn test_datagram_buffering() {
177        let receiver_socket = UdpSocket::bind("127.0.0.1:5000").unwrap();
178        let mut udp_reader = UdpRead {
179            socket: receiver_socket.try_clone().unwrap(),
180            buffer: VecDeque::new(),
181            last_recv_address: None,
182        };
183        let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
184        sender_socket.connect("127.0.0.1:5000").unwrap();
185
186        let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();
187
188        let mut n_sent = sender_socket.send(&datagram).unwrap();
189        assert_eq!(n_sent, datagram.len());
190        n_sent = sender_socket.send(&datagram).unwrap();
191        assert_eq!(n_sent, datagram.len());
192
193        let mut buf = [0u8; 30];
194
195        let mut n_read = udp_reader.read(&mut buf).unwrap();
196        assert_eq!(n_read, 30);
197        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
198
199        n_read = udp_reader.read(&mut buf).unwrap();
200        assert_eq!(n_read, 20);
201        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
202
203        n_read = udp_reader.read(&mut buf).unwrap();
204        assert_eq!(n_read, 30);
205        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
206
207        n_read = udp_reader.read(&mut buf).unwrap();
208        assert_eq!(n_read, 20);
209        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
210    }
211}