mavlink_core/connection/udp/
sync.rs1use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8 ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9 write_raw_message,
10};
11use crate::peek_reader::PeekReader;
12use crate::{MavHeader, MavlinkVersion, Message};
13use core::ops::DerefMut;
14use std::collections::VecDeque;
15use std::io::{self, Read, Write};
16use std::net::{SocketAddr, UdpSocket};
17use std::sync::Mutex;
18
19#[cfg(feature = "mav2-message-signing")]
20use crate::SigningConfig;
21
22use super::config::{UdpConfig, UdpMode};
23
/// Receiving half of a UDP connection, exposed as a byte stream via [`Read`].
struct UdpRead {
    /// Socket that datagrams are received from.
    socket: UdpSocket,
    /// Bytes of the most recent datagram that did not fit into the caller's
    /// buffer; they are handed out before another datagram is received.
    buffer: VecDeque<u8>,
    /// Source address of the most recently received datagram, if any.
    last_recv_address: Option<SocketAddr>,
}

/// Size in bytes of the scratch buffer passed to `recv_from` for each datagram.
const MTU_SIZE: usize = 1500;

impl Read for UdpRead {
    /// Fills `buf` from buffered leftovers if there are any, otherwise from a
    /// freshly received datagram.
    ///
    /// Returns the number of bytes copied into `buf`. Whatever part of a new
    /// datagram does not fit is kept in `self.buffer` for later calls.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `UdpSocket::recv_from`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Drain what is left of the previous datagram before touching the socket.
        if !self.buffer.is_empty() {
            return self.buffer.read(buf);
        }

        let mut datagram = [0u8; MTU_SIZE];
        let (received, sender) = self.socket.recv_from(&mut datagram)?;

        // Hand over as much as the caller can take, keep the rest for later.
        let copied = received.min(buf.len());
        buf[..copied].copy_from_slice(&datagram[..copied]);
        self.buffer.extend(&datagram[copied..received]);

        self.last_recv_address = Some(sender);
        Ok(copied)
    }
}
46
/// Sending half of a UDP connection, exposed via [`Write`].
struct UdpWrite {
    /// Socket that datagrams are sent on.
    socket: UdpSocket,
    /// Address datagrams are sent to; `None` while no destination is known.
    dest: Option<SocketAddr>,
    /// Sequence counter for outgoing messages.
    sequence: u8,
}

impl Write for UdpWrite {
    /// Sends `buf` as a single datagram to `dest`.
    ///
    /// Returns the number of bytes reported as sent by `send_to`.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::NotConnected`] when no destination is set
    /// (instead of panicking), and propagates any error from `send_to`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let addr = self.dest.ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotConnected,
                "UDP destination address is not set",
            )
        })?;
        self.socket.send_to(buf, addr)
    }

    /// Sends `buf` as exactly one datagram.
    ///
    /// A datagram cannot be completed by a follow-up write, so a short send is
    /// reported as an error rather than retried with the remainder.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::WriteZero`] when fewer than `buf.len()` bytes
    /// were sent, plus any error produced by [`Write::write`].
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        if self.write(buf)? != buf.len() {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "failed to send complete UDP datagram",
            ));
        }

        Ok(())
    }

    /// No-op: every `write` hands its datagram straight to the socket.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
74
75pub struct UdpConnection {
76 reader: Mutex<PeekReader<UdpRead>>,
77 writer: Mutex<UdpWrite>,
78 state: ConnectionState,
79 server: bool,
80}
81
82impl UdpConnection {
83 fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
84 Ok(Self {
85 server,
86 reader: Mutex::new(PeekReader::new(UdpRead {
87 socket: socket.try_clone()?,
88 buffer: VecDeque::new(),
89 last_recv_address: None,
90 })),
91 writer: Mutex::new(UdpWrite {
92 socket,
93 dest,
94 sequence: 0,
95 }),
96 state: ConnectionState::new(),
97 })
98 }
99
100 fn update_reply_destination(&self, reader: &PeekReader<UdpRead>) {
101 if self.server {
102 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
103 self.writer.lock().unwrap().dest = addr;
104 }
105 }
106 }
107}
108
109impl<M: Message> MavConnection<M> for UdpConnection {
110 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
111 let mut reader = self.reader.lock().unwrap();
112
113 let result = read_message::<M, _>(reader.deref_mut(), &self.state);
114 self.update_reply_destination(&reader);
115 result
116 }
117
118 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
119 let mut reader = self.reader.lock().unwrap();
120
121 let result = read_raw_message::<M, _>(reader.deref_mut(), &self.state);
122 self.update_reply_destination(&reader);
123 result
124 }
125
126 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
127 let mut reader = self.reader.lock().unwrap();
128 reader.reader_mut().socket.set_nonblocking(true)?;
129
130 let result = read_message::<M, _>(reader.deref_mut(), &self.state);
131 self.update_reply_destination(&reader);
132
133 reader.reader_mut().socket.set_nonblocking(false)?;
134
135 result
136 }
137
138 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
139 let mut guard = self.writer.lock().unwrap();
140 let writer = &mut *guard;
141
142 let header = next_send_header(&mut writer.sequence, header);
143
144 let len = if writer.dest.is_some() {
145 write_message(writer, &self.state, header, data)?
146 } else {
147 0
148 };
149
150 Ok(len)
151 }
152
153 fn send_raw(&self, data: &MAVLinkMessageRaw) -> Result<usize, crate::error::MessageWriteError> {
154 let mut guard = self.writer.lock().unwrap();
155 let writer = &mut *guard;
156
157 let len = if writer.dest.is_some() {
158 write_raw_message(writer, data)?
159 } else {
160 0
161 };
162
163 Ok(len)
164 }
165
166 fn set_protocol_version(&mut self, version: MavlinkVersion) {
167 self.state.set_protocol_version(version);
168 }
169
170 fn protocol_version(&self) -> MavlinkVersion {
171 self.state.protocol_version()
172 }
173
174 fn set_allow_recv_any_version(&mut self, allow: bool) {
175 self.state.set_allow_recv_any_version(allow);
176 }
177
178 fn allow_recv_any_version(&self) -> bool {
179 self.state.allow_recv_any_version()
180 }
181
182 #[cfg(feature = "mav2-message-signing")]
183 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
184 self.state.setup_signing(signing_data);
185 }
186}
187
188impl Connectable for UdpConfig {
189 fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
190 let (addr, server, dest): (&str, _, _) = match self.mode {
191 UdpMode::Udpin => (&self.address, true, None),
192 _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
193 };
194 let socket = UdpSocket::bind(addr)?;
195 if let Some(timeout) = self.read_timeout {
196 socket.set_read_timeout(Some(timeout))?;
197 }
198 if matches!(self.mode, UdpMode::UdpBroadcast) {
199 socket.set_broadcast(true)?;
200 }
201 Ok(UdpConnection::new(socket, server, dest)?.into())
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A datagram larger than the caller's buffer must be handed out across
    /// several reads, and the next datagram must only be received once the
    /// buffered remainder has been drained.
    #[test]
    fn test_datagram_buffering() {
        // Bind to an ephemeral port: a fixed port can already be taken by
        // another process or by a test running in parallel.
        let receiver_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        // Fail instead of hanging forever if a datagram never arrives.
        receiver_socket
            .set_read_timeout(Some(Duration::from_secs(5)))
            .unwrap();
        let receiver_addr = receiver_socket.local_addr().unwrap();

        let mut udp_reader = UdpRead {
            socket: receiver_socket,
            buffer: VecDeque::new(),
            last_recv_address: None,
        };

        let sender_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender_socket.connect(receiver_addr).unwrap();
        let sender_addr = sender_socket.local_addr().unwrap();

        let datagram: Vec<u8> = (0..50).collect();

        // Queue two identical datagrams before reading anything.
        for _ in 0..2 {
            let n_sent = sender_socket.send(&datagram).unwrap();
            assert_eq!(n_sent, datagram.len());
        }

        let mut buf = [0u8; 30];

        // Each datagram is expected to come out as a 30-byte read followed by
        // a 20-byte read of the buffered remainder.
        for _ in 0..2 {
            let n_read = udp_reader.read(&mut buf).unwrap();
            assert_eq!(n_read, 30);
            assert_eq!(&buf[0..n_read], &datagram[0..30]);
            assert_eq!(udp_reader.last_recv_address, Some(sender_addr));

            let n_read = udp_reader.read(&mut buf).unwrap();
            assert_eq!(n_read, 20);
            assert_eq!(&buf[0..n_read], &datagram[30..50]);
        }

        assert!(udp_reader.buffer.is_empty());
    }
}