1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
//! This module implements a buffered/peekable reader using async I/O.
//!
//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
//!
//! This is the async version of [`crate::peek_reader::PeekReader`].
//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but
//! it does not allow for peeking a specific number of bytes, so it provides no way to request
//! more data from the underlying reader without consuming the existing data.
//!
//! This API still tries to adhere to the [`tokio::io::AsyncBufRead`]'s trait philosophy.
//!
//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added benefit
//! in doing so.
//!
use tokio::io::AsyncReadExt;
use crate::error::MessageReadError;
/// A buffered/peekable reader
///
/// This reader wraps a type implementing [`tokio::io::AsyncRead`] and adds buffering via an
/// internal, fixed-size buffer.
///
/// It allows the user to `peek` a specified number of bytes (without consuming them),
/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
///
/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest
/// possible message size of 280 bytes. A single `peek`/`read` request is limited by the
/// capacity of this buffer.
///
pub struct AsyncPeekReader<R, const BUFFER_SIZE: usize = 280> {
    // Internal buffer. Only the range `cursor..top` holds buffered, not yet consumed bytes;
    // everything outside that range is free space or stale data.
    buffer: [u8; BUFFER_SIZE],
    // The position of the next byte to read from the buffer (start of the unconsumed data).
    cursor: usize,
    // The position of the next byte to read into the buffer (one past the end of the
    // unconsumed data), so `top - cursor` is the number of bytes currently buffered.
    top: usize,
    // The wrapped reader that data is fetched from when the buffer runs short.
    reader: R,
}
impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
    /// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`]
    /// with an empty internal buffer of `BUFFER_SIZE` bytes
    pub fn new(reader: R) -> Self {
        Self {
            buffer: [0; BUFFER_SIZE],
            cursor: 0,
            top: 0,
            reader,
        }
    }

    /// Peeks an exact amount of bytes from the internal buffer
    ///
    /// If the internal buffer does not contain enough data, this function will read
    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
    ///
    /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
    /// will still return the peeked data.
    ///
    /// # Errors
    ///
    /// Any error reported by the underlying reader is converted into a [`MessageReadError`].
    /// If an EOF occurs before the specified amount could be read, the underlying error is an
    /// [`std::io::ErrorKind::UnexpectedEof`].
    ///
    /// # Panics
    ///
    /// Panics if `amount` is larger than `BUFFER_SIZE`.
    ///
    pub async fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
        self.fetch(amount, false).await
    }

    /// Reads a specified amount of bytes from the internal buffer
    ///
    /// If the internal buffer does not contain enough data, this function will read
    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
    ///
    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
    ///
    /// # Errors
    ///
    /// Any error reported by the underlying reader is converted into a [`MessageReadError`].
    /// If an EOF occurs before the specified amount could be read, the underlying error is an
    /// [`std::io::ErrorKind::UnexpectedEof`].
    ///
    /// # Panics
    ///
    /// Panics if `amount` is larger than `BUFFER_SIZE`.
    ///
    pub async fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
        self.fetch(amount, true).await
    }

    /// Reads a byte from the internal buffer
    ///
    /// If the internal buffer is empty, this function will read
    /// from the underlying [`tokio::io::AsyncReadExt`] until a byte is available, an error occurs or no more data can be read (EOF).
    ///
    /// This function consumes the byte from the buffer, unless an error occurs, in which case no data is consumed.
    ///
    /// # Errors
    ///
    /// Any error reported by the underlying reader is converted into a [`MessageReadError`].
    /// If an EOF occurs before the byte could be read, the underlying error is an
    /// [`std::io::ErrorKind::UnexpectedEof`].
    ///
    pub async fn read_u8(&mut self) -> Result<u8, MessageReadError> {
        let buf = self.read_exact(1).await?;
        Ok(buf[0])
    }

    /// Consumes a specified amount of bytes from the buffer
    ///
    /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
    ///
    /// Returns the number of bytes that were actually consumed.
    ///
    pub fn consume(&mut self, amount: usize) -> usize {
        let amount = amount.min(self.top - self.cursor);
        self.cursor += amount;
        amount
    }

    /// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`]
    ///
    /// Reading directly from the underlying stream will cause data loss
    pub fn reader_ref(&self) -> &R {
        &self.reader
    }

    /// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`]
    ///
    /// Reading directly from the underlying stream will cause data loss
    pub fn reader_mut(&mut self) -> &mut R {
        &mut self.reader
    }

    /// Internal function to fetch data from the internal buffer and/or reader
    ///
    /// Makes sure at least `amount` bytes are buffered (reading the missing ones from the
    /// wrapped reader), then returns the first `amount` buffered bytes. When `consume` is
    /// `true` the returned bytes are also marked as consumed.
    ///
    /// Panics if `amount` is larger than `BUFFER_SIZE`.
    async fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
        // Check the whole request (not just the missing part) against the capacity, and do it
        // before touching the reader so an oversized request never drains bytes from the stream.
        assert!(
            amount <= BUFFER_SIZE,
            "requested {amount} bytes, but the buffer can only hold {BUFFER_SIZE}"
        );

        let buffered = self.top - self.cursor;

        // the caller requested more bytes than we have buffered, fetch them from the reader
        if buffered < amount {
            let missing = amount - buffered;

            // not enough free space behind `top`: move the unconsumed bytes to the front
            if self.buffer.len() - self.top < missing {
                self.buffer.copy_within(self.cursor..self.top, 0);
                self.cursor = 0;
                self.top = buffered;
            }

            // Read straight into the free space. `top` is only advanced on success, so a
            // failed read leaves the buffered data untouched.
            let free = &mut self.buffer[self.top..self.top + missing];
            self.reader.read_exact(free).await?;
            self.top += missing;
        }

        let start = self.cursor;
        if consume {
            self.cursor += amount;
        }
        Ok(&self.buffer[start..start + amount])
    }
}