Crate async_nursery

Primitive for structured concurrency.

The nursery allows writing concurrent programs that adhere to structured concurrency. If you are new to the concept, there are some excellent resources on the dedicated structured concurrency forum. The name of the library is inspired by the Python Trio library.

§Description

async_nursery brings a structured concurrency primitive to Rust. There are three main goals in structured concurrency:

§1. A sane control flow for concurrent programs.

Notes on structured concurrency, or: Go statement considered harmful by Nathaniel J. Smith explains this exquisitely. To summarize, if a function wants to split off and do some work concurrently, make sure that all its child tasks are finished when the function returns. That way it functions as the black box we are used to from synchronous code. A function has inputs and a return value, and when it is done, no code it created is running anymore.

You could already do this by stuffing JoinHandles from async_executors in a FuturesUnordered, but as we will see below, async_nursery is a bit more flexible and convenient. As opposed to the JoinHandles from tokio or async-std directly, the ones from async_executors do not detach on drop by default.
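
To make the black-box property concrete, here is a minimal sketch that uses scoped threads from the standard library rather than async_nursery itself. It is only an analogy: `parallel_sum` is a hypothetical function, and `std::thread::scope` plays the role of the nursery by joining every child before the function returns.

```rust
use std::thread;

/// Sums each chunk on its own thread and returns only after every child has finished.
/// Hypothetical example function, not part of async_nursery.
fn parallel_sum(data: &[u64], chunk_size: usize) -> u64 {
    // `thread::scope` joins all threads spawned inside it before it returns,
    // so no child can outlive this function call.
    thread::scope(|scope| {
        let handles: Vec<_> = data
            .chunks(chunk_size.max(1)) // guard against a chunk size of zero
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<u64>()))
            .collect();

        handles
            .into_iter()
            .map(|handle| handle.join().expect("child thread panicked"))
            .sum::<u64>()
    })
}
```

A caller sees an ordinary function: `parallel_sum(&[1, 2, 3, 4, 5], 2)` takes inputs, returns a value, and leaves nothing running in the background.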

§2. Prevent resource leakage.

Orphaned tasks, spawned without a JoinHandle, can potentially stay alive forever, either running in loops or deadlocking. Structured concurrency makes sure there are no leaks by putting all resources neatly in a call tree, very similar to a call stack. In a call tree, a stack frame can consist of several stack frames sitting side by side doing things concurrently, but when we return to the previous stack frame, all of them are done.

§3. Propagate errors

In Rust it is common to propagate errors up the call stack. If you spawn a task and let it run off into the void, you need out-of-band error handling like channels. In structured concurrency, since all tasks get joined before their parent returns, you can return the errors just like in sync code. It is also possible to cancel all sibling tasks if one task runs into an error.
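
The same idea can be sketched with scoped threads from the standard library. This is an analogy under stated assumptions, not the async_nursery API: `parse_one` and `parse_all` are hypothetical names, and threads cannot be cancelled the way async tasks can, so the sibling cancellation mentioned above is not shown.

```rust
use std::thread;

/// Hypothetical unit of work: parses one input, failing on malformed text.
fn parse_one(input: &str) -> Result<i32, String> {
    input
        .trim()
        .parse::<i32>()
        .map_err(|e| format!("cannot parse {input:?}: {e}"))
}

/// Parses all inputs concurrently and returns the first error in input order.
fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, String> {
    thread::scope(|scope| {
        let handles: Vec<_> = inputs
            .iter()
            .map(|input| scope.spawn(move || parse_one(input)))
            .collect();

        // Collecting into a `Result` stops at the first `Err`. The scope still
        // joins any remaining children before returning, so the error travels
        // up the call stack as an ordinary return value, with no side channel.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("child thread panicked"))
            .collect()
    })
}
```

Because the parent waits for its children, the caller can simply use `?` on the result of `parse_all`, just like in sync code.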

§Properties of async_nursery:

§Missing features

§Install

With cargo add: cargo add async_nursery

With cargo yaml:

dependencies:

   async_nursery: ^0.6

With Cargo.toml

[dependencies]

   async_nursery = "0.6"

§Upgrade

Please check out the changelog when upgrading.

§Dependencies

This crate has few dependencies (futures and async_executors). Cargo will automatically handle its dependencies for you. You will have to choose executors from the async_executors crate and set the correct features on that crate to enable them.

There are no optional features.

§Security

The crate uses forbid(unsafe_code), but depends on futures, which contains a fair amount of unsafe code. There are no security issues I’m aware of specific to using this crate.

§Performance

Currently the implementation is simple. Nursery just sends the JoinHandle to NurseryStream over an unbounded channel. This is convenient, because it means NurseExt::nurse doesn’t have to be async, but it has some overhead compared to using the underlying executor directly. In the future I hope to optimize the implementation.

§Usage

Warning: If you ever wait on the stream to finish, remember that it will only finish once no Nursery instances are alive anymore. You must drop every Nursery (including clones) before awaiting the NurseryStream. If your program deadlocks, this should be the first place to look.
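
The warning follows from the channel-based design described under Performance. The sketch below is a simplified model built on threads and `std::sync::mpsc`, not the crate's actual implementation: `ToyNursery`, `ToyOutput`, `toy_nursery` and `run_five` are hypothetical names. Unlike the real NurseryStream, dropping `ToyOutput` does not cancel anything.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `Nursery`: a cloneable sender of join handles.
#[derive(Clone)]
struct ToyNursery {
    tx: Sender<JoinHandle<u32>>,
}

/// Hypothetical stand-in for `NurseryStream`: the receiving end.
struct ToyOutput {
    rx: Receiver<JoinHandle<u32>>,
}

fn toy_nursery() -> (ToyNursery, ToyOutput) {
    let (tx, rx) = channel();
    (ToyNursery { tx }, ToyOutput { rx })
}

impl ToyNursery {
    /// Spawns the work and hands its join handle to the output side.
    /// This never blocks, because the channel is unbounded.
    fn nurse(&self, work: impl FnOnce() -> u32 + Send + 'static) {
        let handle = thread::spawn(work);
        // Sending only fails if the output side is gone; ignored in this sketch.
        let _ = self.tx.send(handle);
    }
}

impl ToyOutput {
    /// Joins every task. The loop ends only once all `ToyNursery` clones are
    /// dropped, because only then does the channel report that no more
    /// handles can arrive.
    fn wait_all(self) -> Vec<u32> {
        self.rx
            .iter()
            .map(|handle| handle.join().expect("task panicked"))
            .collect()
    }
}

fn run_five() -> Vec<u32> {
    let (nursery, output) = toy_nursery();
    for i in 0..5 {
        nursery.nurse(move || i * 10);
    }
    // Without this drop, `wait_all` would block forever.
    drop(nursery);
    output.wait_all()
}
```

In this model the receiver cannot tell "no task has been spawned yet" apart from "no task will ever be spawned again" until the last sender is gone, which is why a forgotten sender shows up as a deadlock rather than an error.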

All tasks spawned on a nursery must have the same Future::Output type.

§Basic example

There is an extensive list of examples for all kinds of patterns of using async_nursery in the examples directory. Please have a look at them.

use
{
   async_nursery   :: { Nursery, NurseExt } ,
   async_executors :: { AsyncStd          } ,
};

pub type DynResult<T> = Result<T, Box< dyn std::error::Error + Send + Sync + 'static >>;

async fn self_contained() -> DynResult<()>
{
   let (nursery, output) = Nursery::new( AsyncStd );

   for _ in 0..5
   {
      nursery.nurse( async { /* do something useful */ } )?;
   }

   // This is necessary. Since we could keep spawning tasks even after starting to poll
   // the output, it can't know that we are done, unless we drop all senders or call
   // `close_nursery`. If we don't, the await below deadlocks.
   //
   drop(nursery);

   // Resolves when all spawned tasks are done.
   //
   output.await;

   Ok(())
}

§Returning errors

TryStreamExt::try_next can be used to bail early if all concurrent tasks need to complete successfully. As soon as it returns an error, you can drop the NurseryStream, which cancels all sibling tasks that are still running.

§Recover other return types

Spawned tasks can also return useful data. You can effectively see them as function calls or closures that run concurrently. The nursery lets you recover their outputs as they become available. This could be used to implement a progress bar, for example.

Another possibility is using collect to gather all returned values into a collection once everything is done.

§Panics

Nursery has no special handling of panics. If your task panics, what happens depends on the executor. Currently tokio is different from other executors in that it will catch_unwind your spawned tasks. Other executors propagate the panic to the thread that awaits the JoinHandles (e.g. the one that awaits the NurseryStream). If you want a resilient application that works on all executors, use the catch_unwind combinator from the futures library. Again, using TryStreamExt::try_next you can bail early if one task panics.
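
As a rough illustration of turning a panic into a value, here is the synchronous counterpart from the standard library, `std::panic::catch_unwind`. The combinator in the futures library plays the same role for futures. `run_resilient` is a hypothetical helper, and the sketch assumes the default unwinding panic strategy (it cannot catch anything when built with panic = "abort").

```rust
use std::panic;

/// Runs `work`, converting a panic into an `Err` with a readable message.
/// Hypothetical helper, not part of async_nursery or futures.
fn run_resilient<T>(work: impl FnOnce() -> T + panic::UnwindSafe) -> Result<T, String> {
    panic::catch_unwind(work).map_err(|payload| {
        // Panic payloads are usually a `&str` or a `String`.
        if let Some(msg) = payload.downcast_ref::<&str>() {
            msg.to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "unknown panic payload".to_string()
        }
    })
}
```

Once a panic is represented as an `Err`, it can be handled like any other error, which is what makes bailing early with try_next possible.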

§Differences with FuturesUnordered

Nursery and NurseryStream wrap a FuturesUnordered internally. FuturesUnordered has a very strict two-phase API: first spawn, then get output. The main feature the wrapper gives us is that we can start polling the stream of outputs and still continue to spawn more subtasks. This allows us to use NurseryStream as a long-lived container. For example, if you are going to spawn network requests, you can continuously listen to NurseryStream for errors that happened during processing while continuing to spawn further requests. Then, when the connection closes, we want to stop processing outstanding requests for this connection. By dropping the NurseryStream, we can do that.

Further a few conveniences are added:

§API

API documentation can be found on docs.rs.

§Contributing

Please check out the contribution guidelines.

§Testing

cargo test and rustup run nightly wasm-pack test --firefox --headless -- --features "implementation" --no-default-features.

§Code of conduct

Any of the behaviors described in point 4 “Unacceptable Behavior” of the Citizens Code of Conduct are not welcome here and might get you banned. If anyone, including maintainers and moderators of the project, fails to respect these/your limits, you are entitled to call them out.

§License

Unlicense

Nursery (feature: implementation)

The sender part of the nursery. Wraps an unbounded sender. Can be cloned. To manage the spawned tasks and await their output, see NurseryStream.

NurseryStream (feature: implementation)

Collection of JoinHandles of tasks spawned on the nursery. When this is dropped, all spawned tasks are canceled. You can poll the Stream implementation on this to obtain the outputs of your tasks. You can await the Future implementation if you don’t care about the outputs but just want to wait until all spawned tasks are done.

NurseErr

The error type for errors happening in async_nursery.

LocalNurse

Same as Nurse but doesn’t require the futures to be Send.

LocalNurseExt

Extension trait that allows passing in a future directly. Does the conversion to LocalFutureObj for you.

Nurse

Implementors provide the possibility to nurse futures. Technically this means you can spawn on this object without the tasks having to return (), but you still get no JoinHandle.

NurseExt

Extension trait that allows passing in a future directly. Does the conversion to FutureObj for you.