Future Serializer

Repository RustAsync

In my work with networked projects, I often find myself deserializing messages from the network. Since TCP is a streaming protocol, data read from a socket may contain part of a message or multiple messages. This affects how the data can be deserialized. Running out of data partway through a message isn’t an error in the usual sense, since more data may arrive to complete the message. On the other hand, the buffer may also hold more data than a single message, and this condition must be handled gracefully too. For this exploration, we will assume that messages do not have any length encoded into them, so we must examine the message itself to find its end, as with JSON.

Simple Deserializer

The straightforward way to handle this is to brute-force the problem. First, build a queue. When there is incoming data, append it to the queue. Then attempt to deserialize the head of the queue. If unsuccessful, wait for more data. If the deserializer produced a message, remove that many bytes from the queue and emit the deserialized message.

Something like this:

buffer.extend(read); // Append the newly read data
while let Some((len, item)) = simple_deserialize(buffer.make_contiguous()) {
    items.push(item); // If successful, add to the collection of deserialized items
    buffer.drain(0..len); // Remove the successfully read bytes
}

Each field in simple_deserialize then deserializes like the following:

let x = if data.len() >= 4 {
    // There are enough bytes to deserialize the value `x`
    i32::from_le_bytes(data[0..4].try_into().unwrap())
} else {
    return None; // There were not enough bytes, abort
};

This works. However, if the stream of serialized messages is fragmented, the deserializer will repeat work during its attempted deserialization. Any partial message will cause the deserializer to try to deserialize that portion at least twice, or more if the message is fragmented further.
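To make the loop and the per-field check concrete, here is a minimal sketch of a complete simple deserializer. It assumes the wire layout used later in this post (x, y, a 4-byte length, then the payload, all little-endian). The helpers `read_i32` and `feed` are hypothetical names introduced for illustration, and clamping a negative length to zero is a simplification that real code should replace with an error.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>,
}

/// Hypothetical helper: reads a little-endian i32 at `offset`,
/// or returns None if the slice is too short.
fn read_i32(data: &[u8], offset: usize) -> Option<i32> {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(data.get(offset..offset + 4)?);
    Some(i32::from_le_bytes(raw))
}

/// Tries to decode one item from the front of `data`.
/// Returns the number of bytes consumed together with the item,
/// or None if the message is still incomplete.
pub fn simple_deserialize(data: &[u8]) -> Option<(usize, TestItem)> {
    let x = read_i32(data, 0)?;
    let y = read_i32(data, 4)?;
    // Assumption: a negative length is clamped to zero in this sketch.
    let len = read_i32(data, 8)?.max(0) as usize;
    let end = 12 + len;
    let bytes = data.get(12..end)?.to_vec();
    Some((end, TestItem { x, y, bytes }))
}

/// Hypothetical driver: appends `read` to `buffer`, then drains every
/// complete item. Every call re-parses any partial message from its start.
pub fn feed(buffer: &mut VecDeque<u8>, read: &[u8]) -> Vec<TestItem> {
    buffer.extend(read);
    let mut items = Vec::new();
    while let Some((len, item)) = simple_deserialize(buffer.make_contiguous()) {
        items.push(item);
        buffer.drain(0..len);
    }
    items
}
```

Calling `feed` with a fragment returns an empty vector and leaves the fragment in the queue; the next call starts parsing that same message again from byte zero, which is the repeated work described above.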

Stateful Deserializer

To avoid this repeated work, the intermediate state of the deserializer can be saved, so that when the next chunk of bytes arrives, deserialization resumes where it left off.

The main loop starts to look a bit different, since the calling code no longer has to maintain the buffer. The buffer must be managed by the deserializer because its state becomes coupled with the state of the partial deserialization. It might become something like the following:

let mut deserializer = TestItemDeserializer::new();

deserializer.add_bytes(read); // read some bytes, and add them to the deserializer instance

// Get all possible deserializations with the bytes in the deserializer so far
while let Some(deserialized_item) = deserializer.deserialize() { 
    items.push(deserialized_item); // Emit a successful deserialization
}

This avoids the repeated work, but adds to the complexity of the deserializer. For example, let's say the following is the struct being transmitted.

pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>
}

The deserializer must try to deserialize each field. If there are enough bytes to succeed, the field must be saved. If there are too few bytes, the function must return None. Finally, when the function is to be resumed, it must check all the intermediate values for each of the fields in turn to recover where it was before it got stuck.

An additional struct is needed in order to save the intermediate deserialization progress. This deserializer stores the buffer of received bytes (buf). It must also store x and y as the message fragment boundary might fall at any point between those fields. It also stores len which is the length of the vec to be deserialized. The vec itself is not required in the intermediate state as once the vec is deserialized, the entire deserialization is complete and TestItem is returned immediately.

pub struct TestItemDeserializer {
    buf: VecDeque<u8>,
    x: Option<i32>,
    y: Option<i32>,
    len: Option<i32>,
}

Each field must then be deserialized like the following. This requires much more boilerplate than the previous simple deserializer. There are more conditions to test for, and there is more state to maintain. This would be a real pain to write and maintain.

if self.x.is_none() {
    if self.buf.len() >= 4 {
        let mut buf = [0u8; 4];
        self.buf
            .read_exact(&mut buf)
            .expect("There are enough bytes to read");
        let x = i32::from_le_bytes(buf);
        self.x = Some(x);
    } else {
        return None;
    }
};
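Repeating that pattern for every field gives something like the following sketch of the whole stateful deserializer. It is an illustration under assumptions rather than the code from the repository: the helper `take_i32` is a hypothetical name, the wire layout is taken to be x, y, a 4-byte length, then the payload (all little-endian), and a negative length is simply clamped to zero.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub struct TestItem {
    pub x: i32,
    pub y: i32,
    pub bytes: Vec<u8>,
}

#[derive(Default)]
pub struct TestItemDeserializer {
    buf: VecDeque<u8>,
    x: Option<i32>,
    y: Option<i32>,
    len: Option<i32>,
}

impl TestItemDeserializer {
    pub fn new() -> Self {
        Self::default()
    }

    /// Queues newly received bytes behind any unread ones.
    pub fn add_bytes(&mut self, bytes: &[u8]) {
        self.buf.extend(bytes);
    }

    /// Hypothetical helper: removes a little-endian i32 from the front
    /// of the buffer, or returns None if fewer than four bytes are queued.
    fn take_i32(&mut self) -> Option<i32> {
        if self.buf.len() < 4 {
            return None;
        }
        let mut raw = [0u8; 4];
        for (slot, byte) in raw.iter_mut().zip(self.buf.drain(..4)) {
            *slot = byte;
        }
        Some(i32::from_le_bytes(raw))
    }

    /// Resumes from the saved fields; returns None when more bytes are needed.
    pub fn deserialize(&mut self) -> Option<TestItem> {
        if self.x.is_none() {
            self.x = Some(self.take_i32()?);
        }
        if self.y.is_none() {
            self.y = Some(self.take_i32()?);
        }
        if self.len.is_none() {
            self.len = Some(self.take_i32()?);
        }
        // Assumption: a negative length is clamped to zero in this sketch.
        let len = self.len?.max(0) as usize;
        if self.buf.len() < len {
            return None;
        }
        let bytes: Vec<u8> = self.buf.drain(..len).collect();
        // take() clears the saved fields so the next call starts a new item.
        let item = TestItem { x: self.x.take()?, y: self.y.take()?, bytes };
        self.len = None;
        Some(item)
    }
}
```

Even with a helper to hide the byte handling, every field needs its own "already done?" check, and forgetting to reset one of the saved fields after an item completes would corrupt the next message.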

Future Deserializer

The stateful deserializer tracks where it is in the deserialization process, and is able to resume based on that state. This is incredibly similar to how futures work in Rust. A future can attempt to make progress whenever its poll function is called. The poll function returns either Poll::Ready with a value if it was able to complete, or Poll::Pending if it was not. This matches the behavior of the .deserialize method of the stateful deserializer above.

Since the goal is to write some code that will return a TestItem and will save its progress along the way, there must be an async function that returns a TestItem. Furthermore, it must be able to somehow await the arrival of more bytes in the buffer. My solution to this is an async function that takes a ByteSource, which has an async read(n) function, and returns the TestItem.

pub async fn deserialize_func(mut reader: ByteSource) -> TestItem {
    let bytes = reader.read(4).await;
    let x = i32::from_le_bytes(bytes.as_slice().try_into().unwrap());
    let bytes = reader.read(4).await;
    let y = i32::from_le_bytes(bytes.as_slice().try_into().unwrap());
    let bytes = reader.read(4).await;
    let len = i32::from_le_bytes(bytes.as_slice().try_into().unwrap());
    let bytes = reader.read(len as usize).await;

    TestItem { x, y, bytes }
}

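The try_into().unwrap()s are not a problem since read is guaranteed to return the specified number of bytes. The len as usize cast, however, assumes the length is never negative; a negative value would wrap around to an enormous read size. Further improvements to this solution could include read_xxx() functions that return each of the primitive types.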
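The post does not show ByteSource itself, so the following is only a guess at its shape: a cloneable handle to a shared byte queue whose read(n) returns a hand-written future that stays pending until n bytes are queued. `ReadFuture`, `add_bytes`, `read_i32`, `NoopWake`, and `poll_once` are all hypothetical names, and the `Rc<RefCell<...>>` sharing is an assumption made so that the deserializer and the running future can see the same bytes.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed shape: a cheap-to-clone handle to one shared byte queue.
#[derive(Clone, Default)]
pub struct ByteSource {
    queue: Rc<RefCell<VecDeque<u8>>>,
}

/// Future returned by `ByteSource::read`.
pub struct ReadFuture {
    queue: Rc<RefCell<VecDeque<u8>>>,
    n: usize,
}

impl Future for ReadFuture {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let mut queue = self.queue.borrow_mut();
        if queue.len() < self.n {
            // Not enough data yet. No waker is registered, so the
            // caller must poll again after adding bytes.
            return Poll::Pending;
        }
        let bytes: Vec<u8> = queue.drain(..self.n).collect();
        Poll::Ready(bytes)
    }
}

impl ByteSource {
    pub fn add_bytes(&self, bytes: &[u8]) {
        self.queue.borrow_mut().extend(bytes);
    }

    /// Resolves to exactly `n` bytes once that many are available.
    pub fn read(&mut self, n: usize) -> ReadFuture {
        ReadFuture { queue: Rc::clone(&self.queue), n }
    }

    /// One possible `read_xxx` helper for a little-endian i32.
    pub async fn read_i32(&mut self) -> i32 {
        let bytes = self.read(4).await;
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&bytes);
        i32::from_le_bytes(raw)
    }
}

/// Waker that ignores wake-ups, used only to poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a pinned future once with the do-nothing waker.
pub fn poll_once<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}
```

Polling a `read_i32` future with `poll_once` yields `Poll::Pending` until four bytes have been added through another clone of the same `ByteSource`, and `Poll::Ready` with the decoded value afterwards. Because the future never stores a waker, nothing wakes it automatically; someone has to poll it again.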

However, this leaves the issue of how to drive the function. Calling deserialize_func() will not produce a TestItem, only a future that resolves to a TestItem. There needs to be a struct to both drive the future returned from the function, as well as manage the addition of bytes to the ByteSource.

The following struct holds three things: the buffer (ByteSource), the provided async function to turn bytes into the yielded item, and a future that represents the progress through the current item being deserialized.

type Param = ByteSource;
pub struct FutureDeserializer<Func, Fut, Ret>
where
    Func: FnMut(Param) -> Fut,
    Fut: Future<Output = Ret>,
{
    buf: Param,
    func: Func,
    fut: Pin<Box<Fut>>,
}

new takes an async function as a parameter, initializes the ByteSource with a new, empty one, and initializes fut to the result of calling the async function.

All deserialize then has to do is poll the future. If poll returned Pending, deserialize returns None. If it returned Ready, deserialize can return Some(value) and re-initialize fut to a new future from the stored async function.

pub fn deserialize(&mut self) -> Option<Ret> {
    let mut cx = Context::from_waker(Waker::noop());

    match Future::poll(self.fut.as_mut(), &mut cx) {
        Poll::Pending => None,
        Poll::Ready(value) => {
            self.fut = Box::pin((self.func)(self.buf.clone()));
            Some(value)
        }
    }
}

Since there is no real executor here, there is no real context to pass to poll. Therefore a context built from a no-op waker is used. While this works well, it introduces a subtlety. The function passed to new is async and can await other functions called from its body. These nested async functions will also be polled with the no-op context and, if they were expecting a real context with a runtime, they will not be able to register themselves to be woken when they are ready to continue. Instead, they will have to rely on deserialize being called again.
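For reference, the pieces described in this section can be assembled into one compilable sketch. Several details are assumptions rather than the repository's code: the compact `ByteSource` (a shared queue behind `Rc<RefCell<...>>`, with read built on `std::future::poll_fn`), the `NoopWake` type standing in for `Waker::noop()` so the sketch also builds on older toolchains, the example function `read_pair`, and the use of `Fut::Output` in place of a separate Ret parameter to keep the example short.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed shape: a cloneable handle to one shared byte queue.
#[derive(Clone, Default)]
pub struct ByteSource {
    queue: Rc<RefCell<VecDeque<u8>>>,
}

impl ByteSource {
    pub fn add_bytes(&self, bytes: &[u8]) {
        self.queue.borrow_mut().extend(bytes);
    }

    /// Pending until `n` bytes are queued, then removes and returns them.
    pub async fn read(&mut self, n: usize) -> Vec<u8> {
        let shared = Rc::clone(&self.queue);
        poll_fn(move |_cx: &mut Context<'_>| {
            let mut queue = shared.borrow_mut();
            if queue.len() < n {
                return Poll::Pending;
            }
            let bytes: Vec<u8> = queue.drain(..n).collect();
            Poll::Ready(bytes)
        })
        .await
    }
}

/// Waker that ignores wake-ups.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub struct FutureDeserializer<Func, Fut>
where
    Func: FnMut(ByteSource) -> Fut,
    Fut: Future,
{
    buf: ByteSource,
    func: Func,
    fut: Pin<Box<Fut>>,
}

impl<Func, Fut> FutureDeserializer<Func, Fut>
where
    Func: FnMut(ByteSource) -> Fut,
    Fut: Future,
{
    pub fn new(mut func: Func) -> Self {
        let buf = ByteSource::default();
        // The first item's future shares the queue through the clone.
        let fut = Box::pin(func(buf.clone()));
        Self { buf, func, fut }
    }

    pub fn add_bytes(&mut self, bytes: &[u8]) {
        self.buf.add_bytes(bytes);
    }

    pub fn deserialize(&mut self) -> Option<Fut::Output> {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        match self.fut.as_mut().poll(&mut cx) {
            Poll::Pending => None,
            Poll::Ready(value) => {
                // Start a fresh future for the next item.
                self.fut = Box::pin((self.func)(self.buf.clone()));
                Some(value)
            }
        }
    }
}

/// Hypothetical helper: awaits four bytes and decodes a little-endian i32.
async fn read_i32(reader: &mut ByteSource) -> i32 {
    let bytes = reader.read(4).await;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&bytes);
    i32::from_le_bytes(raw)
}

/// Hypothetical item function: each message is two little-endian i32 values.
pub async fn read_pair(mut reader: ByteSource) -> (i32, i32) {
    let a = read_i32(&mut reader).await;
    let b = read_i32(&mut reader).await;
    (a, b)
}
```

With `FutureDeserializer::new(read_pair)`, the calling loop looks the same as for the stateful deserializer: call `add_bytes` with whatever arrived, then call `deserialize` until it returns None. The saved progress now lives in the compiler-generated state machine of `read_pair` instead of hand-written Option fields.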

It could be possible to create an async version of deserialize that uses the caller’s context by calling the future with the caller’s context. But this creates additional complication. To match the behavior of the sync version, the async version must fully return None when the ByteSource has too few bytes, but await when any other await point is hit.

Results

In testing, the stateful deserializer performed slightly worse than the future deserializer in all cases. However, the performance differences between the simple deserializer and the future deserializer were less conclusive.

In cases with low fragmentation, the simple deserializer performs much better than its future-based counterpart. In cases with high fragmentation, the future deserializer is able to pull ahead. In real scenarios, fragmentation is not likely to be extreme. However, the performance was tested with a small struct that serializes to a binary format very well. It is possible that a format that requires more complex parsing, or a much larger struct, could see more gains from the smarter retry technique.