mavlink_core/
peek_reader.rs

1//! This module implements a buffered/peekable reader.
2//!
3//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
4//!
5//! A reader implementing the standard library's [`std::io::BufRead`] trait seems like a good fit, but
6//! it does not allow for peeking a specific number of bytes, so it provides no way to request
7//! more data from the underlying reader without consuming the existing data.
8//!
9//! This API still tries to adhere to the [`std::io::BufRead`]'s trait philosophy.
10//!
11//! The main type `PeekReader`does not implement [`std::io::Read`] itself, as there is no added benefit
12//! in doing so.
13//!
14#[cfg(any(feature = "embedded", feature = "embedded-hal-02"))]
15use crate::embedded::Read;
16
17#[cfg(feature = "std")]
18use std::io::Read;
19
20use crate::error::MessageReadError;
21
22/// A buffered/peekable reader
23///
24/// This reader wraps a type implementing [`std::io::Read`] and adds buffering via an internal buffer.
25///
26/// It allows the user to `peek` a specified number of bytes (without consuming them),
27/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
28///
29/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest
30/// possible message size of 280 bytes
31///
32pub struct PeekReader<R, const BUFFER_SIZE: usize = 280> {
33    // Internal buffer
34    buffer: [u8; BUFFER_SIZE],
35    // The position of the next byte to read from the buffer.
36    cursor: usize,
37    // The position of the next byte to read into the buffer.
38    top: usize,
39    // The wrapped reader.
40    reader: R,
41}
42
43impl<R: Read, const BUFFER_SIZE: usize> PeekReader<R, BUFFER_SIZE> {
44    /// Instantiates a new [`PeekReader`], wrapping the provided [`std::io::Read`]er and using the default chunk size
45    pub fn new(reader: R) -> Self {
46        Self {
47            buffer: [0; BUFFER_SIZE],
48            cursor: 0,
49            top: 0,
50            reader,
51        }
52    }
53
54    /// Peeks an exact amount of bytes from the internal buffer
55    ///
56    /// If the internal buffer does not contain enough data, this function will read
57    /// from the underlying [`std::io::Read`]er until it does, an error occurs or no more data can be read (EOF).
58    ///
59    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
60    ///
61    /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
62    /// will still return the peeked data.
63    ///
64    pub fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
65        let result = self.fetch(amount, false);
66        result
67    }
68
69    /// Reads a specified amount of bytes from the internal buffer
70    ///
71    /// If the internal buffer does not contain enough data, this function will read
72    /// from the underlying [`std::io::Read`]er until it does, an error occurs or no more data can be read (EOF).
73    ///
74    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
75    ///
76    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
77    ///
78    pub fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
79        self.fetch(amount, true)
80    }
81
82    /// Reads a byte from the internal buffer
83    ///
84    /// If the internal buffer does not contain enough data, this function will read
85    /// from the underlying [`std::io::Read`]er until it does, an error occurs or no more data can be read (EOF).
86    ///
87    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
88    ///
89    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
90    ///
91    pub fn read_u8(&mut self) -> Result<u8, MessageReadError> {
92        let buf = self.read_exact(1)?;
93        Ok(buf[0])
94    }
95
96    /// Consumes a specified amount of bytes from the buffer
97    ///
98    /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
99    ///
100    pub fn consume(&mut self, amount: usize) -> usize {
101        let amount = amount.min(self.top - self.cursor);
102        self.cursor += amount;
103        amount
104    }
105
106    /// Returns an immutable reference to the underlying [`std::io::Read`]er
107    ///
108    /// Reading directly from the underlying stream will cause data loss
109    pub fn reader_ref(&self) -> &R {
110        &self.reader
111    }
112
113    /// Returns a mutable reference to the underlying [`std::io::Read`]er
114    ///
115    /// Reading directly from the underlying stream will cause data loss
116    pub fn reader_mut(&mut self) -> &mut R {
117        &mut self.reader
118    }
119
120    /// Internal function to fetch data from the internal buffer and/or reader
121    fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
122        loop {
123            let buffered = self.top - self.cursor;
124
125            if buffered >= amount {
126                break;
127            }
128
129            // the caller requested more bytes than we have buffered, fetch them from the reader
130            let bytes_to_read = amount - buffered;
131            assert!(bytes_to_read < BUFFER_SIZE);
132            let mut buf = [0u8; BUFFER_SIZE];
133
134            // read needed bytes from reader
135            let bytes_read = self.reader.read(&mut buf[..bytes_to_read])?;
136
137            if bytes_read == 0 {
138                return Err(MessageReadError::eof());
139            }
140
141            // if some bytes were read, add them to the buffer
142
143            if self.buffer.len() - self.top < bytes_read {
144                // reallocate
145                self.buffer.copy_within(self.cursor..self.top, 0);
146                self.cursor = 0;
147                self.top = buffered;
148            }
149            self.buffer[self.top..self.top + bytes_read].copy_from_slice(&buf[..bytes_read]);
150            self.top += bytes_read;
151        }
152
153        let result = &self.buffer[self.cursor..self.cursor + amount];
154        if consume {
155            self.cursor += amount;
156        }
157        Ok(result)
158    }
159}