I think these might be two issues in one:
- There's no example (except maybe the SSE one) that has streaming responses, probably because it's hard to come up with a good example for streaming responses.
- I haven't found good documentation on how to "pipe" an `AsyncWrite` into a `Stream<Item = Bytes(Mut)>` (maybe something for actix-web-lab).
Note: Also see discussion on actix-web Discord: https://discord.com/channels/771444961383153695/771447545154371646/1009110232473014383
The proposed example is one that streams the files from a directory (`files`) as a zip file, i.e. it creates the zip file dynamically.
For this I'm using `async_zip`, which exposes an API that requires the user to pass in an `AsyncWrite` (`ZipFileWriter::new`).
To "pipe" the `AsyncWrite` to a `Stream`, I'm using a `DuplexStream` and the `BytesCodec`.
main.rs
use actix_web::{get, http, App, HttpResponse, HttpServer, Responder};
use async_zip::write::{EntryOptions, ZipFileWriter};
use futures::stream::TryStreamExt;
use std::io;
use tokio::io::AsyncWrite;
use tokio_util::codec;
#[get("/")]
async fn index() -> impl Responder {
let (to_write, to_read) = tokio::io::duplex(2048);
tokio::spawn(async move {
let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
if let Err(e) = read_dir(&mut zipper).await {
// TODO: do something
eprintln!("Failed to write files from directory to zip: {e}")
}
if let Err(e) = zipper.close().await {
// TODO: do something
eprintln!("Failed to close zipper: {e}")
}
});
let stream = codec::FramedRead::new(to_read, codec::BytesCodec::new()).map_ok(|b| b.freeze());
HttpResponse::Ok()
.append_header((
http::header::CONTENT_DISPOSITION,
r#"attachment; filename="folder.zip""#,
))
// not sure if this is really necessary,
// but we're already sending compressed data,
// so make sure other middleware won't compress this again
.append_header((http::header::CONTENT_ENCODING, "identity"))
.streaming(stream)
}
/// Adds every regular file found directly inside the `files` directory to
/// `zipper` as a Deflate-compressed entry.
///
/// Entries that are not regular files, cannot be opened, or have a file name
/// that is not valid UTF-8 are skipped on purpose (best effort).
///
/// # Errors
///
/// Returns an error if the directory cannot be opened or listed, if copying a
/// file's contents fails, or if the zip writer reports an error (converted
/// with `zip_to_io_err`).
async fn read_dir<W>(zipper: &mut ZipFileWriter<W>) -> Result<(), io::Error>
where
    W: AsyncWrite + Unpin,
{
    let mut dir = tokio::fs::read_dir("files").await?;

    // A failure while listing the directory is propagated: ending the loop
    // silently would report success for an archive that is missing files.
    while let Some(dir_entry) = dir.next_entry().await? {
        let is_file = dir_entry
            .metadata()
            .await
            .map(|m| m.is_file())
            .unwrap_or(false);
        if !is_file {
            continue;
        }

        let mut file = match tokio::fs::File::open(dir_entry.path()).await {
            Ok(f) => f,
            Err(_) => continue, // we can't read the file
        };

        let filename = match dir_entry.file_name().into_string() {
            Ok(s) => s,
            Err(_) => continue, // the file has a non UTF-8 name
        };

        let options = EntryOptions::new(filename, async_zip::Compression::Deflate);
        let mut zip_entry = zipper
            .write_entry_stream(options)
            .await
            .map_err(zip_to_io_err)?;
        tokio::io::copy(&mut file, &mut zip_entry).await?;
        zip_entry.close().await.map_err(zip_to_io_err)?;
    }

    Ok(())
}
/// Wraps a zip error in an [`io::Error`] of kind [`io::ErrorKind::Other`],
/// keeping the original error as its source.
fn zip_to_io_err(e: async_zip::error::ZipError) -> io::Error {
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, e)
}
/// Starts the HTTP server on `127.0.0.1:8080` with the `index` handler
/// registered, and runs it until it stops.
#[actix_web::main]
async fn main() -> io::Result<()> {
    // The factory captures nothing; each worker builds its own `App`.
    let app_factory = || App::new().service(index);

    let server = HttpServer::new(app_factory).bind(("127.0.0.1", 8080))?;
    server.run().await
}
As I explained on Discord, using a `DuplexStream` is probably overkill, since it's meant to be used bidirectionally (see the example in the tokio docs), so I tried to extract the internal `Pipe` used by the tokio implementation and made a pipe specifically for (buffered) piping of an `AsyncWrite` to a `Stream<Item = BytesMut>`. I'm not sure if this should be included in `actix-web-lab` as a utility for dealing with `AsyncWrite` (or maybe in some other crate?).
async_pipe.rs
use std::{sync::{Arc, Mutex, MutexGuard}, task::{self,Waker, Poll}, pin::Pin};
use bytes::{BytesMut, Buf};
use futures::Stream;
use tokio::io::AsyncWrite;
/// The `AsyncWrite` half of an [`async_pipe`].
///
/// Bytes written here land in the buffer shared with the matching
/// [`AsyncPipeReader`]. Shutting the writer down or dropping it closes the
/// pipe for writing.
pub struct AsyncPipeWriter(Arc<Mutex<Pipe>>);
/// The `Stream` half of an [`async_pipe`].
///
/// Yields the bytes written through the matching [`AsyncPipeWriter`] as
/// `BytesMut` chunks. Dropping the reader closes the pipe, so later writes
/// fail instead of waiting for buffer space.
pub struct AsyncPipeReader(Arc<Mutex<Pipe>>);
/// Creates a buffered pipe that forwards everything written to its
/// `AsyncWrite` half to its `Stream<Item = BytesMut>` half.
///
/// `max_buf_size` is the maximum number of bytes the pipe's internal buffer
/// may hold; once it is full, further writes return `Poll::Pending` until the
/// reader has drained the buffer. With a `max_buf_size` of `0` the buffer is
/// always considered full, so writes never complete.
pub fn async_pipe(max_buf_size: usize) -> (AsyncPipeWriter, AsyncPipeReader) {
    let shared = Arc::new(Mutex::new(Pipe::new(max_buf_size)));

    let writer = AsyncPipeWriter(Arc::clone(&shared));
    let reader = AsyncPipeReader(shared);
    (writer, reader)
}
/// A unidirectional IO over a piece of memory.
///
/// Data can be written to the pipe, and reading will return that data.
///
/// Adapted from
/// [tokio's](https://github.com/tokio-rs/tokio/blob/de81985762a242c77361a6ab9de198372ca85987/tokio/src/io/util/mem.rs#L54-L76)
/// internal representation of a pipe for a `tokio::io::DuplexStream`.
#[derive(Debug)]
struct Pipe {
    /// The buffer storing the bytes written, also read from.
    ///
    /// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
    /// functionality already.
    buffer: BytesMut,
    /// Set once either side has been closed: by `close_write` (writer shut
    /// down or dropped) or by `close_read` (reader dropped).
    ///
    /// While set, writes fail with `BrokenPipe` and reads end the stream once
    /// the buffer is empty.
    is_closed: bool,
    /// The maximum amount of bytes that can be written before returning
    /// `Poll::Pending`.
    max_buf_size: usize,
    /// If the `read` side has been polled and is pending, this is the waker
    /// for that parked task.
    read_waker: Option<Waker>,
    /// If the `write` side has filled the `max_buf_size` and returned
    /// `Poll::Pending`, this is the waker for that parked task.
    write_waker: Option<Waker>,
}
impl Pipe {
fn new(max_buf_size: usize) -> Self {
Pipe {
buffer: BytesMut::new(),
is_closed: false,
max_buf_size,
read_waker: None,
write_waker: None,
}
}
fn close_write(&mut self) {
self.is_closed = true;
// needs to notify any readers that no more data will come
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
}
fn close_read(&mut self) {
self.is_closed = true;
// needs to notify any writers that they have to abort
if let Some(waker) = self.write_waker.take() {
waker.wake();
}
}
fn poll_read_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>
) -> Poll<Option<BytesMut>> {
if self.buffer.has_remaining() {
let bytes = std::mem::take(&mut self.buffer);
if let Some(waker) = self.write_waker.take() {
waker.wake();
}
Poll::Ready(Some(bytes))
} else if self.is_closed {
Poll::Ready(None)
} else {
self.read_waker = Some(cx.waker().clone());
Poll::Pending
}
}
fn poll_write_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
if self.is_closed {
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
}
let avail = self.max_buf_size - self.buffer.len();
if avail == 0 {
self.write_waker = Some(cx.waker().clone());
return Poll::Pending;
}
let len = buf.len().min(avail);
self.buffer.extend_from_slice(&buf[..len]);
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
Poll::Ready(Ok(len))
}
}
impl AsyncWrite for Pipe {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.poll_write_internal(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
self.close_write();
Poll::Ready(Ok(()))
}
}
/// Every operation locks the shared pipe and forwards to its `AsyncWrite`
/// implementation; the lock is released when the call returns.
impl AsyncWrite for AsyncPipeWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let mut pipe = always_lock(&self.0);
        Pin::new(&mut *pipe).poll_write(cx, buf)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut pipe = always_lock(&self.0);
        Pin::new(&mut *pipe).poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut pipe = always_lock(&self.0);
        Pin::new(&mut *pipe).poll_shutdown(cx)
    }
}
impl Stream for Pipe {
    type Item = BytesMut;

    /// Yields everything currently buffered as a single chunk; see
    /// `poll_read_internal`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_read_internal(cx)
    }

    /// Bounds on the number of *items* (chunks) still to be yielded.
    ///
    /// `size_hint` counts stream items, not bytes. A non-empty buffer is
    /// handed out as exactly one chunk by the next poll, so it contributes
    /// one item regardless of how many bytes it holds. While the pipe is
    /// open more chunks may follow, so there is no upper bound; once it is
    /// closed, the buffered chunk (if any) is the last one.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let buffered_chunks = usize::from(self.buffer.has_remaining());
        let upper = if self.is_closed {
            Some(buffered_chunks)
        } else {
            None
        };
        (buffered_chunks, upper)
    }
}
/// Both methods lock the shared pipe and forward to its `Stream`
/// implementation; the lock is released when the call returns.
impl Stream for AsyncPipeReader {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        let mut pipe = always_lock(&self.0);
        Pin::new(&mut *pipe).poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let pipe = always_lock(&self.0);
        pipe.size_hint()
    }
}
impl Drop for AsyncPipeWriter {
    /// Closes the write side so a waiting reader learns that no more data
    /// will arrive.
    fn drop(&mut self) {
        let mut pipe = always_lock(&self.0);
        pipe.close_write();
    }
}
impl Drop for AsyncPipeReader {
    /// Closes the read side so a waiting writer is woken and can abort.
    fn drop(&mut self) {
        let mut pipe = always_lock(&self.0);
        pipe.close_read();
    }
}
/// Locks `mtx`, recovering the guard even if the mutex is poisoned.
///
/// A poisoned mutex only records that another thread panicked while holding
/// the lock; the guard is taken anyway so callers never have to handle a
/// lock error.
#[inline]
fn always_lock<T>(mtx: &Mutex<T>) -> MutexGuard<T> {
    mtx.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}
index handler with async_pipe
#[get("/")]
async fn index() -> impl Responder {
let (to_write, stream) = async_pipe(2048);
tokio::spawn(async move {
let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
if let Err(e) = read_dir(&mut zipper).await {
// TODO: do something
eprintln!("Failed to write files from directory to zip: {e}")
}
if let Err(e) = zipper.close().await {
// TODO: do something
eprintln!("Failed to close zipper: {e}")
}
});
HttpResponse::Ok()
.append_header((
http::header::CONTENT_DISPOSITION,
r#"attachment; filename="folder.zip""#,
))
// not sure if this is really necessary,
// but we're already sending compressed data,
// so make sure other middleware won't compress this again
.append_header((http::header::CONTENT_ENCODING, "identity"))
.streaming(stream.map(|b| Ok::<_, io::Error>(b.freeze())))
}
cargo.toml
# Manifest for the zip-streaming example.
[package]
name = "actix-zippy"
version = "0.1.0"
edition = "2021"

[dependencies]
# HTTP server, routing and the streaming response body.
actix-web = "4.1.0"
# Zip writer (`ZipFileWriter`) that writes into an `AsyncWrite`.
async_zip = "0.0.8"
# `BytesMut` buffer used by `async_pipe.rs`.
bytes = "1.2.1"
# `Stream` trait and the stream combinators (`map_ok`, `map`).
futures = "0.3.23"
# `tokio::io` helpers (`duplex`, `copy`, `AsyncWrite`) and `tokio::fs`.
tokio = { version = "1.20.1", features = ["io-util", "fs"] }
# NOTE(review): not referenced by the code shown in this example; confirm it is needed.
tokio-stream = "0.1.9"
# `FramedRead` and `BytesCodec` for the `DuplexStream` variant.
tokio-util = { version = "0.7.3", features = ["codec"] }
good first issue new-example