Skip to main content

mavlink_core/
async_peek_reader.rs

//! This module implements a buffered/peekable reader using async I/O.
//!
//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
//!
//! This is the async version of [`crate::peek_reader::PeekReader`].
//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but
//! it does not allow for peeking a specific number of bytes, so it provides no way to request
//! more data from the underlying reader without consuming the existing data.
//!
//! This API still tries to adhere to the philosophy of the [`tokio::io::AsyncBufRead`] trait.
//!
//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added
//! benefit in doing so.
//!
15
16#[cfg(doc)]
17use std::io::ErrorKind;
18
19use tokio::io::AsyncReadExt;
20
21use crate::error::MessageReadError;
22
/// An asynchronous reader with an internal look-ahead buffer.
///
/// Wraps any type implementing [`tokio::io::AsyncRead`] and stores incoming bytes in a
/// fixed-size array so that callers can:
///
/// - `peek` at a given number of upcoming bytes without consuming them,
/// - `read` bytes, which consumes them, or
/// - `consume` bytes that were previously `peek`ed.
///
/// NOTE: The buffer capacity is the const generic parameter `BUFFER_SIZE`. It defaults to
/// 280 bytes, MAVLink's current largest possible message size.
///
pub struct AsyncPeekReader<R, const BUFFER_SIZE: usize = 280> {
    /// Backing storage for buffered bytes.
    buffer: [u8; BUFFER_SIZE],
    /// Index into `buffer` of the next byte that will be handed to the caller.
    cursor: usize,
    /// Index into `buffer` at which the next bytes obtained from `reader` are stored,
    /// i.e. one past the last buffered byte.
    top: usize,
    /// The wrapped reader that supplies the data.
    reader: R,
}
43
44impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
45    /// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`] and using the default chunk size
46    pub fn new(reader: R) -> Self {
47        Self {
48            buffer: [0; BUFFER_SIZE],
49            cursor: 0,
50            top: 0,
51            reader,
52        }
53    }
54
55    /// Peeks an exact amount of bytes from the internal buffer
56    ///
57    /// If the internal buffer does not contain enough data, this function will read
58    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
59    ///
60    /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
61    /// will still return the peeked data.
62    ///
63    /// # Errors
64    ///
65    /// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
66    /// - If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
67    ///
68    /// # Panics
69    ///
70    /// Will panic when attempting to read more bytes then `BUFFER_SIZE`
71    pub async fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
72        self.fetch(amount, false).await
73    }
74
75    /// Reads a specified amount of bytes from the internal buffer
76    ///
77    /// If the internal buffer does not contain enough data, this function will read
78    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
79    ///
80    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
81    ///
82    /// # Errors
83    ///
84    /// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
85    /// - If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
86    ///
87    /// # Panics
88    ///
89    /// Will panic when attempting to read more bytes then `BUFFER_SIZE`
90    pub async fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
91        self.fetch(amount, true).await
92    }
93
94    /// Reads a byte from the internal buffer
95    ///
96    /// If the internal buffer does not contain enough data, this function will read
97    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
98    ///
99    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
100    ///
101    /// # Errors
102    ///
103    /// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
104    /// - If an EOF occurs before a byte could be read, this function will return an [`ErrorKind::UnexpectedEof`].
105    ///
106    /// # Panics
107    ///
108    /// Will panic if this `AsyncPeekReader`'s `BUFFER_SIZE` is 0.  
109    pub async fn read_u8(&mut self) -> Result<u8, MessageReadError> {
110        let buf = self.read_exact(1).await?;
111        Ok(buf[0])
112    }
113
114    /// Consumes a specified amount of bytes from the buffer
115    ///
116    /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
117    ///
118    pub fn consume(&mut self, amount: usize) -> usize {
119        let amount = amount.min(self.top - self.cursor);
120        self.cursor += amount;
121        amount
122    }
123
124    /// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`]
125    ///
126    /// Reading directly from the underlying reader will cause data loss
127    pub fn reader_ref(&mut self) -> &R {
128        &self.reader
129    }
130
131    /// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`]
132    ///
133    /// Reading directly from the underlying reader will cause data loss
134    pub fn reader_mut(&mut self) -> &mut R {
135        &mut self.reader
136    }
137
138    /// Internal function to fetch data from the internal buffer and/or reader
139    async fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
140        assert!(BUFFER_SIZE >= amount);
141
142        let buffered = self.top - self.cursor;
143
144        // the caller requested more bytes than we have buffered, fetch them from the reader
145        if buffered < amount {
146            let bytes_needed = amount - buffered;
147
148            // Check if we have space at the tail. If not, compact the buffer.
149            if self.top + bytes_needed > self.buffer.len() {
150                // Move active data to the beginning of the buffer
151                self.buffer.copy_within(self.cursor..self.top, 0);
152                self.cursor = 0;
153                self.top = buffered;
154            }
155
156            // Read directly into the internal buffer.
157            let dest = &mut self.buffer[self.top..self.top + bytes_needed];
158            self.reader.read_exact(dest).await?;
159
160            self.top += bytes_needed;
161        }
162
163        let result = &self.buffer[self.cursor..self.cursor + amount];
164        if consume {
165            self.cursor += amount;
166        }
167        Ok(result)
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Peeking more bytes than the buffer can hold must trip the size assertion.
    #[tokio::test]
    #[should_panic(expected = "assertion failed")]
    async fn test_peek_exact_panics_when_amount_exceeds_buffer_size() {
        let source: &[u8] = b"abcd";
        let mut peeker: AsyncPeekReader<_, 4> = AsyncPeekReader::new(source);
        let _ = peeker.peek_exact(5).await;
    }

    /// Reading more bytes than the buffer can hold must trip the size assertion.
    #[tokio::test]
    #[should_panic(expected = "assertion failed")]
    async fn test_read_exact_panics_when_amount_exceeds_buffer_size() {
        let source: &[u8] = b"abcd";
        let mut peeker: AsyncPeekReader<_, 4> = AsyncPeekReader::new(source);
        let _ = peeker.read_exact(5).await;
    }
}