In my work with networked projects, I often find myself deserializing messages from the network. Since TCP is a stream-oriented protocol, data read from a socket may contain part of a message, exactly one message, or several messages. This affects how the data can be deserialized. Running out of data partway through a message isn’t an error in the usual sense, since more data may arrive to complete it. On the other hand, the buffer may also hold more data than a single message, and this condition must be handled gracefully too. For this exploration, we will assume that messages do not carry an encoded length, so we must examine the message itself to find its end, as is the case with JSON, for example.
The straightforward way to handle this is brute force. First, build a queue. When data arrives, append it to the queue. Then attempt to deserialize a message from the head of the queue. If that fails for lack of data, wait for more. If the deserializer produces a message, remove the bytes it consumed from the queue and emit the deserialized message.
Something like this:
buffer.extend(read); // Read data
while let Some((len, item)) = simple_deserialize(buffer.make_contiguous()) {
    items.push(item); // If successful, add to the collection of deserialized items
    buffer.drain(0..len); // Remove the successfully read bytes
}
Each field in simple_deserialize then deserializes like the following:
let x = if data.len() >= 4 { // There are enough bytes to deserialize the value `x`
    i32::from_le_bytes(data[0..4].try_into().unwrap())
} else {
    return None; // There were not enough bytes, abort
};
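To make the brute-force approach concrete, here is a minimal sketch of a complete `simple_deserialize` for the `TestItem` struct used later in this post. It assumes the wire layout is `x`, `y`, and a length, each as a little-endian `i32`, followed by that many raw bytes. The helpers `read_i32`, `drain_items`, and `encode` are names I made up for illustration.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>,
}

/// Reads a little-endian i32 at `offset`, or returns None if too few bytes are present.
fn read_i32(data: &[u8], offset: usize) -> Option<i32> {
    let bytes = data.get(offset..offset + 4)?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(bytes);
    Some(i32::from_le_bytes(raw))
}

/// Returns the number of bytes consumed and the item, or None if `data` is incomplete.
pub fn simple_deserialize(data: &[u8]) -> Option<(usize, TestItem)> {
    let x = read_i32(data, 0)?;
    let y = read_i32(data, 4)?;
    let len = read_i32(data, 8)?;
    if len < 0 {
        // Assumption: a negative length is simply never decodable.
        // Real code would want to report this as an error instead.
        return None;
    }
    let end = 12 + len as usize;
    let bytes = data.get(12..end)?.to_vec();
    Some((end, TestItem { x, y, bytes }))
}

/// Deserializes as many complete items as the buffer currently holds.
pub fn drain_items(buffer: &mut VecDeque<u8>, items: &mut Vec<TestItem>) {
    while let Some((len, item)) = simple_deserialize(buffer.make_contiguous()) {
        items.push(item);
        buffer.drain(0..len);
    }
}

/// Hypothetical encoder matching the assumed layout, handy for testing.
pub fn encode(item: &TestItem) -> Vec<u8> {
    let mut out = Vec::with_capacity(12 + item.bytes.len());
    out.extend_from_slice(&item.x.to_le_bytes());
    out.extend_from_slice(&item.y.to_le_bytes());
    out.extend_from_slice(&(item.bytes.len() as i32).to_le_bytes());
    out.extend_from_slice(&item.bytes);
    out
}
```

Every call to `simple_deserialize` starts again from offset 0, so a message that arrives in several pieces has its leading fields parsed once per piece.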
This works. However, if the stream of serialized messages is fragmented, the deserializer will repeat work during its attempted deserialization. Any partial message will cause the deserializer to try to deserialize that portion at least twice, or more if the message is fragmented further.
To avoid this repeated work, the intermediate state of the deserializer can be saved, so that when the next chunk of bytes arrives, deserialization resumes where it left off.
The main loop starts to look a bit different, since the calling code no longer has to maintain the buffer. The buffer must be managed by the deserializer because its state becomes coupled with the state of the partial deserialization. It might become something like the following:
let mut deserializer = TestItemDeserializer::new();
deserializer.add_bytes(read); // Read some bytes, and add them to the deserializer instance
// Get all possible deserializations with the bytes in the deserializer so far
while let Some(deserialized_item) = deserializer.deserialize() {
    items.push(deserialized_item); // Emit a successful deserialization
}
This avoids the repeated work, but adds to the complexity of the deserializer. For example, let’s say the following is the struct being transmitted.
pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>,
}
The deserializer must try to deserialize each field. If there are enough
bytes to succeed, the field must be saved. If there are too few bytes, the
function must return None. Finally, when the function is to be resumed, it
must check all the intermediate values for each of the fields in turn to recover
where it was before it got stuck.
An additional struct is needed in order to save the intermediate deserialization
progress. This deserializer stores the buffer of received bytes (buf). It must
also store x and y, since a fragment boundary might fall at any point between or
within those fields. It also stores len, which is the length of the Vec to be
deserialized. The Vec itself is not part of the intermediate state: once
the Vec is deserialized, the entire deserialization is complete and TestItem
is returned immediately.
pub struct TestItemDeserializer {
    buf: VecDeque<u8>,
    x: Option<i32>,
    y: Option<i32>,
    len: Option<i32>,
}
Each field must then be deserialized like the following. This requires much more boilerplate than the previous simple deserializer. There are more conditions to test for, and there is more state to maintain. This would be a real pain to write and maintain.
// Only read `x` if an earlier call has not already stored it
if self.x.is_none() {
    if self.buf.len() >= 4 {
        let mut buf = [0u8; 4];
        self.buf
            .read_exact(&mut buf)
            .expect("There are enough bytes to read");
        let x = i32::from_le_bytes(buf);
        self.x = Some(x);
    } else {
        return None;
    }
}
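For reference, here is a rough sketch of how the whole stateful deserializer might fit together once every field follows that pattern. It assumes the same little-endian layout as `deserialize_func` (three `i32` values followed by the raw bytes). The `take_i32` helper is a name I made up to factor out the per-field boilerplate.

```rust
use std::collections::VecDeque;
use std::io::Read;

#[derive(Debug, PartialEq)]
pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>,
}

#[derive(Default)]
pub struct TestItemDeserializer {
    buf: VecDeque<u8>,
    x: Option<i32>,
    y: Option<i32>,
    len: Option<i32>,
}

impl TestItemDeserializer {
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends newly received bytes to the internal buffer.
    pub fn add_bytes(&mut self, bytes: &[u8]) {
        self.buf.extend(bytes);
    }

    /// Hypothetical helper: pops one little-endian i32 if 4 bytes are buffered.
    fn take_i32(&mut self) -> Option<i32> {
        if self.buf.len() < 4 {
            return None;
        }
        let mut raw = [0u8; 4];
        self.buf
            .read_exact(&mut raw)
            .expect("length was checked above");
        Some(i32::from_le_bytes(raw))
    }

    /// Resumes from the saved fields; returns an item once all parts have arrived.
    pub fn deserialize(&mut self) -> Option<TestItem> {
        if self.x.is_none() {
            self.x = Some(self.take_i32()?);
        }
        if self.y.is_none() {
            self.y = Some(self.take_i32()?);
        }
        if self.len.is_none() {
            self.len = Some(self.take_i32()?);
        }
        // Assumption: the sender is trusted. A negative length would cast to a
        // huge usize here and this item would never complete.
        let len = self.len? as usize;
        if self.buf.len() < len {
            return None;
        }
        let bytes: Vec<u8> = self.buf.drain(..len).collect();
        // Clear the saved state so the next call starts a fresh item.
        self.len = None;
        Some(TestItem {
            x: self.x.take()?,
            y: self.y.take()?,
            bytes,
        })
    }
}
```

Even with the helper, every field still needs its own "already parsed?" check, and the state has to be reset by hand at the end.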
The stateful deserializer tracks where it is in the deserialization process, and
is able to resume based on that state. This is very similar to how futures
work in Rust. A future can attempt to make progress whenever its poll function
is called. The poll function either returns a value if it was able to
complete, or signals that it is still pending if it was not. This matches the
behavior of the .deserialize method of the stateful deserializer above.
Since the goal is to write some code that will return a TestItem and will save
its progress along the way, there must be an async function that returns a
TestItem. Furthermore, it must be able to somehow await the arrival of more
bytes in the buffer. My solution to this is an async function that takes a
ByteSource, which has an async read(n) function, and returns the TestItem.
pub async fn deserialize_func(mut reader: ByteSource) -> TestItem {
    // Each await suspends here until the requested number of bytes is available
    let bytes = reader.read(4).await;
    let x = i32::from_le_bytes(bytes.as_slice().try_into().unwrap());
    let bytes = reader.read(4).await;
    let y = i32::from_le_bytes(bytes.as_slice().try_into().unwrap());
    let bytes = reader.read(4).await;
    let len = i32::from_le_bytes(bytes.as_slice().try_into().unwrap());
    // Note: a negative `len` would become a very large usize in this cast
    let bytes = reader.read(len as usize).await;
    TestItem { x, y, bytes }
}
The try_into().unwrap()s are not a problem since read is guaranteed to return
the specified number of bytes. Further improvements to this solution could
include read_xxx() functions that return each of the primitive types.
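The post does not show ByteSource itself, so here is one way it could be built, as a sketch under assumptions rather than the actual implementation. It assumes a single-threaded, cloneable handle around a shared queue, and a hand-written future (`ReadExact`, a made-up name) that stays pending until enough bytes are queued. `NoopWake` and `poll_once` are also illustrative helpers. I use a small `Wake` implementation in place of `Waker::noop()` so the sketch also builds on older toolchains.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed shape of ByteSource: clones share one underlying queue.
#[derive(Clone, Default)]
pub struct ByteSource {
    inner: Rc<RefCell<VecDeque<u8>>>,
}

impl ByteSource {
    /// Appends received bytes; visible to every clone of this handle.
    pub fn add_bytes(&self, bytes: &[u8]) {
        self.inner.borrow_mut().extend(bytes);
    }

    /// Returns a future that resolves to exactly `n` bytes once they are queued.
    pub fn read(&mut self, n: usize) -> ReadExact {
        ReadExact { source: self.clone(), n }
    }
}

/// Hypothetical future type behind `read`.
pub struct ReadExact {
    source: ByteSource,
    n: usize,
}

impl Future for ReadExact {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let n = self.n;
        let mut queue = self.source.inner.borrow_mut();
        if queue.len() < n {
            // No waker is registered: the caller must poll again after adding bytes.
            return Poll::Pending;
        }
        let out: Vec<u8> = queue.drain(..n).collect();
        Poll::Ready(out)
    }
}

/// A waker that does nothing, since there is no executor to notify.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with the do-nothing waker.
pub fn poll_once<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}
```

Because `read` never registers a waker, it only works with a driver that re-polls after new bytes have been added.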
However, this leaves the issue of how to drive the function. Calling
deserialize_func() will not produce a TestItem, only a future that resolves
to a TestItem. There needs to be a struct to both drive the future returned
from the function, as well as manage the addition of bytes to the ByteSource.
The following struct holds three things: The buffer (ByteSource), the provided
async function to turn bytes into the yielded item, and a future that represents
the progress through the current item being deserialized.
type Param = ByteSource;
pub struct FutureDeserializer<Func, Fut, Ret>
where
    Func: FnMut(Param) -> Fut,
    Fut: Future<Output = Ret>,
{
    buf: Param,
    func: Func,
    fut: Pin<Box<Fut>>,
}
Calling new takes an async function as a parameter, initializes the
ByteSource with a new, empty one, and initializes fut to the result of
calling the async function.
All deserialize then has to do is poll the future. If poll returns
Pending, deserialize returns None. If it returns Ready, deserialize
can return Some(value) and re-initialize fut to a new future from the stored
async function.
pub fn deserialize(&mut self) -> Option<Ret> {
    let mut cx = Context::from_waker(Waker::noop());
    match Future::poll(self.fut.as_mut(), &mut cx) {
        Poll::Pending => None,
        Poll::Ready(value) => {
            self.fut = Box::pin((self.func)(self.buf.clone()));
            Some(value)
        }
    }
}
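Here is a compact end-to-end sketch of how these pieces could be wired together. Several parts are my own assumptions: ByteSource is a shared `Rc<RefCell<VecDeque<u8>>>` whose `read` is built on `std::future::poll_fn`, `read_i32` and `NoopWake` are made-up helpers, the `Ret` parameter is replaced by `Fut::Output` to keep things short, and a small `Wake` implementation stands in for `Waker::noop()` so the sketch also builds on older toolchains.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>,
}

// Assumed stand-in for ByteSource: a cloneable handle to one shared queue.
#[derive(Clone, Default)]
pub struct ByteSource(Rc<RefCell<VecDeque<u8>>>);

impl ByteSource {
    // Stays pending until `n` bytes are queued, then removes and returns them.
    pub async fn read(&mut self, n: usize) -> Vec<u8> {
        let queue = Rc::clone(&self.0);
        std::future::poll_fn(move |_cx| {
            let mut q = queue.borrow_mut();
            if q.len() < n {
                return Poll::Pending;
            }
            let out: Vec<u8> = q.drain(..n).collect();
            Poll::Ready(out)
        })
        .await
    }
}

// Hypothetical read_xxx-style helper for little-endian i32 values.
async fn read_i32(reader: &mut ByteSource) -> i32 {
    let bytes = reader.read(4).await;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&bytes);
    i32::from_le_bytes(raw)
}

pub async fn deserialize_func(mut reader: ByteSource) -> TestItem {
    let x = read_i32(&mut reader).await;
    let y = read_i32(&mut reader).await;
    let len = read_i32(&mut reader).await;
    // Assumes a trusted, non-negative length.
    let bytes = reader.read(len as usize).await;
    TestItem { x, y, bytes }
}

// A waker that does nothing, since no executor is involved.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub struct FutureDeserializer<Func, Fut> {
    buf: ByteSource,
    func: Func,
    fut: Pin<Box<Fut>>,
}

impl<Func, Fut> FutureDeserializer<Func, Fut>
where
    Func: FnMut(ByteSource) -> Fut,
    Fut: Future,
{
    pub fn new(mut func: Func) -> Self {
        let buf = ByteSource::default();
        // Start the first future immediately so deserialize only has to poll.
        let fut = Box::pin(func(buf.clone()));
        Self { buf, func, fut }
    }

    pub fn add_bytes(&mut self, bytes: &[u8]) {
        self.buf.0.borrow_mut().extend(bytes);
    }

    pub fn deserialize(&mut self) -> Option<Fut::Output> {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        match self.fut.as_mut().poll(&mut cx) {
            Poll::Pending => None,
            Poll::Ready(value) => {
                // Replace the finished future with a fresh one for the next item.
                self.fut = Box::pin((self.func)(self.buf.clone()));
                Some(value)
            }
        }
    }
}
```

The calling code then looks the same as for the stateful deserializer: construct it with `FutureDeserializer::new(deserialize_func)`, call `add_bytes` whenever data arrives, and loop on `deserialize` until it returns `None`.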
Since there is no real executor here, there is no real context to pass to
poll. Therefore a no-op context is used. While this works well, it introduces
a subtlety. The function passed to new is async and can await other functions
called from its body. These nested async functions will also be called with the
no-op context and, if they were expecting a real context with a runtime, they
will not be able to register themselves to be woken when they are ready to
continue. Instead, they will have to rely on deserialize being called again.
It would be possible to create an async version of deserialize that polls the
future with the caller’s context instead. But this creates additional
complications. To match the behavior of the sync version, the
async version must fully return None when the ByteSource has too few bytes,
but await when any other await point is hit.
In testing, the stateful deserializer performed slightly worse than the future deserializer in all cases. However, the performance differences between the simple deserializer and the future deserializer were less conclusive.
In cases with low fragmentation, the simple deserializer performs much better than its future-based counterpart. In cases with high fragmentation, the future deserializer pulls ahead. In real scenarios, fragmentation is not likely to be extreme. However, the performance was tested with a small struct that serializes to a binary format very well. It is possible that a format that requires more complex parsing, or a much larger struct, could see more gains from the smarter retry technique.