Effective Utility Extensions for IAsyncEnumerable in .NET
Chapter 1: Introduction to IAsyncEnumerable
With the release of async streams in .NET Core 3.0, built on the IAsyncEnumerable<T> interface, and C# 8's direct support for consuming them with await foreach, .NET developers have a standard way to implement asynchronous streams. Frameworks such as Entity Framework Core and ASP.NET Core have adopted the feature, making async streams a common tool in day-to-day programming.
In this article, I will discuss some prevalent scenarios I encounter while working with IAsyncEnumerable and demonstrate solutions through utility extensions.
Chapter 2: Guidelines for Creating Async Stream Extensions
When developing extensions for async streams, certain best practices should be considered.
- Input Validation: An async iterator does not run any of its code until enumeration starts, so null checks placed inside it are deferred as well. Put the validation in a non-iterator wrapper method that then calls the internal implementation. Invalid arguments then fail at the call site, with a stack trace that is cleaner and easier to troubleshoot.
- CancellationToken Management: A CancellationToken is often necessary, but avoid exposing it in the public method signature. Callers can attach one to any IAsyncEnumerable<T> with the WithCancellation extension method that ships with .NET. Mark the token parameter of the internal iterator with the [EnumeratorCancellation] attribute, and the compiler-generated enumerator will pass the token supplied through WithCancellation into that parameter, giving callers a single, consistent way to cancel.
For instance, when creating a simple Where extension to filter an async stream, the implementation would look like this:
public static IAsyncEnumerable<T> Where<T>(
    this IAsyncEnumerable<T> source,
    Func<T, bool> predicate
)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(predicate);
    return Core(source, predicate);

    static async IAsyncEnumerable<T> Core(
        IAsyncEnumerable<T> source,
        Func<T, bool> predicate,
        [EnumeratorCancellation] CancellationToken ct = default
    )
    {
        await foreach (var item in source.WithCancellation(ct))
        {
            if (predicate(item))
                yield return item;
        }
    }
}
The validation runs in the outer method, outside the iterator, and the cancellation token reaches Core through the [EnumeratorCancellation] parameter, so the example follows both guidelines.
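To see both guidelines at work, here is a minimal, self-contained sketch. The Ticks source and the GuidelinesDemo class are hypothetical names introduced only for this illustration, and ArgumentOutOfRangeException.ThrowIfNegative assumes .NET 8 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class GuidelinesDemo
{
    // Hypothetical source: emits 0, 1, 2, ... every intervalMs until cancelled.
    public static IAsyncEnumerable<int> Ticks(int intervalMs)
    {
        // Runs at call time because this wrapper is not an iterator.
        ArgumentOutOfRangeException.ThrowIfNegative(intervalMs);
        return Core(intervalMs);

        static async IAsyncEnumerable<int> Core(
            int intervalMs,
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            for (var i = 0; ; i++)
            {
                await Task.Delay(intervalMs, ct); // throws once ct is cancelled
                yield return i;
            }
        }
    }

    public static async Task Main()
    {
        // 1. Invalid input fails immediately, before any enumeration.
        try
        {
            _ = Ticks(-1);
        }
        catch (ArgumentOutOfRangeException)
        {
            Console.WriteLine("Rejected at the call site");
        }

        // 2. The token passed to WithCancellation reaches Core's ct parameter.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
        var received = 0;
        try
        {
            await foreach (var n in Ticks(50).WithCancellation(cts.Token))
                received++;
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"Cancelled after {received} items");
        }
    }
}
```

The exact number of items received before cancellation depends on timer resolution, so the sketch does not assume a specific count.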
Chapter 3: Common Utility Extensions for Async Streams
Section 3.1: Implementing Timeout Between Fetches
When fetching data from an IAsyncEnumerable, you often cannot know how many items will arrive or how long each one will take. To avoid waiting indefinitely, we can enforce a maximum timeout between item fetches. A typical scenario is integrating with Large Language Model (LLM) APIs that use Server-Sent Events (SSE) to stream responses to users in real time. If a token takes too long to arrive, users may perceive the application as unresponsive.
Here’s a code snippet that links a CancellationTokenSource to the original CancellationToken, setting an internal timer to cancel after a specified duration:
public static IAsyncEnumerable<T> Timeout<T>(
    this IAsyncEnumerable<T> source,
    int millisecondsTimeout
)
{
    ArgumentNullException.ThrowIfNull(source);
    // -1 (infinite) is allowed and effectively disables the timeout
    ArgumentOutOfRangeException.ThrowIfLessThan(millisecondsTimeout, -1);
    return Core(source, millisecondsTimeout);

    static async IAsyncEnumerable<T> Core(
        IAsyncEnumerable<T> source,
        int millisecondsTimeout,
        [EnumeratorCancellation] CancellationToken ct = default
    )
    {
        // Cancelled by either the caller's token or the internal timer
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(millisecondsTimeout);
        // Enumerate with the linked token; passing ct here would ignore the timer.
        // If the source honors the token, a timeout surfaces as an OperationCanceledException.
        await foreach (var element in source.WithCancellation(cts.Token))
        {
            cts.CancelAfter(-1); // Pause the timer while the consumer processes the item
            yield return element;
            cts.CancelAfter(millisecondsTimeout); // Restart the timer before the next item
        }
    }
}
This approach is efficient and straightforward, allowing for intuitive usage similar to standard LINQ methods.
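Because the timer works through cancellation, a consumer cannot easily tell a timeout from a cancellation it requested itself. The following is a hedged sketch of one way to separate the two, not part of the method above: TimeoutOrThrow, SlowTokens, and TimeoutDemo are hypothetical names, and the variant drives the enumerator by hand because C# does not allow yield return inside a try block that has a catch clause.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Hypothetical variant: same linked-token timer, but reports a TimeoutException.
    public static IAsyncEnumerable<T> TimeoutOrThrow<T>(
        this IAsyncEnumerable<T> source, TimeSpan timeout)
    {
        ArgumentNullException.ThrowIfNull(source);
        return Core(source, timeout);

        static async IAsyncEnumerable<T> Core(
            IAsyncEnumerable<T> source,
            TimeSpan timeout,
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            await using var e = source.GetAsyncEnumerator(cts.Token);
            while (true)
            {
                cts.CancelAfter(timeout); // arm the timer for this fetch
                bool hasNext;
                try
                {
                    hasNext = await e.MoveNextAsync();
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    // The caller did not cancel, so most likely the internal timer fired.
                    throw new TimeoutException(
                        $"No item arrived within {timeout.TotalMilliseconds} ms.");
                }
                cts.CancelAfter(Timeout.InfiniteTimeSpan); // pause while the consumer works
                if (!hasNext) yield break;
                yield return e.Current;
            }
        }
    }

    // Hypothetical source that stalls after two items, like a hung SSE stream.
    public static async IAsyncEnumerable<string> SlowTokens(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        yield return "Hello";
        await Task.Delay(50, ct);
        yield return "world";
        await Task.Delay(Timeout.Infinite, ct); // never completes unless cancelled
        yield return "unreachable";
    }

    public static async Task Main()
    {
        try
        {
            await foreach (var token in SlowTokens().TimeoutOrThrow(TimeSpan.FromMilliseconds(200)))
                Console.WriteLine(token);
        }
        catch (TimeoutException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

The sketch relies on the source observing the cancellation token; a source that ignores it will not be interrupted by either version.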
Section 3.2: Batching with Maximum Waiting Time
To optimize database performance when saving data from an IAsyncEnumerable, batching can be advantageous. Instead of performing 500 individual database operations for 500 items, writing them in groups of 10 takes only 50 operations, which can significantly reduce database load. Below is a port of the MoreLINQ Batch method to IAsyncEnumerable; note that this version groups items by count only and does not itself enforce a maximum wait:
public static IAsyncEnumerable<T[]> Batch<T>(
    this IAsyncEnumerable<T> source,
    int size
)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);
    return Core(source, size);

    static async IAsyncEnumerable<T[]> Core(
        IAsyncEnumerable<T> source,
        int size,
        [EnumeratorCancellation] CancellationToken ct = default
    )
    {
        T[] batch = null;
        var count = 0;
        await foreach (var item in source.WithCancellation(ct))
        {
            batch ??= new T[size];
            batch[count++] = item;
            if (count == size)
            {
                yield return batch;
                // Start a fresh array and reset the index; without the reset,
                // the next write would land past the end of the new array
                batch = null;
                count = 0;
            }
        }
        // Emit the trailing partial batch, trimmed to the items actually received
        if (count > 0)
        {
            Array.Resize(ref batch, count);
            yield return batch;
        }
    }
}
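As a usage illustration, the sketch below feeds five items through size-2 batches and hands each batch to a stand-in for a bulk write. Everything here is hypothetical and exists only to make the example self-contained: Numbers simulates a stream, SaveAsync simulates the database call, and BatchWithList is a shorter List<T>-based batching helper rather than the array-based method above.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class BatchDemo
{
    // Hypothetical List<T>-based batching, kept short for the demo (no argument validation).
    public static async IAsyncEnumerable<T[]> BatchWithList<T>(
        this IAsyncEnumerable<T> source,
        int size,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var batch = new List<T>(size);
        await foreach (var item in source.WithCancellation(ct))
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch.ToArray(); // copy, so the list can be reused
                batch.Clear();
            }
        }
        if (batch.Count > 0)
            yield return batch.ToArray(); // trailing partial batch
    }

    // Hypothetical source standing in for a real stream.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Hypothetical stand-in for one bulk database write.
    static Task SaveAsync(int[] rows)
    {
        Console.WriteLine($"Saving {rows.Length} rows: {string.Join(", ", rows)}");
        return Task.CompletedTask;
    }

    public static async Task Main()
    {
        await foreach (var rows in Numbers(5).BatchWithList(2))
            await SaveAsync(rows);
        // Prints:
        // Saving 2 rows: 1, 2
        // Saving 2 rows: 3, 4
        // Saving 1 rows: 5
    }
}
```

Five items produce three write operations instead of five; the last batch is smaller because the stream ended before it filled up.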
Section 3.3: Throttling
To avoid overwhelming the application when an IAsyncEnumerable produces items at a high rate, throttling can be employed. It combines well with the batching method above to maintain a consistent data flow.
The implementation is straightforward: track the earliest time at which the next item may be emitted and, if an item arrives before then, delay until that time has passed.
public static IAsyncEnumerable<T> Throttling<T>(
    this IAsyncEnumerable<T> source,
    int millisecondsDelay
)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentOutOfRangeException.ThrowIfNegative(millisecondsDelay);
    return Core(source, millisecondsDelay);

    static async IAsyncEnumerable<T> Core(
        IAsyncEnumerable<T> source,
        int millisecondsDelay,
        [EnumeratorCancellation] CancellationToken ct = default
    )
    {
        // Earliest moment at which the next item may be emitted
        var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);
        await foreach (var item in source.WithCancellation(ct))
        {
            // Read the clock once: reading it twice could yield a negative
            // delay, which Task.Delay rejects
            var remaining = timeoutOn - DateTime.UtcNow;
            if (remaining > TimeSpan.Zero)
                await Task.Delay(remaining, ct);
            yield return item;
            timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);
        }
    }
}
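DateTime.UtcNow follows the system clock, which can jump when the clock is adjusted. As a hedged alternative, the sketch below measures the interval with a Stopwatch, which is monotonic. ThrottleMonotonic, Burst, and ThrottleDemo are hypothetical names, the TimeSpan overload of ThrowIfLessThan assumes .NET 8 or later, and unlike the method above this variant lets the first item through without a delay.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Hypothetical variant that spaces items at least minInterval apart.
    public static IAsyncEnumerable<T> ThrottleMonotonic<T>(
        this IAsyncEnumerable<T> source, TimeSpan minInterval)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentOutOfRangeException.ThrowIfLessThan(minInterval, TimeSpan.Zero);
        return Core(source, minInterval);

        static async IAsyncEnumerable<T> Core(
            IAsyncEnumerable<T> source,
            TimeSpan minInterval,
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            var clock = Stopwatch.StartNew();
            var nextAllowed = TimeSpan.Zero; // the first item is not delayed
            await foreach (var item in source.WithCancellation(ct))
            {
                var wait = nextAllowed - clock.Elapsed;
                if (wait > TimeSpan.Zero)
                    await Task.Delay(wait, ct);
                yield return item;
                // Resumes when the consumer asks for the next item
                nextAllowed = clock.Elapsed + minInterval;
            }
        }
    }

    // Hypothetical source that produces all of its items back to back.
    public static async IAsyncEnumerable<int> Burst(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task Main()
    {
        var total = Stopwatch.StartNew();
        await foreach (var n in Burst(3).ThrottleMonotonic(TimeSpan.FromMilliseconds(100)))
            Console.WriteLine($"item {n} at {total.ElapsedMilliseconds} ms");
    }
}
```

With three items and a 100 ms interval, the second and third items should each arrive roughly 100 ms after the previous one; the exact timestamps depend on timer resolution.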
Section 3.4: Conclusion
This article covered several utility methods for working with IAsyncEnumerable: timeouts between fetches, batching, and throttling to manage application load. Feel free to adapt these examples to your specific needs or use them as inspiration for crafting your own utility methods.
The methods above can be collected in a single static class to integrate into your projects:
namespace System.Collections.Generic;

public static class AsyncEnumerableExtensions
{
    // Timeout method
    // Batch method
    // Throttling method
}
If you found this information helpful, check out my other writings or visit my blog for more insights. Your feedback is always appreciated!