mavlink_core/
async_peek_reader.rs

1//! This module implements a buffered/peekable reader using async I/O.
2//!
3//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
4//!
5//! This is the async version of [`crate::peek_reader::PeekReader`].
6//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but
7//! it does not allow for peeking a specific number of bytes, so it provides no way to request
8//! more data from the underlying reader without consuming the existing data.
9//!
10//! This API still tries to adhere to the [`tokio::io::AsyncBufRead`]'s trait philosophy.
11//!
12//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added benefit
13//! in doing so.
14//!
15use tokio::io::AsyncReadExt;
16
17use crate::error::MessageReadError;
18
19/// A buffered/peekable reader
20///
21/// This reader wraps a type implementing [`tokio::io::AsyncRead`] and adds buffering via an internal buffer.
22///
23/// It allows the user to `peek` a specified number of bytes (without consuming them),
24/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
25///
26/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest
27/// possible message size of 280 bytes
28///
29pub struct AsyncPeekReader<R, const BUFFER_SIZE: usize = 280> {
30    // Internal buffer
31    buffer: [u8; BUFFER_SIZE],
32    // The position of the next byte to read from the buffer.
33    cursor: usize,
34    // The position of the next byte to read into the buffer.
35    top: usize,
36    // The wrapped reader.
37    reader: R,
38}
39
40impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
41    /// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`] and using the default chunk size
42    pub fn new(reader: R) -> Self {
43        Self {
44            buffer: [0; BUFFER_SIZE],
45            cursor: 0,
46            top: 0,
47            reader,
48        }
49    }
50
51    /// Peeks an exact amount of bytes from the internal buffer
52    ///
53    /// If the internal buffer does not contain enough data, this function will read
54    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
55    ///
56    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
57    ///
58    /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
59    /// will still return the peeked data.
60    ///
61    pub async fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
62        self.fetch(amount, false).await
63    }
64
65    /// Reads a specified amount of bytes from the internal buffer
66    ///
67    /// If the internal buffer does not contain enough data, this function will read
68    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
69    ///
70    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
71    ///
72    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
73    ///
74    pub async fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
75        self.fetch(amount, true).await
76    }
77
78    /// Reads a byte from the internal buffer
79    ///
80    /// If the internal buffer does not contain enough data, this function will read
81    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
82    ///
83    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
84    ///
85    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
86    ///
87    pub async fn read_u8(&mut self) -> Result<u8, MessageReadError> {
88        let buf = self.read_exact(1).await?;
89        Ok(buf[0])
90    }
91
92    /// Consumes a specified amount of bytes from the buffer
93    ///
94    /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
95    ///
96    pub fn consume(&mut self, amount: usize) -> usize {
97        let amount = amount.min(self.top - self.cursor);
98        self.cursor += amount;
99        amount
100    }
101
102    /// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`]
103    ///
104    /// Reading directly from the underlying stream will cause data loss
105    pub fn reader_ref(&mut self) -> &R {
106        &self.reader
107    }
108
109    /// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`]
110    ///
111    /// Reading directly from the underlying stream will cause data loss
112    pub fn reader_mut(&mut self) -> &mut R {
113        &mut self.reader
114    }
115
116    /// Internal function to fetch data from the internal buffer and/or reader
117    async fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
118        let buffered = self.top - self.cursor;
119
120        // the caller requested more bytes than we have buffered, fetch them from the reader
121        if buffered < amount {
122            let bytes_read = amount - buffered;
123            assert!(bytes_read < BUFFER_SIZE);
124            let mut buf = [0u8; BUFFER_SIZE];
125
126            // read needed bytes from reader
127            self.reader.read_exact(&mut buf[..bytes_read]).await?;
128
129            // if some bytes were read, add them to the buffer
130
131            if self.buffer.len() - self.top < bytes_read {
132                // reallocate
133                self.buffer.copy_within(self.cursor..self.top, 0);
134                self.cursor = 0;
135                self.top = buffered;
136            }
137            self.buffer[self.top..self.top + bytes_read].copy_from_slice(&buf[..bytes_read]);
138            self.top += bytes_read;
139        }
140
141        let result = &self.buffer[self.cursor..self.cursor + amount];
142        if consume {
143            self.cursor += amount;
144        }
145        Ok(result)
146    }
147}