mavlink_core/async_peek_reader.rs
1//! This module implements a buffered/peekable reader using async I/O.
2//!
3//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
4//!
5//! This is the async version of [`crate::peek_reader::PeekReader`].
6//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but
7//! it does not allow for peeking a specific number of bytes, so it provides no way to request
8//! more data from the underlying reader without consuming the existing data.
9//!
10//! This API still tries to adhere to the [`tokio::io::AsyncBufRead`]'s trait philosophy.
11//!
12//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added benefit
13//! in doing so.
14//!
15
16#[cfg(doc)]
17use std::io::ErrorKind;
18
19use tokio::io::AsyncReadExt;
20
21use crate::error::MessageReadError;
22
23/// A buffered/peekable reader
24///
25/// This reader wraps a type implementing [`tokio::io::AsyncRead`] and adds buffering via an internal buffer.
26///
27/// It allows the user to `peek` a specified number of bytes (without consuming them),
28/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
29///
30/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest
31/// possible message size of 280 bytes
32///
33pub struct AsyncPeekReader<R, const BUFFER_SIZE: usize = 280> {
34 // Internal buffer
35 buffer: [u8; BUFFER_SIZE],
36 // The position of the next byte to read from the buffer.
37 cursor: usize,
38 // The position of the next byte to read into the buffer.
39 top: usize,
40 // The wrapped reader.
41 reader: R,
42}
43
44impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
45 /// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`] and using the default chunk size
46 pub fn new(reader: R) -> Self {
47 Self {
48 buffer: [0; BUFFER_SIZE],
49 cursor: 0,
50 top: 0,
51 reader,
52 }
53 }
54
55 /// Peeks an exact amount of bytes from the internal buffer
56 ///
57 /// If the internal buffer does not contain enough data, this function will read
58 /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
59 ///
60 /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
61 /// will still return the peeked data.
62 ///
63 /// # Errors
64 ///
65 /// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
66 /// - If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
67 ///
68 /// # Panics
69 ///
70 /// Will panic when attempting to read more bytes then `BUFFER_SIZE`
71 pub async fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
72 self.fetch(amount, false).await
73 }
74
75 /// Reads a specified amount of bytes from the internal buffer
76 ///
77 /// If the internal buffer does not contain enough data, this function will read
78 /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
79 ///
80 /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
81 ///
82 /// # Errors
83 ///
84 /// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
85 /// - If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
86 ///
87 /// # Panics
88 ///
89 /// Will panic when attempting to read more bytes then `BUFFER_SIZE`
90 pub async fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
91 self.fetch(amount, true).await
92 }
93
94 /// Reads a byte from the internal buffer
95 ///
96 /// If the internal buffer does not contain enough data, this function will read
97 /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
98 ///
99 /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
100 ///
101 /// # Errors
102 ///
103 /// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
104 /// - If an EOF occurs before a byte could be read, this function will return an [`ErrorKind::UnexpectedEof`].
105 ///
106 /// # Panics
107 ///
108 /// Will panic if this `AsyncPeekReader`'s `BUFFER_SIZE` is 0.
109 pub async fn read_u8(&mut self) -> Result<u8, MessageReadError> {
110 let buf = self.read_exact(1).await?;
111 Ok(buf[0])
112 }
113
114 /// Consumes a specified amount of bytes from the buffer
115 ///
116 /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
117 ///
118 pub fn consume(&mut self, amount: usize) -> usize {
119 let amount = amount.min(self.top - self.cursor);
120 self.cursor += amount;
121 amount
122 }
123
124 /// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`]
125 ///
126 /// Reading directly from the underlying reader will cause data loss
127 pub fn reader_ref(&mut self) -> &R {
128 &self.reader
129 }
130
131 /// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`]
132 ///
133 /// Reading directly from the underlying reader will cause data loss
134 pub fn reader_mut(&mut self) -> &mut R {
135 &mut self.reader
136 }
137
138 /// Internal function to fetch data from the internal buffer and/or reader
139 async fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
140 let buffered = self.top - self.cursor;
141
142 // the caller requested more bytes than we have buffered, fetch them from the reader
143 if buffered < amount {
144 let bytes_read = amount - buffered;
145 assert!(bytes_read < BUFFER_SIZE);
146 let mut buf = [0u8; BUFFER_SIZE];
147
148 // read needed bytes from reader
149 self.reader.read_exact(&mut buf[..bytes_read]).await?;
150
151 // if some bytes were read, add them to the buffer
152
153 if self.buffer.len() - self.top < bytes_read {
154 // reallocate
155 self.buffer.copy_within(self.cursor..self.top, 0);
156 self.cursor = 0;
157 self.top = buffered;
158 }
159 self.buffer[self.top..self.top + bytes_read].copy_from_slice(&buf[..bytes_read]);
160 self.top += bytes_read;
161 }
162
163 let result = &self.buffer[self.cursor..self.cursor + amount];
164 if consume {
165 self.cursor += amount;
166 }
167 Ok(result)
168 }
169}