1use crate::connection::get_socket_addr;
4use crate::connection::{Connection, MavConnection};
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "mav2-message-signing"))]
7use crate::read_versioned_raw_message;
8#[cfg(feature = "mav2-message-signing")]
9use crate::read_versioned_raw_message_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::collections::VecDeque;
15use std::io::{self, Read};
16use std::net::{SocketAddr, UdpSocket};
17use std::sync::Mutex;
18
19#[cfg(not(feature = "mav2-message-signing"))]
20use crate::{read_versioned_msg, write_versioned_msg};
21
22#[cfg(feature = "mav2-message-signing")]
23use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
24
25pub mod config;
26
27use config::{UdpConfig, UdpMode};
28
29struct UdpRead {
30 socket: UdpSocket,
31 buffer: VecDeque<u8>,
32 last_recv_address: Option<SocketAddr>,
33}
34
35const MTU_SIZE: usize = 1500;
36impl Read for UdpRead {
37 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
38 if !self.buffer.is_empty() {
39 self.buffer.read(buf)
40 } else {
41 let mut read_buffer = [0u8; MTU_SIZE];
42 let (n_buffer, address) = self.socket.recv_from(&mut read_buffer)?;
43 let n = (&read_buffer[0..n_buffer]).read(buf)?;
44 self.buffer.extend(&read_buffer[n..n_buffer]);
45
46 self.last_recv_address = Some(address);
47 Ok(n)
48 }
49 }
50}
51
52struct UdpWrite {
53 socket: UdpSocket,
54 dest: Option<SocketAddr>,
55 sequence: u8,
56}
57
58pub struct UdpConnection {
59 reader: Mutex<PeekReader<UdpRead>>,
60 writer: Mutex<UdpWrite>,
61 protocol_version: MavlinkVersion,
62 recv_any_version: bool,
63 server: bool,
64 #[cfg(feature = "mav2-message-signing")]
65 signing_data: Option<SigningData>,
66}
67
68impl UdpConnection {
69 fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
70 Ok(Self {
71 server,
72 reader: Mutex::new(PeekReader::new(UdpRead {
73 socket: socket.try_clone()?,
74 buffer: VecDeque::new(),
75 last_recv_address: None,
76 })),
77 writer: Mutex::new(UdpWrite {
78 socket,
79 dest,
80 sequence: 0,
81 }),
82 protocol_version: MavlinkVersion::V2,
83 recv_any_version: false,
84 #[cfg(feature = "mav2-message-signing")]
85 signing_data: None,
86 })
87 }
88}
89
90impl<M: Message> MavConnection<M> for UdpConnection {
91 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92 let mut reader = self.reader.lock().unwrap();
93 let version = ReadVersion::from_conn_cfg::<_, M>(self);
94
95 #[cfg(not(feature = "mav2-message-signing"))]
96 let result = read_versioned_msg(reader.deref_mut(), version);
97 #[cfg(feature = "mav2-message-signing")]
98 let result =
99 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
100 if self.server {
101 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
102 self.writer.lock().unwrap().dest = addr;
103 }
104 }
105 result
106 }
107
108 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
109 let mut reader = self.reader.lock().unwrap();
110 let version = ReadVersion::from_conn_cfg::<_, M>(self);
111
112 #[cfg(not(feature = "mav2-message-signing"))]
113 let result = read_versioned_raw_message::<M, _>(reader.deref_mut(), version);
114 #[cfg(feature = "mav2-message-signing")]
115 let result = read_versioned_raw_message_signed::<M, _>(
116 reader.deref_mut(),
117 version,
118 self.signing_data.as_ref(),
119 );
120 if self.server {
121 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
122 self.writer.lock().unwrap().dest = addr;
123 }
124 }
125 result
126 }
127
128 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
129 let mut reader = self.reader.lock().unwrap();
130 reader.reader_mut().socket.set_nonblocking(true)?;
131
132 let version = ReadVersion::from_conn_cfg::<_, M>(self);
133
134 #[cfg(not(feature = "mav2-message-signing"))]
135 let result = read_versioned_msg(reader.deref_mut(), version);
136 #[cfg(feature = "mav2-message-signing")]
137 let result =
138 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
139
140 if self.server {
141 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
142 self.writer.lock().unwrap().dest = addr;
143 }
144 }
145
146 reader.reader_mut().socket.set_nonblocking(false)?;
147
148 result
149 }
150
151 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
152 let mut guard = self.writer.lock().unwrap();
153 let state = &mut *guard;
154
155 let header = MavHeader {
156 sequence: state.sequence,
157 system_id: header.system_id,
158 component_id: header.component_id,
159 };
160
161 state.sequence = state.sequence.wrapping_add(1);
162
163 let len = if let Some(addr) = state.dest {
164 let mut buf = Vec::new();
165 #[cfg(not(feature = "mav2-message-signing"))]
166 write_versioned_msg(&mut buf, self.protocol_version, header, data)?;
167 #[cfg(feature = "mav2-message-signing")]
168 write_versioned_msg_signed(
169 &mut buf,
170 self.protocol_version,
171 header,
172 data,
173 self.signing_data.as_ref(),
174 )?;
175 state.socket.send_to(&buf, addr)?
176 } else {
177 0
178 };
179
180 Ok(len)
181 }
182
183 fn set_protocol_version(&mut self, version: MavlinkVersion) {
184 self.protocol_version = version;
185 }
186
187 fn protocol_version(&self) -> MavlinkVersion {
188 self.protocol_version
189 }
190
191 fn set_allow_recv_any_version(&mut self, allow: bool) {
192 self.recv_any_version = allow;
193 }
194
195 fn allow_recv_any_version(&self) -> bool {
196 self.recv_any_version
197 }
198
199 #[cfg(feature = "mav2-message-signing")]
200 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
201 self.signing_data = signing_data.map(SigningData::from_config);
202 }
203}
204
205impl Connectable for UdpConfig {
206 fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
207 let (addr, server, dest): (&str, _, _) = match self.mode {
208 UdpMode::Udpin => (&self.address, true, None),
209 _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
210 };
211 let socket = UdpSocket::bind(addr)?;
212 if let Some(timeout) = self.read_timeout {
213 socket.set_read_timeout(Some(timeout))?;
214 }
215 if matches!(self.mode, UdpMode::UdpBroadcast) {
216 socket.set_broadcast(true)?;
217 }
218 Ok(UdpConnection::new(socket, server, dest)?.into())
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[test]
227 fn test_datagram_buffering() {
228 let receiver_socket = UdpSocket::bind("127.0.0.1:5000").unwrap();
229 let mut udp_reader = UdpRead {
230 socket: receiver_socket.try_clone().unwrap(),
231 buffer: VecDeque::new(),
232 last_recv_address: None,
233 };
234 let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
235 sender_socket.connect("127.0.0.1:5000").unwrap();
236
237 let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();
238
239 let mut n_sent = sender_socket.send(&datagram).unwrap();
240 assert_eq!(n_sent, datagram.len());
241 n_sent = sender_socket.send(&datagram).unwrap();
242 assert_eq!(n_sent, datagram.len());
243
244 let mut buf = [0u8; 30];
245
246 let mut n_read = udp_reader.read(&mut buf).unwrap();
247 assert_eq!(n_read, 30);
248 assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
249
250 n_read = udp_reader.read(&mut buf).unwrap();
251 assert_eq!(n_read, 20);
252 assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
253
254 n_read = udp_reader.read(&mut buf).unwrap();
255 assert_eq!(n_read, 30);
256 assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
257
258 n_read = udp_reader.read(&mut buf).unwrap();
259 assert_eq!(n_read, 20);
260 assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
261 }
262}