Async Streams and Data Processing Pipelines in C#
Demonstrating how to build efficient, responsive data pipelines using async streams and IAsyncEnumerable for real-time or large-scale data handling

In today’s world, applications often need to handle large amounts of data or real-time updates. Whether it’s streaming stock prices, log processing, or user-generated content, designing a responsive and efficient data pipeline is crucial. With C#’s async streams and IAsyncEnumerable, we can create a seamless flow of asynchronous data processing while maintaining excellent readability and performance.
In this post, we’ll explore how async streams simplify complex workflows, build a data processing pipeline step by step, and look at how to handle real-world challenges.
What Are Async Streams?
Introduced in C# 8.0, async streams combine the power of asynchronous programming with enumerable collections. They use the IAsyncEnumerable<T> interface to let you iterate over a sequence whose elements are produced asynchronously.
Key Benefits:
- Asynchronous Execution: Reduces blocking in high-latency operations like I/O or network calls.
- Lazy Evaluation: Data is processed only when needed, reducing memory overhead.
- Simplified Code: Eliminates the need for callbacks or event-driven models.
Here’s a quick example:
async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // Simulate async work
        yield return i;
    }
}

await foreach (var number in GenerateNumbersAsync())
{
    Console.WriteLine(number);
}
Output:
1
2
3
4
5
When Should You Use Async Streams?
- Processing large datasets, e.g. reading logs or file streams (see the sketch after this list).
- Handling real-time data (e.g. IoT, live feeds).
- Managing workflows with long-running operations.
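The first scenario maps naturally onto an async iterator that streams a file line by line, so only one line is buffered at a time. A minimal sketch; the file name is just a placeholder:
async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    // ReadLineAsync returns null at end of file
    while (await reader.ReadLineAsync() is { } line)
    {
        yield return line; // Each line is yielded as soon as it is read
    }
}

await foreach (var line in ReadLinesAsync("app.log")) // Placeholder file name
{
    Console.WriteLine(line);
}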
Building a Data Processing Pipeline
Let’s build a pipeline that processes log entries in real time. Imagine a system generating log entries that need to be filtered, transformed, and saved to a database. Here’s how async streams simplify this workflow:
Step 1: Generate Logs (Source)
The first step is to create an asynchronous log generator.
async IAsyncEnumerable<string> GenerateLogsAsync()
{
    string[] logLevels = { "INFO", "WARN", "ERROR" };
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(300); // Simulate log generation delay
        // The round-trip "o" format keeps the timestamp free of spaces,
        // so the transform step can split the entry cleanly.
        yield return $"{DateTime.UtcNow:o} {logLevels[(i - 1) % 3]}: Log entry {i}";
    }
}
Step 2: Filter Logs (Processing)
Let’s filter out logs with the level “INFO”.
async IAsyncEnumerable<string> FilterLogsAsync(IAsyncEnumerable<string> logs)
{
    await foreach (var log in logs)
    {
        if (!log.Contains("INFO"))
        {
            yield return log;
        }
    }
}
Step 3: Transform Logs (Processing)
Convert log entries into a structured format.
record LogEntry(DateTime Timestamp, string Level, string Message);

async IAsyncEnumerable<LogEntry> TransformLogsAsync(IAsyncEnumerable<string> logs)
{
    await foreach (var log in logs)
    {
        // Entries look like "<timestamp> <LEVEL>: <message>", so a three-part split is enough
        var parts = log.Split(' ', 3);
        yield return new LogEntry(
            DateTime.Parse(parts[0]),
            parts[1].TrimEnd(':'),
            parts[2]
        );
    }
}
Step 4: Save Logs (Sink)
Finally, store the logs in a database.
async Task SaveLogsAsync(IAsyncEnumerable<LogEntry> logs)
{
    await foreach (var log in logs)
    {
        Console.WriteLine($"Saving log to DB: {log}");
        await Task.Delay(100); // Simulate DB save latency
    }
}
Step 5: Assemble the Pipeline
Putting it all together:
async Task ProcessLogsAsync()
{
    var logs = GenerateLogsAsync();
    var filteredLogs = FilterLogsAsync(logs);
    var structuredLogs = TransformLogsAsync(filteredLogs);
    await SaveLogsAsync(structuredLogs);
}

await ProcessLogsAsync();
Output:
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:00Z, Level = WARN, Message = Log entry 2 }
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:01Z, Level = ERROR, Message = Log entry 3 }
...
Error Handling in Async Streams
Errors can occur at any stage of the pipeline. You can catch them around the await foreach loop in the consumer, or inside the generator itself. One caveat: C# does not allow yield return inside a try block that has a catch clause, so the generator has to catch around the work and yield the result outside the try:
async IAsyncEnumerable<string> SafeGenerateLogsAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        string log;
        try
        {
            await Task.Delay(100); // Simulate async work that might fail
            if (i == 3) throw new Exception("Simulated error");
            log = $"Log {i}";
        }
        catch (Exception ex)
        {
            log = $"Error: {ex.Message}";
        }
        yield return log; // Yield outside the try/catch, as the compiler requires
    }
}
Handling errors in the pipeline:
await foreach (var log in SafeGenerateLogsAsync())
{
    Console.WriteLine(log);
}
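Exceptions that escape the generator surface at the await foreach, so the consumer can wrap the loop in a try-catch as well. A minimal sketch, reusing GenerateLogsAsync from Step 1:
try
{
    await foreach (var log in GenerateLogsAsync())
    {
        Console.WriteLine(log);
    }
}
catch (Exception ex)
{
    // An exception escaping the generator ends the loop and lands here
    Console.WriteLine($"Pipeline failed: {ex.Message}");
}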
Best Practices for Async Streams
- Minimize Latency: Use asynchronous operations within each step to prevent bottlenecks.
- Limit Concurrency: Use Channel or Parallel.ForEachAsync for controlled parallelism (see the sketch after this list).
- Resource Management: Dispose of resources like file handles or DB connections properly.
- Avoid Overloading: Don’t fetch or process more data than necessary.
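To illustrate the concurrency point, Parallel.ForEachAsync (available since .NET 6) consumes an IAsyncEnumerable<T> directly and caps how many items are processed at once. A minimal sketch of a concurrent sink, assuming the LogEntry pipeline from above:
async Task SaveLogsConcurrentlyAsync(IAsyncEnumerable<LogEntry> logs)
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; // At most 4 saves in flight
    await Parallel.ForEachAsync(logs, options, async (log, cancellationToken) =>
    {
        Console.WriteLine($"Saving log to DB: {log}");
        await Task.Delay(100, cancellationToken); // Simulate DB save latency
    });
}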
Real-World Use Case: Streaming Stock Prices
Here’s an example of streaming and processing stock prices:
async IAsyncEnumerable<(string Symbol, decimal Price)> FetchStockPricesAsync(string[] symbols)
{
    var random = new Random();
    foreach (var symbol in symbols)
    {
        await Task.Delay(500); // Simulate data fetch
        // Random has no NextDecimal, so build a price from an integer part plus a rounded fraction
        yield return (symbol, random.Next(100, 200) + (decimal)Math.Round(random.NextDouble(), 2));
    }
}
Transform and filter the data:
async IAsyncEnumerable<string> AnalyzeStockPricesAsync(IAsyncEnumerable<(string Symbol, decimal Price)> prices)
{
    await foreach (var (symbol, price) in prices)
    {
        if (price > 150)
        {
            yield return $"{symbol}: {price} is above the threshold!";
        }
    }
}
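Wiring the two stages together mirrors the log pipeline; the symbols below are just sample data:
var prices = FetchStockPricesAsync(new[] { "MSFT", "AAPL", "GOOG" });
await foreach (var alert in AnalyzeStockPricesAsync(prices))
{
    Console.WriteLine(alert);
}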
Conclusion
Async streams and IAsyncEnumerable<T> provide an elegant solution for building responsive and scalable data pipelines. From log processing to real-time data analysis, their potential applications are vast. By combining asynchronous programming with enumerable collections, they simplify complex workflows, reduce memory overhead, and enhance performance.