1use crate::connection::get_socket_addr;
4use crate::connection::MavConnection;
5use crate::peek_reader::PeekReader;
6#[cfg(not(feature = "signing"))]
7use crate::read_raw_versioned_msg;
8#[cfg(feature = "signing")]
9use crate::read_raw_versioned_msg_signed;
10use crate::Connectable;
11use crate::MAVLinkMessageRaw;
12use crate::{MavHeader, MavlinkVersion, Message, ReadVersion};
13use core::ops::DerefMut;
14use std::collections::VecDeque;
15use std::io::{self, Read};
16use std::net::{SocketAddr, UdpSocket};
17use std::sync::Mutex;
18
19#[cfg(not(feature = "signing"))]
20use crate::{read_versioned_msg, write_versioned_msg};
21
22#[cfg(feature = "signing")]
23use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData};
24
25pub mod config;
26
27use config::{UdpConfig, UdpMode};
28
/// Read half of a UDP connection that presents received datagrams as a
/// byte stream.
///
/// When the caller's buffer is smaller than a received datagram, the unread
/// tail is kept in `buffer` and handed out by subsequent `read` calls before
/// the socket is touched again.
struct UdpRead {
    /// Socket the datagrams are received from.
    socket: UdpSocket,
    /// Bytes of the most recently received datagram not yet handed out.
    buffer: VecDeque<u8>,
    /// Source address of the most recently received datagram, if any.
    last_recv_address: Option<SocketAddr>,
}

/// Size in bytes of the scratch buffer a single datagram is received into.
// NOTE(review): per the std docs for `UdpSocket::recv_from`, bytes of a
// datagram that exceed the supplied buffer may be discarded.
const MTU_SIZE: usize = 1500;

impl Read for UdpRead {
    /// Copies buffered bytes into `buf`, or receives the next non-empty
    /// datagram when nothing is buffered.
    ///
    /// Returns the number of bytes written into `buf`. `Ok(0)` is returned
    /// only when `buf` itself is empty; a zero-length datagram is skipped
    /// rather than reported as `Ok(0)`, because `Read` callers (for example
    /// `read_exact`) interpret `Ok(0)` as end-of-stream.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `UdpSocket::recv_from`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Nothing can be delivered into an empty buffer; do not block on the
        // socket just to stash a datagram.
        if buf.is_empty() {
            return Ok(0);
        }

        // Leftovers from an earlier datagram are served first.
        if !self.buffer.is_empty() {
            return self.buffer.read(buf);
        }

        let mut datagram = [0u8; MTU_SIZE];
        loop {
            let (received, sender) = self.socket.recv_from(&mut datagram)?;
            // The sender is recorded even for empty datagrams.
            self.last_recv_address = Some(sender);

            if received == 0 {
                // An empty datagram carries no stream data; wait for the next one.
                continue;
            }

            let copied = received.min(buf.len());
            buf[..copied].copy_from_slice(&datagram[..copied]);
            // Keep whatever did not fit for the next call.
            self.buffer.extend(&datagram[copied..received]);
            return Ok(copied);
        }
    }
}
51
52struct UdpWrite {
53 socket: UdpSocket,
54 dest: Option<SocketAddr>,
55 sequence: u8,
56}
57
58pub struct UdpConnection {
59 reader: Mutex<PeekReader<UdpRead>>,
60 writer: Mutex<UdpWrite>,
61 protocol_version: MavlinkVersion,
62 recv_any_version: bool,
63 server: bool,
64 #[cfg(feature = "signing")]
65 signing_data: Option<SigningData>,
66}
67
68impl UdpConnection {
69 fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
70 Ok(Self {
71 server,
72 reader: Mutex::new(PeekReader::new(UdpRead {
73 socket: socket.try_clone()?,
74 buffer: VecDeque::new(),
75 last_recv_address: None,
76 })),
77 writer: Mutex::new(UdpWrite {
78 socket,
79 dest,
80 sequence: 0,
81 }),
82 protocol_version: MavlinkVersion::V2,
83 recv_any_version: false,
84 #[cfg(feature = "signing")]
85 signing_data: None,
86 })
87 }
88}
89
90impl<M: Message> MavConnection<M> for UdpConnection {
91 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
92 let mut reader = self.reader.lock().unwrap();
93
94 loop {
95 let version = ReadVersion::from_conn_cfg::<_, M>(self);
96 #[cfg(not(feature = "signing"))]
97 let result = read_versioned_msg(reader.deref_mut(), version);
98 #[cfg(feature = "signing")]
99 let result =
100 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
101 if self.server {
102 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
103 self.writer.lock().unwrap().dest = addr;
104 }
105 }
106 if let ok @ Ok(..) = result {
107 return ok;
108 }
109 }
110 }
111
112 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
113 let mut reader = self.reader.lock().unwrap();
114
115 loop {
116 let version = ReadVersion::from_conn_cfg::<_, M>(self);
117 #[cfg(not(feature = "signing"))]
118 let result = read_raw_versioned_msg::<M, _>(reader.deref_mut(), version);
119 #[cfg(feature = "signing")]
120 let result = read_raw_versioned_msg_signed::<M, _>(
121 reader.deref_mut(),
122 version,
123 self.signing_data.as_ref(),
124 );
125 if self.server {
126 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
127 self.writer.lock().unwrap().dest = addr;
128 }
129 }
130 if let ok @ Ok(..) = result {
131 return ok;
132 }
133 }
134 }
135
136 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
137 let mut reader = self.reader.lock().unwrap();
138 reader.reader_mut().socket.set_nonblocking(true)?;
139
140 let version = ReadVersion::from_conn_cfg::<_, M>(self);
141
142 #[cfg(not(feature = "signing"))]
143 let result = read_versioned_msg(reader.deref_mut(), version);
144 #[cfg(feature = "signing")]
145 let result =
146 read_versioned_msg_signed(reader.deref_mut(), version, self.signing_data.as_ref());
147
148 if self.server {
149 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
150 self.writer.lock().unwrap().dest = addr;
151 }
152 }
153
154 reader.reader_mut().socket.set_nonblocking(false)?;
155
156 result
157 }
158
159 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
160 let mut guard = self.writer.lock().unwrap();
161 let state = &mut *guard;
162
163 let header = MavHeader {
164 sequence: state.sequence,
165 system_id: header.system_id,
166 component_id: header.component_id,
167 };
168
169 state.sequence = state.sequence.wrapping_add(1);
170
171 let len = if let Some(addr) = state.dest {
172 let mut buf = Vec::new();
173 #[cfg(not(feature = "signing"))]
174 write_versioned_msg(&mut buf, self.protocol_version, header, data)?;
175 #[cfg(feature = "signing")]
176 write_versioned_msg_signed(
177 &mut buf,
178 self.protocol_version,
179 header,
180 data,
181 self.signing_data.as_ref(),
182 )?;
183 state.socket.send_to(&buf, addr)?
184 } else {
185 0
186 };
187
188 Ok(len)
189 }
190
191 fn set_protocol_version(&mut self, version: MavlinkVersion) {
192 self.protocol_version = version;
193 }
194
195 fn protocol_version(&self) -> MavlinkVersion {
196 self.protocol_version
197 }
198
199 fn set_allow_recv_any_version(&mut self, allow: bool) {
200 self.recv_any_version = allow;
201 }
202
203 fn allow_recv_any_version(&self) -> bool {
204 self.recv_any_version
205 }
206
207 #[cfg(feature = "signing")]
208 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
209 self.signing_data = signing_data.map(SigningData::from_config);
210 }
211}
212
213impl Connectable for UdpConfig {
214 fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
215 let (addr, server, dest): (&str, _, _) = match self.mode {
216 UdpMode::Udpin => (&self.address, true, None),
217 _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
218 };
219 let socket = UdpSocket::bind(addr)?;
220 if matches!(self.mode, UdpMode::Udpcast) {
221 socket.set_broadcast(true)?;
222 }
223 Ok(Box::new(UdpConnection::new(socket, server, dest)?))
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// A datagram larger than the caller's buffer must be delivered across
    /// several `read` calls without losing bytes, and the next datagram must
    /// only be fetched once the previous one has been drained.
    #[test]
    fn test_datagram_buffering() {
        // Bind to an OS-assigned loopback port: a fixed port can already be
        // taken by another process or by a test running in parallel.
        let receiver_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        // Fail the test instead of hanging forever if a datagram never arrives.
        receiver_socket
            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
            .unwrap();
        let receiver_addr = receiver_socket.local_addr().unwrap();

        let mut udp_reader = UdpRead {
            socket: receiver_socket,
            buffer: VecDeque::new(),
            last_recv_address: None,
        };

        let sender_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender_socket.connect(receiver_addr).unwrap();

        let datagram: Vec<u8> = (0..50).collect();

        // Two identical 50-byte datagrams.
        for _ in 0..2 {
            let n_sent = sender_socket.send(&datagram).unwrap();
            assert_eq!(n_sent, datagram.len());
        }

        let mut buf = [0u8; 30];

        // Each datagram is consumed as a 30-byte read followed by the 20-byte tail.
        for _ in 0..2 {
            let n_read = udp_reader.read(&mut buf).unwrap();
            assert_eq!(n_read, 30);
            assert_eq!(&buf[0..n_read], &datagram[0..30]);

            let n_read = udp_reader.read(&mut buf).unwrap();
            assert_eq!(n_read, 20);
            assert_eq!(&buf[0..n_read], &datagram[30..50]);
        }
    }
}