Async Streams and Data Processing Pipelines in C#

Demonstrating how to build efficient, responsive data pipelines using async streams and IAsyncEnumerable for real-time or large-scale data handling

Md hasanuzzzaman
4 min read · Nov 23, 2024

Applications often need to handle large amounts of data or real-time updates. Whether it’s streaming stock prices, processing logs, or handling user-generated content, a responsive and efficient data pipeline is crucial. With C#’s async streams and IAsyncEnumerable<T>, we can process data asynchronously as it arrives, without buffering it all in memory, while keeping the code readable.

In this post, we’ll explore how async streams simplify complex workflows, build a data processing pipeline step by step, and look at error handling and best practices.

What Are Async Streams?

Introduced in C# 8.0, async streams combine asynchronous programming with enumerable sequences. They are built on the IAsyncEnumerable<T> interface, which lets you iterate with await foreach over a sequence whose elements are produced asynchronously, one at a time.

Key Benefits:

  1. Asynchronous Execution: Reduces blocking in high-latency operations like I/O or network calls.
  2. Lazy Evaluation: Data is processed only when needed, reducing memory overhead.
  3. Simplified Code: Eliminates the need for callbacks or event-driven models.
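To make the lazy-evaluation point concrete, here is a minimal sketch. The names CountUpAsync and produced are illustrative only. The producer is written to yield up to 1000 values, but because the consumer stops after three, the producer never runs past the third.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

int produced = 0; // counts how many values the producer actually generated

// Hypothetical producer: could yield up to 1000 values, one at a time.
async IAsyncEnumerable<int> CountUpAsync()
{
    for (int i = 1; i <= 1000; i++)
    {
        await Task.Delay(10); // Simulate async work
        produced++;
        yield return i;
    }
}

var consumed = new List<int>();
await foreach (var n in CountUpAsync())
{
    consumed.Add(n);
    if (n == 3) break; // stop early; the enumerator is disposed and no more values are produced
}

Console.WriteLine($"Consumed {consumed.Count}, produced {produced}");
// Consumed 3, produced 3
```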

Here’s a quick example:

async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // Simulate async work
        yield return i;
    }
}

await foreach (var number in GenerateNumbersAsync())
{
    Console.WriteLine(number);
}

Output:

1
2
3
4
5
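It can help to see roughly what await foreach does for you. The sketch below drives the enumerator by hand using the IAsyncEnumerator<T> members (MoveNextAsync, Current, DisposeAsync). It is an approximation of the compiler's expansion for illustration, not something you would normally write.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(10); // Simulate async work
        yield return i;
    }
}

var seen = new List<int>();

// Roughly what `await foreach (var n in GenerateNumbersAsync())` expands to.
IAsyncEnumerator<int> enumerator = GenerateNumbersAsync().GetAsyncEnumerator();
try
{
    // Each MoveNextAsync call resumes the generator until its next yield return.
    while (await enumerator.MoveNextAsync())
    {
        seen.Add(enumerator.Current);
    }
}
finally
{
    await enumerator.DisposeAsync();
}

Console.WriteLine(string.Join(", ", seen)); // 1, 2, 3
```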

When Should You Use Async Streams?

  • Processing large datasets (e.g. reading logs or file streams).
  • Handling real-time data (e.g. IoT, live feeds).
  • Managing workflows with long-running operations.
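As a sketch of the first case, the helper below streams a text file line by line so that only one line is held in memory at a time. ReadLinesAsync here is a hypothetical helper written for this example, and the file contents are made up.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: streams a text file one line at a time.
async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath);
    // `is { } line` succeeds only when ReadLineAsync returns a non-null line.
    while (await reader.ReadLineAsync() is { } line)
    {
        yield return line;
    }
}

// Demo: write a small temp file, then count the lines that contain "ERROR".
string tempFile = Path.GetTempFileName();
await File.WriteAllLinesAsync(tempFile, new[]
{
    "INFO start", "ERROR disk full", "WARN slow", "ERROR timeout"
});

int errorCount = 0;
await foreach (var entry in ReadLinesAsync(tempFile))
{
    if (entry.Contains("ERROR")) errorCount++;
}

Console.WriteLine($"Errors: {errorCount}"); // Errors: 2
File.Delete(tempFile);
```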

Building a Data Processing Pipeline

Let’s build a data pipeline for processing log files in real time. Imagine a system generating log entries that need to be filtered, transformed, and saved to a database. Here’s how async streams simplify this workflow:

Step 1: Generate Logs (Source)
The first step is to create an asynchronous log generator.

async IAsyncEnumerable<string> GenerateLogsAsync()
{
    string[] logLevels = { "INFO", "WARN", "ERROR" };
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(300); // Simulate log generation delay
        // "O" gives a culture-independent ISO 8601 timestamp with no spaces.
        yield return $"{DateTime.UtcNow:O}: {logLevels[i % 3]} Log entry {i}";
    }
}

Step 2: Filter Logs (Processing)
Let’s filter out logs with the level “INFO”.

async IAsyncEnumerable<string> FilterLogsAsync(IAsyncEnumerable<string> logs)
{
    await foreach (var log in logs)
    {
        if (!log.Contains("INFO"))
        {
            yield return log;
        }
    }
}

Step 3: Transform Logs (Processing)
Convert log entries into a structured format.

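record LogEntry(DateTime Timestamp, string Level, string Message);

async IAsyncEnumerable<LogEntry> TransformLogsAsync(IAsyncEnumerable<string> logs)
{
    await foreach (var log in logs)
    {
        // Format: "<timestamp>: <LEVEL> <message>". The timestamp may itself
        // contain spaces, so split on the first ": " rather than on spaces.
        int separator = log.IndexOf(": ", StringComparison.Ordinal);
        var rest = log[(separator + 2)..].Split(' ', 2);
        yield return new LogEntry(
            // RoundtripKind keeps a UTC ("Z") timestamp as UTC instead of converting to local time.
            DateTime.Parse(log[..separator], null, System.Globalization.DateTimeStyles.RoundtripKind),
            rest[0],
            rest[1]
        );
    }
}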

Step 4: Save Logs (Sink)
Finally, store the logs in a database.

async Task SaveLogsAsync(IAsyncEnumerable<LogEntry> logs)
{
    await foreach (var log in logs)
    {
        Console.WriteLine($"Saving log to DB: {log}");
        await Task.Delay(100); // Simulate DB save latency
    }
}

Step 5: Assemble the Pipeline
Putting it all together:

async Task ProcessLogsAsync()
{
    var logs = GenerateLogsAsync();
    var filteredLogs = FilterLogsAsync(logs);
    var structuredLogs = TransformLogsAsync(filteredLogs);
    await SaveLogsAsync(structuredLogs);
}

await ProcessLogsAsync();

Output (timestamp values are illustrative, and their display format depends on the current culture):

Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:00Z, Level = WARN, Message = Log entry 1 }
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:01Z, Level = ERROR, Message = Log entry 2 }
...
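Note that the pipeline is pull-based: nothing runs until the sink starts iterating, and each item passes through every stage before the next one is generated. The sketch below illustrates this with a simplified two-stage pipeline. SourceAsync, DoubleAsync, and the trace list are illustrative names that stand in for the log stages above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var trace = new List<string>(); // records the order in which the stages run

async IAsyncEnumerable<int> SourceAsync()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(10); // Simulate async work
        trace.Add($"source {i}");
        yield return i;
    }
}

async IAsyncEnumerable<int> DoubleAsync(IAsyncEnumerable<int> items)
{
    await foreach (var item in items)
    {
        trace.Add($"transform {item}");
        yield return item * 2;
    }
}

// The sink drives the whole pipeline by pulling one item at a time.
await foreach (var result in DoubleAsync(SourceAsync()))
{
    trace.Add($"sink {result}");
}

Console.WriteLine(string.Join(", ", trace));
// source 1, transform 1, sink 2, source 2, transform 2, sink 4, source 3, transform 3, sink 6
```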

Error Handling in Async Streams
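Errors can occur at any stage of the pipeline. You can wrap the await foreach loop in a try-catch on the consuming side, or handle errors within the generator. One restriction applies to generators: C# does not allow yield return inside a try block that has a catch clause (or inside the catch itself), so do the risky work inside the try and yield the result outside it:

async IAsyncEnumerable<string> SafeGenerateLogsAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        string log;
        bool failed = false;
        try
        {
            await Task.Delay(100); // Simulate work that may fail
            if (i == 3) throw new Exception("Simulated error");
            log = $"Log {i}";
        }
        catch (Exception ex)
        {
            log = $"Error: {ex.Message}";
            failed = true;
        }
        yield return log; // yield outside the try/catch
        if (failed) yield break; // stop after reporting the error
    }
}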

Consume it as usual; the error arrives as an ordinary item in the stream:

await foreach (var log in SafeGenerateLogsAsync())
{
    Console.WriteLine(log);
}
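The other option is to let the exception propagate and catch it on the consuming side. In the sketch below, FaultyLogsAsync is a hypothetical generator that does not handle its own error. Items yielded before the failure are still delivered, and the exception surfaces at the await foreach.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical generator that fails partway through and does not catch the error itself.
async IAsyncEnumerable<string> FaultyLogsAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(10); // Simulate async work
        if (i == 3) throw new InvalidOperationException("Simulated error");
        yield return $"Log {i}";
    }
}

var received = new List<string>();
try
{
    await foreach (var log in FaultyLogsAsync())
    {
        received.Add(log);
    }
}
catch (InvalidOperationException ex)
{
    // Reached after "Log 1" and "Log 2" have already been delivered.
    received.Add($"Error: {ex.Message}");
}

Console.WriteLine(string.Join(" | ", received));
// Log 1 | Log 2 | Error: Simulated error
```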

Best Practices for Async Streams

  1. Minimize Latency: Use asynchronous operations within each step to prevent bottlenecks.
  2. Limit Concurrency: Use System.Threading.Channels (for example, a bounded Channel<T>) or Parallel.ForEachAsync (.NET 6 and later) for controlled parallelism.
  3. Resource Management: Dispose of resources like file handles or DB connections properly.
  4. Avoid Overloading: Don’t fetch or process more data than necessary.
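As one way to apply the concurrency advice, the sketch below feeds an async stream into Parallel.ForEachAsync (available from .NET 6) with MaxDegreeOfParallelism set to 3. GenerateIdsAsync, the delays, and the in-flight counters are assumptions made for this example. The counters exist only to show that no more than three items are processed at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source of work items.
async IAsyncEnumerable<int> GenerateIdsAsync()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(10); // Simulate items arriving over time
        yield return i;
    }
}

var processed = new ConcurrentBag<int>();
var gate = new object();
int inFlight = 0, maxInFlight = 0;

var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };

await Parallel.ForEachAsync(GenerateIdsAsync(), options, async (id, cancellationToken) =>
{
    // Track how many items are being processed at the same time.
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(50, cancellationToken); // Simulate slow per-item work
    processed.Add(id);
    lock (gate) { inFlight--; }
});

Console.WriteLine($"Processed {processed.Count} items, peak concurrency {maxInFlight}");
```

Items may complete in any order here, so this approach fits work where ordering does not matter.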

Real-World Use Case: Streaming Stock Prices

Here’s an example of streaming and processing stock prices:

async IAsyncEnumerable<(string Symbol, decimal Price)> FetchStockPricesAsync(string[] symbols)
{
    var random = new Random();
    foreach (var symbol in symbols)
    {
        await Task.Delay(500); // Simulate data fetch
        // Random has no NextDecimal(); build a price between 100 and 200 from NextDouble().
        decimal price = Math.Round(100 + (decimal)random.NextDouble() * 100, 2);
        yield return (symbol, price);
    }
}

Transform and filter the data:

async IAsyncEnumerable<string> AnalyzeStockPricesAsync(IAsyncEnumerable<(string Symbol, decimal Price)> prices)
{
    await foreach (var (symbol, price) in prices)
    {
        if (price > 150)
        {
            yield return $"{symbol}: {price} is above the threshold!";
        }
    }
}
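Here is a minimal sketch of the two stages wired together. To keep the result reproducible, the fetch stage is replaced with a stand-in, FetchFixedPricesAsync, that returns a fixed table of made-up symbols and prices instead of random values. The analysis stage is the same filter as AnalyzeStockPricesAsync above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for FetchStockPricesAsync with fixed, made-up prices so the result is reproducible.
async IAsyncEnumerable<(string Symbol, decimal Price)> FetchFixedPricesAsync()
{
    var quotes = new (string Symbol, decimal Price)[]
    {
        ("AAA", 120.50m), ("BBB", 175.25m), ("CCC", 150.00m), ("DDD", 199.99m)
    };
    foreach (var quote in quotes)
    {
        await Task.Delay(10); // Simulate data fetch
        yield return quote;
    }
}

// Same filter as AnalyzeStockPricesAsync: keep prices strictly above 150.
async IAsyncEnumerable<string> AnalyzeStockPricesAsync(IAsyncEnumerable<(string Symbol, decimal Price)> prices)
{
    await foreach (var (symbol, price) in prices)
    {
        if (price > 150)
        {
            yield return $"{symbol}: {price} is above the threshold!";
        }
    }
}

var alerts = new List<string>();
await foreach (var alert in AnalyzeStockPricesAsync(FetchFixedPricesAsync()))
{
    alerts.Add(alert);
    Console.WriteLine(alert);
}
```

With this table, only BBB and DDD produce alerts. CCC is exactly at the threshold, so the strict comparison excludes it.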

Conclusion

Async streams and IAsyncEnumerable<T> provide an elegant solution for building responsive and scalable data pipelines. From log processing to real-time data analysis, their potential applications are vast. By combining asynchronous programming with enumerable sequences, they simplify complex workflows, reduce memory overhead, and keep applications responsive.

