Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use wstd::net::TcpListener;
use wstd::runtime::block_on;

fn main() -> io::Result<()> {
block_on(|reactor| async move {
let listener = TcpListener::bind(&reactor, "127.0.0.1:8080").await?;
block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);
println!("type `nc localhost 8080` to create a TCP client");

Expand Down
35 changes: 15 additions & 20 deletions src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use super::{response::IncomingBody, Body, Request, Response, Result};
use crate::runtime::Reactor;

/// An HTTP client.
// Empty for now, but permits adding support for RequestOptions soon:
#[derive(Debug)]
pub struct Client<'a> {
reactor: &'a Reactor,
}
pub struct Client {}

impl<'a> Client<'a> {
impl Client {
/// Create a new instance of `Client`
pub fn new(reactor: &'a Reactor) -> Self {
Self { reactor }
pub fn new() -> Self {
Self {}
}

/// Send an HTTP request.
Expand All @@ -27,7 +26,7 @@ impl<'a> Client<'a> {
let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap();

// 2. Start sending the request body
io::copy(body, OutputStream::new(&self.reactor, body_stream))
io::copy(body, OutputStream::new(body_stream))
.await
.expect("io::copy broke oh no");

Expand All @@ -36,42 +35,38 @@ impl<'a> Client<'a> {
OutgoingBody::finish(wasi_body, trailers).unwrap();

// 4. Receive the response
self.reactor.wait_for(res.subscribe()).await;
Reactor::current().wait_for(res.subscribe()).await;
// NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
// is to trap if we try and get the response more than once. The final
// `?` is to raise the actual error if there is one.
let res = res.get().unwrap().unwrap()?;
Ok(Response::try_from_incoming_response(
res,
self.reactor.clone(),
)?)
Ok(Response::try_from_incoming_response(res)?)
}
}

struct OutputStream<'a> {
reactor: &'a Reactor,
struct OutputStream {
stream: wasi::http::types::OutputStream,
}

impl<'a> OutputStream<'a> {
fn new(reactor: &'a Reactor, stream: wasi::http::types::OutputStream) -> Self {
Self { reactor, stream }
impl OutputStream {
fn new(stream: wasi::http::types::OutputStream) -> Self {
Self { stream }
}
}

impl<'a> AsyncWrite for OutputStream<'a> {
impl AsyncWrite for OutputStream {
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let max = self.stream.check_write().unwrap() as usize;
let max = max.min(buf.len());
let buf = &buf[0..max];
self.stream.write(buf).unwrap();
self.reactor.wait_for(self.stream.subscribe()).await;
Reactor::current().wait_for(self.stream.subscribe()).await;
Ok(max)
}

async fn flush(&mut self) -> io::Result<()> {
self.stream.flush().unwrap();
self.reactor.wait_for(self.stream.subscribe()).await;
Reactor::current().wait_for(self.stream.subscribe()).await;
Ok(())
}
}
9 changes: 2 additions & 7 deletions src/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ pub struct Response<B: Body> {
// }

impl Response<IncomingBody> {
pub(crate) fn try_from_incoming_response(
incoming: IncomingResponse,
reactor: Reactor,
) -> super::Result<Self> {
pub(crate) fn try_from_incoming_response(incoming: IncomingResponse) -> super::Result<Self> {
let headers: Headers = incoming.headers().into();
let status = incoming.status().into();

Expand All @@ -64,7 +61,6 @@ impl Response<IncomingBody> {
let body = IncomingBody {
buf_offset: 0,
buf: None,
reactor,
body_stream,
_incoming_body: incoming_body,
};
Expand Down Expand Up @@ -101,7 +97,6 @@ impl<B: Body> Response<B> {
/// An incoming HTTP body
#[derive(Debug)]
pub struct IncomingBody {
reactor: Reactor,
buf: Option<Vec<u8>>,
// How many bytes have we already read from the buf?
buf_offset: usize,
Expand All @@ -119,7 +114,7 @@ impl AsyncRead for IncomingBody {
None => {
// Wait for an event to be ready
let pollable = self.body_stream.subscribe();
self.reactor.wait_for(pollable).await;
Reactor::current().wait_for(pollable).await;

// Read the bytes from the body stream
let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err {
Expand Down
10 changes: 2 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! use wstd::runtime::block_on;
//!
//! fn main() -> io::Result<()> {
//! block_on(|reactor| async move {
//! let listener = TcpListener::bind(&reactor, "127.0.0.1:8080").await?;
//! block_on(async move {
//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//! println!("Listening on {}", listener.local_addr()?);
//! println!("type `nc localhost 8080` to create a TCP client");
//!
Expand Down Expand Up @@ -56,12 +56,6 @@
//! bytes. And `wstd::runtime` provides access to async runtime primitives.
//! These are unique capabilities provided by WASI 0.2, and because this library
//! is specific to that are exposed from here.
//!
//! Finally, this library does not implicitly thread through a
//! [`Reactor`][runtime::Reactor] handle. Rather than using a `thread_local!`
//! async resource APIs in `wstd` will borrow an instance of `Reactor`. This is
//! a little more verbose, but in turn is a little simpler to implement,
//! maintain, and extend.

pub mod http;
pub mod io;
Expand Down
18 changes: 8 additions & 10 deletions src/net/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ use super::TcpStream;

/// A TCP socket server, listening for connections.
#[derive(Debug)]
pub struct TcpListener<'a> {
pub struct TcpListener {
socket: TcpSocket,
reactor: &'a Reactor,
}

impl<'a> TcpListener<'a> {
impl TcpListener {
/// Creates a new TcpListener which will be bound to the specified address.
///
/// The returned listener is ready for accepting connections.
pub async fn bind(reactor: &'a Reactor, addr: &str) -> io::Result<Self> {
pub async fn bind(addr: &str) -> io::Result<Self> {
let addr: SocketAddr = addr
.parse()
.map_err(|_| io::Error::other("failed to parse string to socket addr"))?;
Expand All @@ -41,6 +40,7 @@ impl<'a> TcpListener<'a> {
}
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
};
let reactor = Reactor::current();

socket
.start_bind(&network, local_address)
Expand All @@ -51,7 +51,7 @@ impl<'a> TcpListener<'a> {
socket.start_listen().map_err(to_io_err)?;
reactor.wait_for(socket.subscribe()).await;
socket.finish_listen().map_err(to_io_err)?;
Ok(Self { socket, reactor })
Ok(Self { socket })
}

/// Returns the local socket address of this listener.
Expand All @@ -70,15 +70,14 @@ impl<'a> TcpListener<'a> {
/// An iterator that infinitely accepts connections on a TcpListener.
#[derive(Debug)]
pub struct Incoming<'a> {
listener: &'a TcpListener<'a>,
listener: &'a TcpListener,
}

impl<'a> AsyncIterator for Incoming<'a> {
type Item = io::Result<TcpStream<'a>>;
type Item = io::Result<TcpStream>;

async fn next(&mut self) -> Option<Self::Item> {
self.listener
.reactor
Reactor::current()
.wait_for(self.listener.socket.subscribe())
.await;
let (socket, input, output) = match self.listener.socket.accept().map_err(to_io_err) {
Expand All @@ -89,7 +88,6 @@ impl<'a> AsyncIterator for Incoming<'a> {
socket,
input,
output,
reactor: self.listener.reactor,
}))
}
}
Expand Down
21 changes: 10 additions & 11 deletions src/net/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ use crate::{
};

/// A TCP stream between a local and a remote socket.
pub struct TcpStream<'a> {
pub(super) reactor: &'a Reactor,
pub struct TcpStream {
pub(super) input: InputStream,
pub(super) output: OutputStream,
pub(super) socket: TcpSocket,
}

impl<'a> TcpStream<'a> {
impl TcpStream {
/// Returns the socket address of the remote peer of this TCP connection.
pub fn peer_addr(&self) -> io::Result<String> {
let addr = self
Expand All @@ -29,29 +28,29 @@ impl<'a> TcpStream<'a> {
}
}

impl<'a> AsyncRead for TcpStream<'a> {
impl AsyncRead for TcpStream {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reactor.wait_for(self.input.subscribe()).await;
Reactor::current().wait_for(self.input.subscribe()).await;
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
let bytes_read = slice.len();
buf[..bytes_read].clone_from_slice(&slice);
Ok(bytes_read)
}
}

impl<'a> AsyncRead for &TcpStream<'a> {
impl AsyncRead for &TcpStream {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reactor.wait_for(self.input.subscribe()).await;
Reactor::current().wait_for(self.input.subscribe()).await;
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
let bytes_read = slice.len();
buf[..bytes_read].clone_from_slice(&slice);
Ok(bytes_read)
}
}

impl<'a> AsyncWrite for TcpStream<'a> {
impl AsyncWrite for TcpStream {
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.reactor.wait_for(self.output.subscribe()).await;
Reactor::current().wait_for(self.output.subscribe()).await;
self.output.write(buf).map_err(to_io_err)?;
Ok(buf.len())
}
Expand All @@ -61,9 +60,9 @@ impl<'a> AsyncWrite for TcpStream<'a> {
}
}

impl<'a> AsyncWrite for &TcpStream<'a> {
impl AsyncWrite for &TcpStream {
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.reactor.wait_for(self.output.subscribe()).await;
Reactor::current().wait_for(self.output.subscribe()).await;
self.output.write(buf).map_err(to_io_err)?;
Ok(buf.len())
}
Expand Down
22 changes: 14 additions & 8 deletions src/runtime/block_on.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::Reactor;
use super::{Reactor, REACTOR};

use core::future::Future;
use core::pin::pin;
Expand All @@ -7,16 +7,19 @@ use core::task::Waker;
use core::task::{Context, Poll, RawWaker, RawWakerVTable};

/// Start the event loop
pub fn block_on<F, Fut>(f: F) -> Fut::Output
pub fn block_on<Fut>(fut: Fut) -> Fut::Output
where
F: FnOnce(Reactor) -> Fut,
Fut: Future,
{
// Construct the reactor
let reactor = Reactor::new();
// Store a copy as a singleton to be used elsewhere:
let prev = REACTOR.replace(Some(reactor.clone()));
if prev.is_some() {
panic!("cannot wstd::runtime::block_on inside an existing block_on!")
}

// Create the future and pin it so it can be polled
let fut = (f)(reactor.clone());
// Pin the future so it can be polled
let mut fut = pin!(fut);

// Create a new context to be passed to the future.
Expand All @@ -25,12 +28,15 @@ where

// Either the future completes and we return, or some IO is happening
// and we wait.
loop {
let res = loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(res) => return res,
Poll::Ready(res) => break res,
Poll::Pending => reactor.block_until(),
}
}
};
// Clear the singleton
REACTOR.replace(None);
res
}

/// Construct a new no-op waker
Expand Down
13 changes: 10 additions & 3 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Async event loop support.
//!
//! The way to use this is to call [`block_on()`] to obtain an instance of
//! [`Reactor`]. You can then share the reactor in code that needs it to insert
//! instances of
//! The way to use this is to call [`block_on()`]. Inside the future, [`Reactor::current`]
//! will give an instance of the [`Reactor`] running the event loop, which can be
//! to [`Reactor::wait_for`] instances of
//! [`wasi::Pollable`](https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html).
//! This will automatically wait for the futures to resolve, and call the
//! necessary wakers to work.
Expand All @@ -16,3 +16,10 @@ mod reactor;

pub use block_on::block_on;
pub use reactor::Reactor;
use std::cell::RefCell;

// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all
// use sites in the background.
std::thread_local! {
pub(crate) static REACTOR: RefCell<Option<Reactor>> = RefCell::new(None);
}
18 changes: 17 additions & 1 deletion src/runtime/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::polling::{EventKey, Poller};
use super::{
polling::{EventKey, Poller},
REACTOR,
};

use core::cell::RefCell;
use core::future;
Expand All @@ -23,6 +26,19 @@ struct InnerReactor {
}

impl Reactor {
/// Return a `Reactor` for the currently running `wstd::runtime::block_on`.
///
/// # Panic
/// This will panic if called outside of `wstd::runtime::block_on`.
pub fn current() -> Self {
REACTOR.with(|r| {
r.borrow()
.as_ref()
.expect("Reactor::current must be called within a wstd runtime")
.clone()
})
}

/// Create a new instance of `Reactor`
pub(crate) fn new() -> Self {
Self {
Expand Down
Loading