Async streams - C# feature specifications

Note

This article is a feature specification. The specification serves as the design document for the feature. It includes proposed specification changes, along with information needed during the design and development of the feature. These articles are published until the proposed spec changes are finalized and incorporated in the current ECMA specification.

There may be some discrepancies between the feature specification and the completed implementation. Those differences are captured in the pertinent language design meeting (LDM) notes.

You can learn more about the process for adopting feature speclets into the C# language standard in the article on the specifications.

Champion issue: https://github.com/dotnet/csharplang/issues/43

Summary

C# has support for iterator methods and async methods, but no support for a method that is both an iterator and an async method. We should rectify this by allowing for await to be used in a new form of async iterator, one that returns an IAsyncEnumerable<T> or IAsyncEnumerator<T> rather than an IEnumerable<T> or IEnumerator<T>, with IAsyncEnumerable<T> consumable in a new await foreach. An IAsyncDisposable interface is also used to enable asynchronous cleanup.

Detailed design

Interfaces

IAsyncDisposable

There has been much discussion of IAsyncDisposable (e.g. https://github.com/dotnet/roslyn/issues/114) and whether it's a good idea. However, it's a required concept to add in support of async iterators. Since finally blocks may contain awaits, and since finally blocks need to be run as part of disposing of iterators, we need async disposal. It's also just generally useful any time cleaning up of resources might take any period of time, e.g. closing files (requiring flushes), deregistering callbacks and providing a way to know when deregistration has completed, etc.

The following interface is added to the core .NET libraries (e.g. System.Private.CoreLib / System.Runtime):

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

As with Dispose, invoking DisposeAsync multiple times is acceptable, and invocations after the first should be treated as no-ops, returning a synchronously completed successful task (DisposeAsync need not be thread-safe, though, and need not support concurrent invocation). Further, types may implement both IDisposable and IAsyncDisposable, and if they do, it's similarly acceptable to invoke Dispose and then DisposeAsync or vice versa, but only the first invocation should be meaningful and subsequent invocations of either should be no-ops. As such, if a type does implement both, consumers are encouraged to call the more relevant method once and only once based on the context: Dispose in synchronous contexts and DisposeAsync in asynchronous ones.
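As an illustration of the guidance above, here is a minimal sketch of a type that implements both interfaces so that only the first disposal call does any work. The type BufferedLog and its CleanupCount property are hypothetical names introduced only for this example; they are not part of the proposal.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical resource type, used only for illustration.
public sealed class BufferedLog : IDisposable, IAsyncDisposable
{
    private bool _disposed;

    // Counts how often real cleanup ran; exposed only so the sketch can be checked.
    public int CleanupCount { get; private set; }

    public void Dispose()
    {
        if (_disposed) return;      // Later calls are no-ops.
        _disposed = true;
        CleanupCount++;             // Stand-in for a synchronous flush and close.
    }

    public ValueTask DisposeAsync()
    {
        // Later calls return a synchronously completed, successful ValueTask.
        if (_disposed) return default;
        _disposed = true;
        return new ValueTask(FlushAndCloseAsync());
    }

    private async Task FlushAndCloseAsync()
    {
        await Task.Yield();         // Stand-in for an asynchronous flush.
        CleanupCount++;
    }
}
```

Like the interface contract itself, this sketch is not thread-safe and does not support concurrent disposal calls.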

(How IAsyncDisposable interacts with using is a separate discussion. And coverage of how it interacts with foreach is handled later in this proposal.)

Alternatives considered:

IAsyncEnumerable / IAsyncEnumerator

Two interfaces are added to the core .NET libraries:

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }
}

Typical consumption (without additional language features) would look like:

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        Use(enumerator.Current);
    }
}
finally { await enumerator.DisposeAsync(); }

Discarded options considered:

The following subsection discusses alternatives that weren't chosen.

Viable alternative:

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> WaitForNextAsync();
        T TryGetNext(out bool success);
    }
}

TryGetNext is used in an inner loop to consume items with a single interface call as long as they're available synchronously. When the next item can't be retrieved synchronously, it sets success to false, and any time it does so, a caller must subsequently invoke WaitForNextAsync to either wait for the next item to be available or to determine that there will never be another item. Typical consumption (without additional language features) would look like:

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
try
{
    while (await enumerator.WaitForNextAsync())
    {
        while (true)
        {
            T item = enumerator.TryGetNext(out bool success);
            if (!success) break;
            Use(item);
        }
    }
}
finally { await enumerator.DisposeAsync(); }

The advantage of this is two-fold, one minor and one major:

However, there are non-trivial downsides, including significantly increased complexity when consuming these manually, and an increased chance of introducing bugs when using them. And while the performance benefits show up in microbenchmarks, we don't believe they'll be impactful in the vast majority of real usage. If it turns out they are, we can introduce a second set of interfaces in a light-up fashion.

Discarded options considered:

Cancellation

There are several possible approaches to supporting cancellation:

  1. IAsyncEnumerable<T>/IAsyncEnumerator<T> are cancellation-agnostic: CancellationToken doesn't appear anywhere. Cancellation is achieved by logically baking the CancellationToken into the enumerable and/or enumerator in whatever manner is appropriate, e.g. when calling an iterator, passing the CancellationToken as an argument to the iterator method and using it in the body of the iterator, as is done with any other parameter.
  2. IAsyncEnumerable<T>.GetAsyncEnumerator(CancellationToken): You pass a CancellationToken to GetAsyncEnumerator, and subsequent MoveNextAsync operations respect it however they can.
  3. IAsyncEnumerator<T>.MoveNextAsync(CancellationToken): You pass a CancellationToken to each individual MoveNextAsync call.
  4. 1 && 2: You both embed CancellationTokens into your enumerable/enumerator and pass CancellationTokens into GetAsyncEnumerator.
  5. 1 && 3: You both embed CancellationTokens into your enumerable/enumerator and pass CancellationTokens into MoveNextAsync.

From a purely theoretical perspective, (5) is the most robust, in that (a) MoveNextAsync accepting a CancellationToken enables the most fine-grained control over what's canceled, and (b) CancellationToken is just like any other type that can be passed as an argument into iterators, embedded in arbitrary types, etc.

However, there are multiple problems with that approach:

An earlier version of this document recommended (1), but we have since switched to (4).

The two main problems with (1):

There are two main consumption scenarios:

  1. await foreach (var i in GetData(token)) ... where the consumer calls the async-iterator method,
  2. await foreach (var i in givenIAsyncEnumerable.WithCancellation(token)) ... where the consumer deals with a given IAsyncEnumerable instance.

We find that a reasonable compromise to support both scenarios in a way that is convenient for both producers and consumers of async-streams is to use a specially annotated parameter in the async-iterator method. The [EnumeratorCancellation] attribute is used for this purpose. Placing this attribute on a parameter tells the compiler that if a token is passed to the GetAsyncEnumerator method, that token should be used instead of the value originally passed for the parameter.

Consider IAsyncEnumerable<int> GetData([EnumeratorCancellation] CancellationToken token = default). The implementer of this method can simply use the parameter in the method body. The consumer can use either of the consumption patterns above:

  1. if you use GetData(token), then the token is saved into the async-enumerable and will be used in iteration,
  2. if you use givenIAsyncEnumerable.WithCancellation(token), then the token passed to GetAsyncEnumerator will supersede any token saved in the async-enumerable.
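The two consumption patterns can be sketched as follows. This is an illustrative example only: the class CancellationSketch, its helper methods, and the choice to cancel after a fixed number of items are assumptions made for the sketch. GetData follows the signature discussed above.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Producer: yields 0, 1, 2, ... until the token is canceled.
    public static async IAsyncEnumerable<int> GetData(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; ; i++)
        {
            token.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    // Pattern 1: the consumer calls the async-iterator method and passes the token.
    public static async Task<int> CountDirect(int cancelAfterItems)
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int i in GetData(cts.Token))
            {
                if (++seen == cancelAfterItems) cts.Cancel();
            }
        }
        catch (OperationCanceledException) { /* expected once canceled */ }
        return seen;
    }

    // Pattern 2: the consumer is handed an IAsyncEnumerable<int> and supplies
    // the token through WithCancellation, which flows it to GetAsyncEnumerator.
    public static async Task<int> CountGiven(IAsyncEnumerable<int> given, int cancelAfterItems)
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int i in given.WithCancellation(cts.Token))
            {
                if (++seen == cancelAfterItems) cts.Cancel();
            }
        }
        catch (OperationCanceledException) { /* expected once canceled */ }
        return seen;
    }
}
```

In both methods the iterator body observes the cancellation through its own token parameter, so the producer needs no separate code path for the two scenarios.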

foreach

foreach will be augmented to support IAsyncEnumerable<T> in addition to its existing support for IEnumerable<T>. And it will support the equivalent of IAsyncEnumerable<T> as a pattern if the relevant members are exposed publicly, falling back to using the interface directly if not, in order to enable struct-based extensions that avoid allocating as well as using alternative awaitables as the return type of MoveNextAsync and DisposeAsync.
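To make the pattern-based support more concrete, here is a small sketch of a type that can be consumed with await foreach without implementing IAsyncEnumerable<T>. The names CountTo and PatternSketch are hypothetical. The enumerator is written as a class for simplicity, although the motivation above is that it could also be a struct to avoid allocation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical type: it does not implement IAsyncEnumerable<int>,
// but publicly exposes the members the pattern looks for.
public readonly struct CountTo
{
    private readonly int _limit;

    public CountTo(int limit) => _limit = limit;

    public Enumerator GetAsyncEnumerator() => new Enumerator(_limit);

    public sealed class Enumerator
    {
        private readonly int _limit;
        private int _current;

        internal Enumerator(int limit) => _limit = limit;

        public int Current => _current;

        // Completes synchronously; any awaitable returning bool fits the pattern.
        public ValueTask<bool> MoveNextAsync()
        {
            if (_current >= _limit) return new ValueTask<bool>(false);
            _current++;
            return new ValueTask<bool>(true);
        }
    }
}

public static class PatternSketch
{
    // Sums 1..limit by iterating the pattern-based type.
    public static async Task<int> SumAsync(int limit)
    {
        int sum = 0;
        await foreach (int value in new CountTo(limit))
        {
            sum += value;
        }
        return sum;
    }
}
```

Because Enumerator exposes no DisposeAsync member and does not implement IAsyncDisposable, no asynchronous cleanup is performed for it in this sketch.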

Syntax

Using the syntax:

foreach (var i in enumerable)

C# will continue to treat enumerable as a synchronous enumerable, such that even if it exposes the relevant APIs for async enumerables (exposing the pattern or implementing the interface), it will only consider the synchronous APIs.

To force foreach to instead only consider the asynchronous APIs, await is inserted as follows:

await foreach (var i in enumerable)

No syntax would be provided that would support using either the async or the sync APIs; the developer must choose based on the syntax used.

Semantics

The compile-time processing of an await foreach statement first determines the collection type, enumerator type and iteration type of the expression (very similar to https://github.com/dotnet/csharpstandard/blob/draft-v8/standard/statements.md#1395-the-foreach-statement). This determination proceeds as follows:

The above steps, if successful, unambiguously produce a collection type C, enumerator type E and iteration type T.

await foreach (V v in x) «embedded_statement»

is then expanded to:

{
    E e = ((C)(x)).GetAsyncEnumerator();
    try {
        while (await e.MoveNextAsync()) {
            V v = (V)(T)e.Current;
            «embedded_statement»
        }
    }
    finally {
        ... // Dispose e
    }
}

The body of the finally block is constructed according to the following steps:

finally {
    await e.DisposeAsync();
}

finally {
    await ((System.IAsyncDisposable)e).DisposeAsync();
}

finally {
    System.IAsyncDisposable d = e as System.IAsyncDisposable;
    if (d != null) await d.DisposeAsync();
}

except that if E is a value type, or a type parameter instantiated to a value type, then the conversion of e to System.IAsyncDisposable shall not cause boxing to occur.

finally {
}

ConfigureAwait

This pattern-based compilation will allow ConfigureAwait to be used on all of the awaits, via a ConfigureAwait extension method:

await foreach (T item in enumerable.ConfigureAwait(false))
{
   ...
}

This will be based on types we'll add to .NET as well, likely to System.Threading.Tasks.Extensions.dll:

// Approximate implementation, omitting arg validation and the like
namespace System.Threading.Tasks
{
    public static class AsyncEnumerableExtensions
    {
        public static ConfiguredAsyncEnumerable<T> ConfigureAwait<T>(this IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext) =>
            new ConfiguredAsyncEnumerable<T>(enumerable, continueOnCapturedContext);

        public struct ConfiguredAsyncEnumerable<T>
        {
            private readonly IAsyncEnumerable<T> _enumerable;
            private readonly bool _continueOnCapturedContext;

            internal ConfiguredAsyncEnumerable(IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext)
            {
                _enumerable = enumerable;
                _continueOnCapturedContext = continueOnCapturedContext;
            }

            public ConfiguredAsyncEnumerator<T> GetAsyncEnumerator() =>
                new ConfiguredAsyncEnumerator<T>(_enumerable.GetAsyncEnumerator(), _continueOnCapturedContext);

            public struct ConfiguredAsyncEnumerator<T>
            {
                private readonly IAsyncEnumerator<T> _enumerator;
                private readonly bool _continueOnCapturedContext;

                internal ConfiguredAsyncEnumerator(IAsyncEnumerator<T> enumerator, bool continueOnCapturedContext)
                {
                    _enumerator = enumerator;
                    _continueOnCapturedContext = continueOnCapturedContext;
                }

                public ConfiguredValueTaskAwaitable<bool> MoveNextAsync() =>
                    _enumerator.MoveNextAsync().ConfigureAwait(_continueOnCapturedContext);

                public T Current => _enumerator.Current;

                public ConfiguredValueTaskAwaitable DisposeAsync() =>
                    _enumerator.DisposeAsync().ConfigureAwait(_continueOnCapturedContext);
            }
        }
    }
}

Note that this approach will not enable ConfigureAwait to be used with pattern-based enumerables. Then again, ConfigureAwait is already exposed only on Task/Task<T>/ValueTask/ValueTask<T> and can't be applied to arbitrary awaitable things: it only makes sense when applied to Tasks (it controls a behavior implemented in Task's continuation support), and thus doesn't make sense when using a pattern where the awaitable things may not be tasks. Anyone returning awaitable things can provide their own custom behavior in such advanced scenarios.

(If we can come up with some way to support a scope- or assembly-level ConfigureAwait solution, then this won't be necessary.)

Async Iterators

The language / compiler will support producing IAsyncEnumerable<T>s and IAsyncEnumerator<T>s in addition to consuming them. Today the language supports writing an iterator like:

static IEnumerable<int> MyIterator()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(1000);
            yield return i;
        }
    }
    finally
    {
        Thread.Sleep(200);
        Console.WriteLine("finally");
    }
}

but await can't be used in the body of these iterators. We will add that support.

Syntax

The existing language support for iterators infers the iterator nature of the method based on whether it contains any yields. The same will be true for async iterators. Such async iterators will be demarcated and differentiated from synchronous iterators by adding async to the signature, and must then also have either IAsyncEnumerable<T> or IAsyncEnumerator<T> as their return type. For example, the above example could be written as an async iterator as follows:

static async IAsyncEnumerable<int> MyIterator()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            await Task.Delay(1000);
            yield return i;
        }
    }
    finally
    {
        await Task.Delay(200);
        Console.WriteLine("finally");
    }
}
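The following sketch shows how such an async iterator might be consumed, and that leaving the loop early still runs the iterator's finally block, including its await, as part of asynchronous disposal. The class IteratorSketch and the log parameter are assumptions added for illustration. The delays are shortened and a list stands in for Console output.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class IteratorSketch
{
    // Same shape as MyIterator, with shorter delays and a log instead of Console.
    public static async IAsyncEnumerable<int> Numbers(List<string> log)
    {
        try
        {
            for (int i = 0; i < 100; i++)
            {
                await Task.Delay(1);
                yield return i;
            }
        }
        finally
        {
            await Task.Delay(1);
            log.Add("finally");
        }
    }

    public static async Task<List<string>> TakeThreeAsync()
    {
        var log = new List<string>();
        await foreach (int i in Numbers(log))
        {
            log.Add(i.ToString());
            if (i == 2) break;   // Leaving the loop awaits the enumerator's DisposeAsync.
        }
        log.Add("after loop");
        return log;
    }
}
```

TakeThreeAsync records "0", "1", "2", "finally", "after loop", in that order: the iterator's cleanup completes before execution continues past the loop.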

Alternatives considered:

LINQ

There are roughly 200 overloads of methods on the System.Linq.Enumerable class, all of which work in terms of IEnumerable<T>; some of these accept IEnumerable<T>, some of them produce IEnumerable<T>, and many do both. Adding LINQ support for IAsyncEnumerable<T> would likely entail duplicating all of these overloads for it, for another ~200. And since IAsyncEnumerator<T> is likely to be more common as a standalone entity in the asynchronous world than IEnumerator<T> is in the synchronous world, we could potentially need another ~200 overloads that work with IAsyncEnumerator<T>. Plus, a large number of the overloads deal with predicates (e.g. Where that takes a Func<T, bool>), and it may be desirable to have IAsyncEnumerable<T>-based overloads that deal with both synchronous and asynchronous predicates (e.g. Func<T, ValueTask<bool>> in addition to Func<T, bool>). While this isn't applicable to all of the now ~400 new overloads, a rough calculation is that it'd be applicable to half, which means another ~200 overloads, for a total of ~600 new methods.

That is a staggering number of APIs, with the potential for even more when extension libraries like Interactive Extensions (Ix) are considered. But Ix already has an implementation of many of these, and there doesn't seem to be a great reason to duplicate that work; we should instead help the community improve Ix and recommend it for when developers want to use LINQ with IAsyncEnumerable<T>.

There is also the issue of query comprehension syntax. The pattern-based nature of query comprehensions would allow them to "just work" with some operators, e.g. if Ix provides the following methods:

public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> func);
public static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> func);

then this C# code will "just work":

IAsyncEnumerable<int> enumerable = ...;
IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select item * 2;

However, there is no query comprehension syntax that supports using await in the clauses, so if Ix added, for example:

public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> func);

then this would "just work":

IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select SomeAsyncMethod(item);

async ValueTask<int> SomeAsyncMethod(int item)
{
    await Task.Yield();
    return item * 2;
}

but there'd be no way to write it with the await inline in the select clause. As a separate effort, we could look into adding async { ... } expressions to the language, at which point we could allow them to be used in query comprehensions and the above could instead be written as:

IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select async
                               {
                                   await Task.Yield();
                                   return item * 2;
                               };

or into enabling await to be used directly in expressions, such as by supporting async from. However, it's unlikely a design here would impact the rest of the feature set one way or the other, and this isn't a particularly high-value thing to invest in right now, so the proposal is to do nothing additional here right now.
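For reference, operators of this kind can themselves be written with async iterators. The sketch below is not the Ix implementation. The names AsyncQuerySketch, WhereSketch, SelectSketch, Range, and DoubleAsync are hypothetical, and the operators are deliberately not named Where and Select so that they cannot clash with any library that already provides such extensions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncQuerySketch
{
    // Filter with a synchronous predicate.
    public static async IAsyncEnumerable<T> WhereSketch<T>(
        this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (T item in source)
        {
            if (predicate(item)) yield return item;
        }
    }

    // Projection with an asynchronous selector.
    public static async IAsyncEnumerable<TResult> SelectSketch<TSource, TResult>(
        this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
    {
        await foreach (TSource item in source)
        {
            yield return await selector(item);
        }
    }

    // Hypothetical source of integers: start, start + 1, ...
    public static async IAsyncEnumerable<int> Range(int start, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    private static async ValueTask<int> DoubleAsync(int item)
    {
        await Task.Yield();
        return item * 2;
    }

    // Keeps the even numbers in 1..6 and doubles them asynchronously.
    public static async Task<List<int>> DemoAsync()
    {
        var results = new List<int>();
        var query = Range(1, 6)
            .WhereSketch(item => item % 2 == 0)
            .SelectSketch(item => DoubleAsync(item));
        await foreach (int value in query)
        {
            results.Add(value);
        }
        return results;
    }
}
```

The asynchronous work has to live in a separate method such as DoubleAsync (or in an async lambda), which is the limitation of query clauses described above.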

Integration with other asynchronous frameworks

Integration with IObservable<T> and other asynchronous frameworks (e.g. reactive streams) would be done at the library level rather than at the language level. For example, all of the data from an IAsyncEnumerator<T> can be published to an IObserver<T> simply by await foreach'ing over the enumerator and OnNext'ing the data to the observer, so an AsObservable<T> extension method is possible. Consuming an IObservable<T> in an await foreach requires buffering the data (in case another item is pushed while the previous item is still being processed), but such a push-pull adapter can easily be implemented to enable an IObservable<T> to be pulled from with an IAsyncEnumerator<T>. Etc. Rx/Ix already provide prototypes of such implementations, and libraries like https://github.com/dotnet/corefx/tree/master/src/System.Threading.Channels provide various kinds of buffering data structures. The language need not be involved at this stage.
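As a rough sketch of the pull-to-push direction described above, the following helper forwards every item of an async sequence to an observer. It is not the Rx/Ix implementation. The names ObservableBridgeSketch, PublishToAsync, Sample, and RecordingObserver are hypothetical, and the error handling is simplified.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ObservableBridgeSketch
{
    // Pulls items with await foreach and pushes them to the observer.
    public static async Task PublishToAsync<T>(
        this IAsyncEnumerable<T> source, IObserver<T> observer)
    {
        try
        {
            await foreach (T item in source)
            {
                observer.OnNext(item);
            }
        }
        catch (Exception ex)
        {
            // Simplification: any failure is reported through OnError.
            observer.OnError(ex);
            return;
        }
        observer.OnCompleted();
    }

    // Hypothetical two-item source used to exercise the helper.
    public static async IAsyncEnumerable<int> Sample()
    {
        yield return 1;
        await Task.Yield();
        yield return 2;
    }
}

// Minimal observer that records what it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Events { get; } = new List<string>();

    public void OnNext(T value) => Events.Add($"next:{value}");
    public void OnError(Exception error) => Events.Add("error");
    public void OnCompleted() => Events.Add("completed");
}
```

The opposite direction, consuming an IObservable<T> with await foreach, needs a buffer between producer and consumer and is not shown here.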