【AICon】AI 基础设施、LLM运维、大模型训练与推理,一场会议,全方位涵盖! >>> 了解详情
写点什么

C# 8 中的 Async Streams

Learn about the New Async Stream Pattern

  • 2018-09-19
  • 本文字数:10677 字

    阅读完需:约 35 分钟

关键要点

  • 异步编程技术提供了一种提高程序响应能力的方法。
  • Async/Await 模式在 C# 5 中首次亮相,但只能返回单个标量值。
  • C# 8 添加了异步流(Async Streams),允许异步方法返回多个值,从而扩展了其可用性。
  • 异步流提供了一种用于表示异步数据源的绝佳方法。
  • 异步流是 Java 和 JavaScript 中使用的反应式编程模型的替代方案。

C# 5 引入了 Async/Await,用以提高用户界面响应能力和对 Web 资源的访问能力。换句话说,异步方法用于执行不阻塞线程并返回一个标量结果的异步操作。

微软多次尝试简化异步操作,因为 Async/Await 模式易于理解,所以在开发人员当中获得了良好的认可。

现有异步方法的一个重要不足是它必须提供一个标量返回结果(一个值)。比如这个方法 async Task<int> DoAnythingAsync(),DoAnythingAsync 的结果是一个整数(一个值)。

由于存在这个限制,你不能将这个功能与 yield 关键字一起使用,并且也不能将其与 async IEnumerable<int>(返回异步枚举)一起使用。

如果可以将 Async/Await 特性与 yield 操作符一起使用,我们就可以使用非常强大的编程模型(如异步数据拉取或基于拉取的枚举,在 F#中被称为异步序列)。

C# 8 中新提出的 Async Streams 去掉了标量结果的限制,并允许异步方法返回多个结果。

这个变更将使异步模式变得更加灵活,这样就可以按照延迟异步序列的方式从数据库中获取数据,或者按照异步序列的方式下载数据(这些数据在可用时以块的形式返回)。

例如:

复制代码
foreach await (var streamChunck in asyncStreams)
{
Console.WriteLine($“Received data count = {streamChunck.Count}”);
}

Reactive Extensions(Rx)是解决异步编程问题的另一种方法。Rx 越来越受到开发人员的欢迎。很多其他编程语言(如 Java 和 JavaScript)已经实现了这种技术(RxJava、RxJS)。Rx 基于推送式编程模型(Push Programming Model),也称为反应式编程。反应式编程是事件驱动编程的一种类型,它处理的是数据而不是通知。

通常,在推送式编程模型中,你不需要控制 Publisher。数据被异步推送到队列中,消费者在数据到达时消费数据。与 Rx 不同,Async Streams 可以按需被调用,并生成多个值,直到达到枚举的末尾。

在本文中,我将对拉取模型和推送模型进行比较,并演示每一种技术各自的适用场景。我将使用很多代码示例向你展示整个概念和它们的优点,最后,我将讨论 Async Streams 功能,并向你展示示例代码。

拉取式编程模型与推送式编程模型

图 -1- 拉取式编程模型与推送式编程模型

我使用的例子是著名的生产者和消费者问题,但在我们的场景中,生产者不是生成食物,而是生成数据,消费者消费的是生成的数据,如图 -1 所示。拉取模型很容易理解。消费者询问并拉取生产者的数据。另一种方法是使用推送模型。生产者将数据发布到队列中,消费者通过订阅队列来接收所需的数据。

拉取模型更合适“快生产者和慢消费者”的场景,因为消费者可以从生产者那里拉取其所需的数据,避免消费者出现溢出。推送模型更适合“慢生产者和快消费者”的场景,因为生产者可以将数据推送给消费者,避免消费者不必要的等待时间。

Rx 和 Akka Streams (流式编程模型)使用了回压技术(一种流量控制机制)。它使用拉取模型或推送模型来解决上面提到的生产者和消费者问题。

在下面的示例中,我使用了一个慢消费者从快生产者那里异步拉取数据序列。消费者在处理完一个元素后,会向生产者请求下一个元素,依此类推,直到到达序列的末尾。

动机和背景

要了解我们为什么需要 Async Streams,让我们来看下面的代码。

复制代码
// 对参数 (count) 进行循环相加操作
static int SumFromOneToCount(int count)
{
ConsoleExt.WriteLine("SumFromOneToCount called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
}

方法调用:

复制代码
const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

我们可以通过使用 yield 运算符让这个方法变成惰性的,如下所示。

复制代码
static IEnumerable<int> SumFromOneToCountYield(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountYield called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
yield return sum;
}
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

正如你在输出窗口中看到的那样,结果被分成几个部分返回,而不是作为一个值返回。以上显示的累积结果被称为惰性枚举。但是,仍然存在一个问题,即 sum 方法阻塞了代码的执行。如果你查看线程,可以看到所有东西都在主线程中运行。

现在,让我们将 async 应用于第一个方法 SumFromOneToCount 上(没有 yield 关键字)。

复制代码
static async Task<int> SumFromOneToCountAsync(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsync called!");
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
});
return result;
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// 相加操作是异步进行得!这样还不够,我们要求不仅是异步的,还必须是惰性的。
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

我们可以看到计算过程是在另一个线程中运行,但结果仍然是作为一个值返回!

想象一下,我们可以按照命令式风格将惰性枚举(yield return)与异步方法结合起来。这种组合称为 Async Streams。这是 C# 8 中新提出的功能。这个新功能为我们提供了一种很好的技术来解决拉取式编程模型问题,例如从网站下载数据或从文件或数据库中读取记录。

让我们尝试使用当前的 C# 版本。我将 async 关键字添加到 SumFromOneToCountYield 方法中,如下所示。

图 -2 组合使用 async 关键字和 yield 发生错误

我们试着将 async 添加到 SumFromOneToCountYield,但直接出现错误,如上所示!

让我们试试别的吧。我们可以将 IEnumerable 放入任务中并删除 yield 关键字,如下所示:

复制代码
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
var collection = new Collection<int>();
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
collection.Add(sum);
}
return collection;
});
return result;
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");
foreach (var sc in scs)
{
// 这不是我们想要的,结果将作为块返回!!!!
ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
}
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

可以看到,我们异步计算所有的内容,但仍然存在一个问题。结果(所有结果都在集合中累积)作为一个块返回,但这不是我们想要的惰性行为,我们的目标是将惰性行为与异步计算风格相结合。

为了实现所需的行为,你需要使用外部库,如 Ix(Rx 的一部分),或者你必须使用新提出的 C#特性 Async Streams。

回到我们的代码示例。我使用了一个外部库来显示异步行为。

复制代码
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");
await sequence.ForEachAsync(value =>
{
ConsoleExt.WriteLineAsync($"Consuming the value: {value}");
// 模拟延迟!
Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});
}
static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
// 模拟延迟!
Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();
yield return sum;
}
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");
// 启动一个新任务,用于生成异步数据序列!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();
ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");
// 启动另一个新任务,用于消费异步数据序列!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));
// 出于演示目的,等待任务完成!
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

输出:

最后,我们实现了我们想要的行为!我们可以在枚举上进行异步迭代。

源代码在这里

客户端 / 服务器端的异步拉取

我将使用一个更现实的例子来解释这个概念。客户端 / 服务器端架构是演示这一功能优势的绝佳方法。

客户端 / 服务器端同步调用

客户端向服务器端发送请求,客户端必须等待(客户端被阻塞),直到服务器端做出响应,如图 -3 所示。

图 -3 同步数据拉取,客户端等待请求完成

异步数据拉取

客户端发出数据请求然后继续执行其他操作。一旦有数据到达,客户端就继续处理达到的数据。

图 -4 异步数据拉取,客户端可以在请求数据时执行其他操作

异步序列数据拉取

客户端发出数据块请求,然后继续执行其他操作。一旦数据块到达,客户端就处理接收到的数据块并询问下一个数据块,依此类推,直到达到最后一个数据块为止。这正是 Async Streams 想法的来源。图 -5 显示了客户端可以在收到任何数据时执行其他操作或处理数据块。

图 -5 异步序列数据拉取(Async Streams),客户端未被阻塞!

Async Streams

与 IEnumerable<T> 和 IEnumerator<T> 类似,Async Streams 提供了两个新接口 IAsyncEnumerable<T> 和 IAsyncEnumerator<T>,定义如下:

复制代码
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
Task<bool> MoveNextAsync();
T Current { get; }
}
// Async Streams Feature 可以被异步销毁
public interface IAsyncDisposable
{
Task DiskposeAsync();
}

Jonathan Allen 已经在 InfoQ 网站上介绍过这个主题,我不想在这里再重复一遍,所以我建议你也阅读一下他的文章

关键在于 Task<bool> MoveNextAsync() 的返回值(从 bool 改为 Task<bool>,bool IEnumerator.MoveNext())。这样可以让整个计算和迭代都保持异步。大多数情况下,这仍然是拉取模型,即使它是异步的。IAsyncDisposable 接口可用于进行异步清理。有关异步的更多信息,请点击此处

语法

最终语法应如下所示:

复制代码
foreach await (var dataChunk in asyncStreams)
{
// 处理数据块或做一些其他的事情!
}

如上所示,我们现在可以按顺序计算多个值,而不只是计算单个值,同时还能够等待其他异步操作结束。

重写微软的示例

我重写了微软的演示代码,你可以从我的GitHub 下载相关代码

这个例子背后的想法是创建一个大的 MemoryStream(20000 字节的数组),并按顺序异步迭代集合中的元素或 MemoryStream。每次迭代从数组中拉取 8K 字节。

在 (1) 处,我们创建了一个大字节数组并填充了一些虚拟值。在 (2) 处,我们定义了一个叫作 checksum 的变量。我们将使用 checksum 来确保计算的总和是正确的。数组和 checksum 位于内存中,并通过一个元组返回,如 (3) 所示。

在 (4) 处,AsEnumarble(或者叫 AsAsyncEnumarble)是一种扩展方法,用于模拟由 8KB 块组成的异步流( (6) 处所示的 BufferSize = 8000)。

通常,你不必继承 IAsyncEnumerable,但在上面的示例中,微软这样做是为了简化演示,如 (5) 处所示。

(7) 处是“foreach”,它从异步内存流中拉取 8KB 的块数据。当消费者(foreach 代码块)准备好接收更多数据时,拉取过程是顺序进行的,然后它从生产者(内存流数组)中拉取更多的数据。最后,当迭代完成后,应用程序将’c’的校验和与 checksum 进行比较,如果它们匹配,就打印出“Checksums match!”,如 (8) 所示!

微软演示的输出窗口:

概要

我们已经讨论过 Async Streams,它是一种出色的异步拉取技术,可用于进行生成多个值的异步计算。

Async Streams 背后的编程概念是异步拉取模型。我们请求获取序列的下一个元素,并最终得到答复。这与 IObservable<T> 的推送模型不同,后者生成与消费者状态无关的值。Async Streams 提供了一种表示异步数据源的绝佳方法,例如,当消费者尚未准备好处理更多数据时。示例包含了 Web 应用程序或从数据库中读取记录。

我已经演示了如何生成异步枚举数据,并使用外部异步序列库来消费枚举数据。我也演示了如何将这个功能用于从 Web 站点下载内容。最后,我们看到了新的 Async Streams 语法和一个完整的示例,该示例是基于微软的 Build Demo Code( 2018 年 5 月 7 日至 9 日,西雅图,华盛顿州)。

关于作者

Bassam Alugili 是 STRATEC AG 的高级软件专家和数据库专家。STRATEC 是全球领先的全自动分析仪系统、实验室数据管理软件和智能耗材的合作伙伴。

查看英文原文 Async Streams in C# 8

2018-09-19 18:322742
用户头像

发布了 731 篇内容, 共 434.4 次阅读, 收获喜欢 1997 次。

关注

评论

发布
暂无评论
发现更多内容

mac idea配置类和方法的注释

孙强

方法 Mac IDEA 添加注释

你了解自己的业务IO么?

焱融科技

云计算 技术 分布式 高性能 存储

RVB2601 应用开发实战系列一: Helloworld 最小系统

Roy夹馍

物联网 risc-v 嵌入式开发

没项目经历,面试有点怂....

Java架构师迁哥

执行update语句,用没用到索引,区别大吗?

Simon

MySQL 索引

关于takin-data,你想知道的都在这里(一)启动命令篇

TakinTalks稳定性社区

TLS协议分析 (一) 设计目标及历史

OpenIM

RVB2601应用开发实战系列二: 跑马灯

Roy夹馍

物联网 risc-v 嵌入式开发

Tapdata 肖贝贝:实时数据引擎系列(四)-关于 Oracle 与 Oracle CDC

tapdata

oracle

iOS 屏幕实时共享功能实践(内附详细代码)

融云 RongCloud

ios 音视频

做百度AI工程师,还要会“相牛”?

百度开发者中心

AI 最佳实践 方法论

【墨天轮专访第三期】达梦数据库冯源:丢掉幻想投入战斗,国产数据库的机遇窗口已经来临!

墨天轮

数据库 国产数据库 达梦

RVB2601应用开发实战系列四:FOTA镜像升级

Roy夹馍

物联网 risc-v 嵌入式开发

TLS协议分析 (二) 架构总览

OpenIM

做百度AI工程师,还要会“相牛”?

百度大脑

人工智能

三涧溪村:乡村产业插上数字化翅膀

浪潮云

工业互联网

RVB2601应用开发实战系列三: GUI图形显示

Roy夹馍

物联网 risc-v 嵌入式开发

手撕HashMap源码

程序员阿杜

Java 源码

21年字节+美团+腾讯,大厂必问面试真题总结(Java岗)

Java架构师迁哥

关于takin-data,你想知道的都在这里(二)trace日志篇

TakinTalks稳定性社区

Redis与Memcache对比

Linux服务器开发

数据库 redis 网络编程 Linux服务器开发 Memcache

Premo测试框架详解

趣链科技

区块链 测试工具 测试发开

浅谈实时语音质量监控系统

声网

音视频

NeonIO 云原生存储简介与应用

QingStor分布式存储

云原生 分布式存储

带你彻底认识Paxos算法、Zab协议和Raft协议的原理和本质

Java 架构 面试 分布式 计算机

逐梦航天—数字孪生技术仿真火箭发射!

ThingJS数字孪生引擎

大前端 物联网 可视化 航天 数字孪生

学生管理系统详细架构设计

Nullrable

网络攻防学习笔记 Day128

穿过生命散发芬芳

开发安全 9月日更

后疫情时代新机遇,运营商如何把握智能家居市场?

鲸品堂

智能家居 运营商 智能家居商业模式

tomcat启动失败常见错误

hasWhere

LeetCode刷题283-简单-移动零

ベ布小禅

9月日更

C# 8中的Async Streams_.NET_Bassam Alugili_InfoQ精选文章