Skip to content

Commit

Permalink
draft blog post for 0.20
Browse files Browse the repository at this point in the history
  • Loading branch information
dwrensha committed Sep 17, 2024
1 parent 06cc784 commit 32b23a8
Showing 1 changed file with 256 additions and 0 deletions.
256 changes: 256 additions & 0 deletions blog/_drafts/0.20-release.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
---
layout: post
title: 0.20 — streaming with backpressure
author: dwrensha
---

Version 0.20 of [capnproto-rust](https://github.com/capnproto/capnproto-rust)
is now [available on crates.io](https://crates.io/crates/capnp).
In this release, the library has new built-in support for
[streaming](https://capnproto.org/news/2020-04-23-capnproto-0.8.html#multi-stream-flow-control).

## the `stream` keyword

Suppose we have the following interface defined
in a capnp [schema file](https://capnproto.org/language.html):

```
interface ByteSink {
# A destination for a sequence of bytes.
write @0 (chunk :Data);
# Writes a chunk of bytes.
end @1 ();
# Indicates that no more chunks will be written.
}
```

In Rust code, we can invoke methods on a (possibly-remote) `ByteSink` object
through handles of type `byte_sink::Client`. For example:

```rust
/// Writes all of `bytes` to `sink` in a call to `write()`.
async fn write_bytes_all_at_once(
bytes: &[u8],
sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
// Construct an RPC request for ByteSink.write().
let mut request = sink.write_request();

// Copy all of the bytes into the request.
request.get().set_chunk(bytes);

// Send the request and wait for the response.
request.send().promise.await?;

// Send a ByteSink::end() request and wait for the response.
sink.end_request().send().promise.await?;

Ok(())
}
```

However, if `bytes` is too large, we might
observe congestion, spikes in memory usage, or
an error like this:

```
remote exception: Failed: Message has 8750020 words, which is too large.
```

Fortunately, the `ByteSink` interface lets us avoid such problems. We can split
the bytes into chunks over multiple `ByteSink.write()` calls:


```rust
const CHUNK_SIZE: usize = 8192;

/// Writes all of `bytes` to `sink` in chunks of size `CHUNK_SIZE`.
async fn write_bytes_chunked(
mut bytes: &[u8],
sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
// Loop until all bytes have been written.
while bytes.len() > 0 {
// Compute the end index.
let end = usize::min(CHUNK_SIZE, bytes.len());

// Construct the write() request.
let mut request = sink.write_request();
request.get().set_chunk(&bytes[..end]);

// Send the request and wait for a response.
request.send().promise.await?;

// Update `bytes` to point to the remaining bytes.
bytes = &bytes[end..];
}

// Send `ByteSink.end()`.
sink.end_request().send().promise.await?;

Ok(())
}
```

Now the bytes should transfer much more smoothly,
but we've introduced a new problem: latency.
After sending each `ByteStream.write()` request,
we're waiting for the server to send back its response
before we start the next request.
That will likely slow things down considerably!

Here's a naive, bad way to avoid such latency:

```rust
/// Like write_bytes_chunked(), but sends all write() requests
/// immediately.
async fn write_bytes_chunked_in_parallel(
mut bytes: &[u8],
sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
// Construct an object that will manage multiple futures
// executing at once.
let mut responses = FuturesOrdered::new();

// Loop until all writes have been enqueued.
while bytes.len() > 0 {
let end = usize::min(CHUNK_SIZE, bytes.len());

// Construct the write() request.
let mut request = sink.write_request();
request.get().set_chunk(&bytes[..end]);

// Send the write() request and collect its response
// future (but don't wait on it yet).
responses.push_back(request.send().promise);

bytes = &bytes[end..];
}
// Wait for all of the responses.
while let Some(x) = responses.next().await {
x?;
}

// Send `ByteSink.end()`.
sink.end_request().send().promise.await?;

Ok(())
}
```

The problem is that, because we immediately send all the requests,
now we are back to causing memory spikes and congestion,
even if each individual message is small.

We need to add some way to limit the number of in-flight requests.
That's going to make the code significantly more complicated!
Perhaps you'd like to try it as an exercise?


The good news is that capnproto-rust can now automatically
perform such bookkeeping, and our code can remain simple.
To enable the new functionality, we define our
method with the `stream` keyword:


```
interface StreamingByteSink {
write @0 (chunk :Data) -> stream;
# Writes a chunk of bytes.
end @1 ();
# Indicates that no more chunks will be written.
}
```

Now we can write our code in the following direct style:

```rust

/// Like write_bytes_chunked(), but the library
/// automatically handles backpressure.
async fn write_bytes_streaming(
mut bytes: &[u8],
sink: streaming_byte_sink::Client,
) -> Result<(), capnp::Error> {
// Loop until all bytes have been enqueued.
while bytes.len() > 0 {
let end = usize::min(CHUNK_SIZE, bytes.len());

// Construct the write() request.
let mut request = sink.write_request();
request.get().set_chunk(&bytes[..end]);

// Send the request and wait until the RPC system determines that
// we are clear to send another chunk.
request.send().await?;

bytes = &bytes[end..];
}

// The end() method is not streaming. It does not return until all of
// the streaming calls have completed. If any streaming call triggered
// an error, that error will be returned here.
sink.end_request().send().promise.await?;

Ok(())
}
```

Behind the scenes, capnproto-rust maintains a limit on how many bytes
are in flight for this object at any given time.
Currently it uses a fixed value
that can be configured via `twoparty::VatNetwork::set_window_size()`.
In the future, the library could potentially add sophisticated dynamic
flow control to optimize throughput, without requiring
any code change from users.

## implementing streaming methods

To create an instance of a `ByteSinkStreaming` object in Rust,
we need to implement the `byte_sink_streaming::Server` trait:

```rust
impl byte_sink_streaming::Server for ByteStreamImpl {
fn write(
&mut self,
params: byte_sink_streaming::WriteParams
) -> Promise<(), Error> {
todo!()
}

fn end(
&mut self,
params: byte_sink_streaming::EndParams,
results: byte_sink_streaming::EndResults,
) -> Promise<(), Error> {
todo!()
}

}
```

Because it was declared with `-> stream`, the `write()` method
does not have a `Results` parameter,
unlike `end()` and other non-streaming methods.
Streaming methods never have any values to return.

### difference from capnproto-c++

[In the C++ implementation](https://github.com/capnproto/capnproto/pull/825),
streaming method calls are delivered one-by-one
to the implementing object, with no overlap.
This is intended "as a convenience" and to
make it easier to interface with `kj::AsyncOutputStream` objects.
In Rust, async I/O objects are rather different
and enjoy good type system support for preventing misuse,
so I opted not to implement the one-by-one delivery guarantee
in capnproto-rust.

## example

For a working example, see
the [capnp-rpc/examples/streaming](https://github.com/capnproto/capnproto-rust/tree/master/capnp-rpc/examples/streaming)
directory.

0 comments on commit 32b23a8

Please sign in to comment.