myrelaxsauna.com

Effective Utility Extensions for IAsyncEnumerable in .NET

Written on

Chapter 1: Introduction to IAsyncEnumerable

With the release of async streams in .NET Core 3, which utilize the IAsyncEnumerable interface, and the direct support in C# 8 for iterating with await foreach, .NET developers have a standardized approach for implementing asynchronous streams. This enhancement has become integral in various frameworks, including Entity Framework Core and ASP.NET Core, making async streams a common tool in our daily programming tasks.

In this article, I will discuss some prevalent scenarios I encounter while working with IAsyncEnumerable and demonstrate solutions through utility extensions.

Chapter 2: Guidelines for Creating Async Stream Extensions

When developing extensions for async streams, certain best practices should be considered.

  1. Input Validation: Instead of performing null checks directly in the async method, create a wrapper method for validation that subsequently calls the internal implementation. This keeps the stack trace cleaner and easier to troubleshoot.
  2. CancellationToken Management: Although a CancellationToken is often necessary, avoid including it in the public method signature. Instead, use the WithCancellation extension method provided by Microsoft on any IAsyncEnumerable. Additionally, by utilizing the [EnumeratorCancellation] attribute, the compiler will automatically apply the cancellation token from the WithCancellation method across your methods, minimizing confusion for developers.

For instance, when creating a simple Where extension to filter an async stream, the implementation would look like this:

public static IAsyncEnumerable<T> Where<T>(

this IAsyncEnumerable<T> source,

Func<T, bool> predicate

)

{

ArgumentNullException.ThrowIfNull(source);

ArgumentNullException.ThrowIfNull(predicate);

return Core(source, predicate);

static async IAsyncEnumerable<T> Core(

IAsyncEnumerable<T> source,

Func<T, bool> predicate,

[EnumeratorCancellation] CancellationToken ct = default

)

{

await foreach (var item in source.WithCancellation(ct))

{

if (predicate(item))

yield return item;

}

}

}

The validations occur outside the core implementation, adhering to the guidelines regarding CancellationToken.

Chapter 3: Common Utility Extensions for Async Streams

Section 3.1: Implementing Timeout Between Fetches

When fetching data from an IAsyncEnumerable, it can be uncertain how many items will be received or the time it will take. To prevent indefinite waiting, we can enforce a maximum timeout between item fetches. A typical scenario is when integrating with Large Language Models (LLMs) APIs that utilize Server-Sent Events (SSE) to stream responses to users in real-time. If a token response takes too long, users might perceive the application as unresponsive.

Here’s a code snippet that links a CancellationTokenSource to the original CancellationToken, setting an internal timer to cancel after a specified duration:

public static IAsyncEnumerable<T> Timeout<T>(

this IAsyncEnumerable<T> source,

int millisecondsTimeout

)

{

ArgumentNullException.ThrowIfNull(source);

ArgumentOutOfRangeException.ThrowIfLessThan(millisecondsTimeout, -1);

return Core(source, millisecondsTimeout);

static async IAsyncEnumerable<T> Core(

IAsyncEnumerable<T> source,

int millisecondsTimeout,

[EnumeratorCancellation] CancellationToken ct = default

)

{

using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

cts.CancelAfter(millisecondsTimeout);

await foreach (var element in source.WithCancellation(ct))

{

cts.CancelAfter(-1); // Pause the timer while processing the item

yield return element;

cts.CancelAfter(millisecondsTimeout); // Restart the timer before the next item

}

}

}

This approach is efficient and straightforward, allowing for intuitive usage similar to standard LINQ methods.

Section 3.2: Batching with Maximum Waiting Time

To optimize database performance when saving data from an IAsyncEnumerable, batching can be advantageous. Instead of performing 500 individual database operations for 500 items, batching them into groups of 10 can significantly reduce database load. Below is an example of converting the MoreLINQ Batch method into an IAsyncEnumerable implementation:

public static IAsyncEnumerable<T[]> Batch<T>(

this IAsyncEnumerable<T> source,

int size

)

{

ArgumentNullException.ThrowIfNull(source);

ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);

return Core(source, size);

static async IAsyncEnumerable<T[]> Core(

IAsyncEnumerable<T> source,

int size,

[EnumeratorCancellation] CancellationToken ct = default

)

{

T[] batch = null;

var count = 0;

await foreach (var item in source.WithCancellation(ct))

{

batch ??= new T[size];

batch[count++] = item;

if (count == size)

{

yield return batch;

batch = null;

}

}

if (count > 0)

{

Array.Resize(ref batch, count);

yield return batch;

}

}

}

Section 3.3: Throttling

To prevent overwhelming the application during high item throughput from an IAsyncEnumerable, throttling can be employed. This is effectively combined with the timeout batching method to maintain a consistent data flow.

The implementation is straightforward: calculate the timeout for each item, and if necessary, introduce a delay before processing the next one.

public static IAsyncEnumerable<T> Throttling<T>(

this IAsyncEnumerable<T> source,

int millisecondsDelay

)

{

ArgumentNullException.ThrowIfNull(source);

ArgumentOutOfRangeException.ThrowIfNegative(millisecondsDelay);

return Core(source, millisecondsDelay);

static async IAsyncEnumerable<T> Core(

IAsyncEnumerable<T> source,

int millisecondsDelay,

[EnumeratorCancellation] CancellationToken ct = default

)

{

var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);

await foreach (var item in source.WithCancellation(ct))

{

if (DateTime.UtcNow < timeoutOn)

await Task.Delay(timeoutOn - DateTime.UtcNow, ct);

yield return item;

timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);

}

}

}

Section 3.4: Conclusion

This article covered several essential utility methods for working with IAsyncEnumerable, including timeout handling, batching with maximum wait times, and throttling to manage application load. Feel free to adapt these examples to suit your specific needs or use them as inspiration for crafting your utility methods.

Here is the complete code sample to integrate into your projects:

namespace System.Collections.Generic;

public static class AsyncEnumerableExtensions

{

// Timeout method

// Batch method

// Throttling method

}

If you found this information helpful, check out my other writings or visit my blog for more insights. Your feedback is always appreciated!

Share the page:

Twitter Facebook Reddit LinkIn

-----------------------

Recent Post:

Understanding Why Software Engineers Decide to Leave Their Jobs

Explore the factors that lead software engineers to resign, focusing on the push-pull theory and how to navigate the job market.

# Understanding Schrödinger's Cat and the Many Worlds Theory

Explore Schrödinger's Cat and the Many Worlds Interpretation of Quantum Mechanics, revealing the coexistence of realities.

Unlock Your Math Potential: Are You Ready for Oxford's Challenges?

Explore if you have the aptitude for studying math at Oxford with engaging challenges and tips.