mavlink_core/connection/udp/
sync.rs1use crate::Connectable;
4use crate::MAVLinkMessageRaw;
5use crate::connection::get_socket_addr;
6use crate::connection::{Connection, MavConnection};
7use crate::connection_shared::{
8 ConnectionState, next_send_header, read_message, read_raw_message, write_message,
9};
10use crate::peek_reader::PeekReader;
11use crate::{MavHeader, MavlinkVersion, Message};
12use core::ops::DerefMut;
13use std::collections::VecDeque;
14use std::io::{self, Read};
15use std::net::{SocketAddr, UdpSocket};
16use std::sync::Mutex;
17
18#[cfg(feature = "mav2-message-signing")]
19use crate::SigningConfig;
20
21use super::config::{UdpConfig, UdpMode};
22
23struct UdpRead {
24 socket: UdpSocket,
25 buffer: VecDeque<u8>,
26 last_recv_address: Option<SocketAddr>,
27}
28
29const MTU_SIZE: usize = 1500;
30impl Read for UdpRead {
31 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
32 if !self.buffer.is_empty() {
33 self.buffer.read(buf)
34 } else {
35 let mut read_buffer = [0u8; MTU_SIZE];
36 let (n_buffer, address) = self.socket.recv_from(&mut read_buffer)?;
37 let n = (&read_buffer[0..n_buffer]).read(buf)?;
38 self.buffer.extend(&read_buffer[n..n_buffer]);
39
40 self.last_recv_address = Some(address);
41 Ok(n)
42 }
43 }
44}
45
46struct UdpWrite {
47 socket: UdpSocket,
48 dest: Option<SocketAddr>,
49 sequence: u8,
50}
51
52pub struct UdpConnection {
53 reader: Mutex<PeekReader<UdpRead>>,
54 writer: Mutex<UdpWrite>,
55 state: ConnectionState,
56 server: bool,
57}
58
59impl UdpConnection {
60 fn new(socket: UdpSocket, server: bool, dest: Option<SocketAddr>) -> io::Result<Self> {
61 Ok(Self {
62 server,
63 reader: Mutex::new(PeekReader::new(UdpRead {
64 socket: socket.try_clone()?,
65 buffer: VecDeque::new(),
66 last_recv_address: None,
67 })),
68 writer: Mutex::new(UdpWrite {
69 socket,
70 dest,
71 sequence: 0,
72 }),
73 state: ConnectionState::new(),
74 })
75 }
76
77 fn update_reply_destination(&self, reader: &PeekReader<UdpRead>) {
78 if self.server {
79 if let addr @ Some(_) = reader.reader_ref().last_recv_address {
80 self.writer.lock().unwrap().dest = addr;
81 }
82 }
83 }
84}
85
86impl<M: Message> MavConnection<M> for UdpConnection {
87 fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
88 let mut reader = self.reader.lock().unwrap();
89
90 let result = read_message::<M, _>(reader.deref_mut(), &self.state);
91 self.update_reply_destination(&reader);
92 result
93 }
94
95 fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
96 let mut reader = self.reader.lock().unwrap();
97
98 let result = read_raw_message::<M, _>(reader.deref_mut(), &self.state);
99 self.update_reply_destination(&reader);
100 result
101 }
102
103 fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
104 let mut reader = self.reader.lock().unwrap();
105 reader.reader_mut().socket.set_nonblocking(true)?;
106
107 let result = read_message::<M, _>(reader.deref_mut(), &self.state);
108 self.update_reply_destination(&reader);
109
110 reader.reader_mut().socket.set_nonblocking(false)?;
111
112 result
113 }
114
115 fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
116 let mut guard = self.writer.lock().unwrap();
117 let state = &mut *guard;
118
119 let header = next_send_header(&mut state.sequence, header);
120
121 let len = if let Some(addr) = state.dest {
122 let mut buf = Vec::new();
123 write_message(&mut buf, &self.state, header, data)?;
124 state.socket.send_to(&buf, addr)?
125 } else {
126 0
127 };
128
129 Ok(len)
130 }
131
132 fn set_protocol_version(&mut self, version: MavlinkVersion) {
133 self.state.set_protocol_version(version);
134 }
135
136 fn protocol_version(&self) -> MavlinkVersion {
137 self.state.protocol_version()
138 }
139
140 fn set_allow_recv_any_version(&mut self, allow: bool) {
141 self.state.set_allow_recv_any_version(allow);
142 }
143
144 fn allow_recv_any_version(&self) -> bool {
145 self.state.allow_recv_any_version()
146 }
147
148 #[cfg(feature = "mav2-message-signing")]
149 fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
150 self.state.setup_signing(signing_data);
151 }
152}
153
154impl Connectable for UdpConfig {
155 fn connect<M: Message>(&self) -> io::Result<Connection<M>> {
156 let (addr, server, dest): (&str, _, _) = match self.mode {
157 UdpMode::Udpin => (&self.address, true, None),
158 _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)),
159 };
160 let socket = UdpSocket::bind(addr)?;
161 if let Some(timeout) = self.read_timeout {
162 socket.set_read_timeout(Some(timeout))?;
163 }
164 if matches!(self.mode, UdpMode::UdpBroadcast) {
165 socket.set_broadcast(true)?;
166 }
167 Ok(UdpConnection::new(socket, server, dest)?.into())
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174
175 #[test]
176 fn test_datagram_buffering() {
177 let receiver_socket = UdpSocket::bind("127.0.0.1:5000").unwrap();
178 let mut udp_reader = UdpRead {
179 socket: receiver_socket.try_clone().unwrap(),
180 buffer: VecDeque::new(),
181 last_recv_address: None,
182 };
183 let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
184 sender_socket.connect("127.0.0.1:5000").unwrap();
185
186 let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();
187
188 let mut n_sent = sender_socket.send(&datagram).unwrap();
189 assert_eq!(n_sent, datagram.len());
190 n_sent = sender_socket.send(&datagram).unwrap();
191 assert_eq!(n_sent, datagram.len());
192
193 let mut buf = [0u8; 30];
194
195 let mut n_read = udp_reader.read(&mut buf).unwrap();
196 assert_eq!(n_read, 30);
197 assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
198
199 n_read = udp_reader.read(&mut buf).unwrap();
200 assert_eq!(n_read, 20);
201 assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
202
203 n_read = udp_reader.read(&mut buf).unwrap();
204 assert_eq!(n_read, 30);
205 assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
206
207 n_read = udp_reader.read(&mut buf).unwrap();
208 assert_eq!(n_read, 20);
209 assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
210 }
211}