简化异步操作(上):使用 CCR 和 AsyncEnumerator 简化异步操作

阅读数:8544 2009 年 2 月 19 日 10:50

在以前的文章中,我曾多次强调应用程序中异步化的重要性。尤其对于 IO 密集型操作来说,异步执行对于应用程序的响应能力和伸缩性有非常关键的影响。 正确使用异步编程能够使用尽可能少的线程来执行大量的 IO 密集型操作。可惜的是,即使异步编程有避免线程阻塞等诸多好处,但是这种编程方式至今没有被大量 采用。其原因有很多,其中最主要的一点可能就是异步模型在编程上较为困难,导致许多开发人员不愿意去做。

异步,则意味着一个任务至少要被拆分为“二段式”的调用方式:一个方法用于发起异步请求,另一个方法用于异步任务完成后的回调。与传统方法的调用方 式相比,异步调用时的中间数据不能存放在线程栈上,方法之间的也不能简单地通过参数传递的方式来共享数据。此外,传统方法调用中可使用的 try…catch…finally,using 等关键字都无法跨越方法边界,因此异步编程在处理异常,保护资源等方面也需要花更大的精力才行。如果一不 小心,轻则造成资源泄露,重则使整个应用程序崩溃。

因此,无论是微软官方还是社区中都出现了一些简化异步编程方式的组件,例如微软的 CCR 和 Wintellect's .NET Power Threading Library 中的 AsyncEnumerator。但是它们都有同样的局限性,例如操作之间存在依赖,则很难让它们并行执行。对于这样的场景,我们还需 要构建额外的解决方案,使多个有依赖关系的异步操作之间的协作调用得以尽可能的简化。

传统异步操作使用方式

.NET 平台中的异步编程方式为 APM(Asynchronous Programming Model,异步编程模型)模式。它使用 BeginXxx 和 EndXxx 两个方法形成了二段式调用,并且通过回调函数(AsyncCallback)和异 步状态(IAsyncResult)进行协作完成整个异步操作。例如,如果我们要异步读取一个文件的内容,我们可能会使用这种方法:

FileStream fs = new FileStream(
 @"C:\Sample.data",
 FileMode.Open,
 FileAccess.Read,
 FileShare.Read, 8192,
 FileOptions.Asynchronous);

Byte[] data = new Byte[fs.Length];
fs.BeginRead(data, 0, data.Length, result =>
{
 Int32 bytesRead = fs.EndRead(result);
 ProcessData(data); // 处理数据
 fs.Close(); // 关闭文件流
}, null);

在这段代码中,通过调用 FileStream 对象的 BeginRead 方法来发起一个异步的读取操作,并且使用 Lambda 表达式构造一个匿名 AsyncCallback 对象。AsyncCallback 回调函数将在异步读取完成之后由框架调用,其中会执行 FileSystem 对象的 EndRead 方法来获取异步操作的结果。最后,还必须显式地关闭文件流,而不能使用 using 关键字来进行辅助,这是由于文件流在发起异步操作之前打 开,而在回调函数中才能被关闭。在上面的代码中还有一个关键,那就是利用了.NET 2.0 中的匿名函数特性形成了一个闭包。由于闭包内部也可以访问外部方法的局部变量(例如 AsyncCallback 回调函数访问了 fs 对象),在一定程 度使得“二段式”的异步调用模型共享了“栈”上的数据——其实编译器已经将需要共享的字段和匿名函数放入了一个托管堆上的辅助对象里了。

从上面的代码看来,使用一个异步操作并不如想象中的困难,但是在实际生产中几乎不会出现如此简单的场景。一个比较常见的场景是:在从一个数据源中获 取了一篇文章的所有评论之后,需要根据每条评论的用户 ID 去另一个数据源获取用户信息,再加以组合并显示给用户查看。对于上层应用(如本例中的 UI 层)来 说,这两个异步操作为一个整体,只有两个异步操作完成后,一个完整的异步操作才算结束。这种组合往往会使代码进入一种匿名函数互相嵌套的关系:

// 发起 GetComment 异步操作

Service.BeginGetComments( 1, // 对象 ID

commentAsyncResult => // GetComments 操作的回调函数

{ // GetComments 操作完成, 获得一个 IEnumerable<Comment> 对象

var comments = Service.EndGetComments(commentAsyncResult); // 发起 GetUsers 异步操作

Service.BeginGetUsers( comments.Select(c => c.UserID), // 得到用户 ID

userAsyncResult => // GetUsers 操作的回调函数

{ // GetUsers 操作完成

var users = Service.EndGetUsers(userAsyncResult); // 处理数据

ProcessData(comments, users); }, null); }, null);

根据应用的复杂程度不同,异步操作的数量会越来越多,如果一味地进行嵌套,代码的维护性将会越来越差。但是如果您为了避免嵌套而把方法拆开,那么在 分散在各处的回调函数间共享或传递数据又会成为新的问题。因此,无论是微软官方还是社区中都出现了一些组件用于简化异步编程模型的使用。其中最著名的可能 就是微软的“并行与协调运行时”和 Wintellect's .NET Power Threading Library 中的 AsyncEnumerator。

使用 CCR 简化异步编程模型的使用

并行与协调运行时(Concurrency and Coordination Runtime,CCR)是微软面向机器人平台开发的一套框架,包含在Microsoft Robotics Developer Studio 中。 Microsoft Robotics Developer Studio 2008 Expression Edition 不允许再次分发,但是可以免费用于商业和非商业的目的,您可以在这里阅读它的授权协议

CCR 虽然源于机器人平台,但是它其实是一个轻量级的基于消息传递机制的编程框架。在 CCR 所构建的“消息——端口——队列”的处理模型中,几乎不会形成任何的线程阻塞,因此可以被用于各种需要高度并发的场景中,您可以在它的官方站点 Channel 9 上获得它的各种案例。有关 CCR 的资料并不多,但是在它的用户手册中提到了基于迭代器(Iterator)的异步开发方式

C# 2.0 中引入了 yield 关键字,开发人员可以利用这个新特性轻松实现编写一个迭代器。例如如下代码便是选择出数组中所有大于 5 的元素:

static IEnumerator<int> Get(int[] array)
{
 foreach (var a in array)
 { 
 if (a > 5) yield return a;
 }
}

编译器在这里又一次大显神威,它会根据如上寥寥数行代码自动构建一个复杂的,上百行代码的 IEnumerator<int> 对象,实现 了 Reset,Current,MoveNext,Dispose 等多个成员。在调用一次迭代器的 MoveNext 的方法之后,代码将在 yield return 语句之处返回(假设在没有其他退出的情况下),直到下次 MoveNext 方法被调用时才接着之前 yield return 语句的下一行继续执行。

一般来说,我们会使用 C#的 foreach 关键字来遍历一个迭代器中的所有元素,它会自动生成对于 MoveNext/Current 成员的调用。但 是现在,yield 特性被巧妙地用在了异步编程模型上。试想,开发人员可以在迭代器中发起一个异步操作,之后立即使用 yield return 语句返回,并且通过某个机制在异步操作结束之后(例如利用异步操作的 AsyncCallback 回调函数)再次调用迭代器的 MoveNext 方法,接着刚才的逻辑继续执行。通过这种“发起操作——yield return——完成操作——发起下一次操作——yield return——完成下一次操作……”的方式,我们可以使用接近于传统的开发方式来进行异步操作。我们现在使用 CCR Iterator 的方式,将之前异步获取评论和用户的代码进行改写:

static IEnumerator<ITask> GetEnumerator()
{
 var resultPort = new Port<IAsyncResult>();

 // 发起 GetComments 异步操作

Service.BeginGetComments(1, resultPort.Post, null); // 中断,等待 GetComments 操作完成

yield return resultPort.Receive(); // GetComments 操作完成,获取结果

var comments = Service.EndGetComments((IAsyncResult)resultPort.Test()); // 发起 GetUsers 异步操作

Service.BeginGetUsers(comments.Select(c => c.UserID), resultPort.Post, null); // 中断,等待 GetUsers 操作完成

yield return resultPort.Receive(); // GetUsers 操作完成,获取结果

var users = Service.EndGetUsers((IAsyncResult)resultPort.Test()); // 处理数据

ProcessData(comments, users); }

然后,我们可以使用如下方式调用这个迭代器:

Dispatcher dispatcher = new Dispatcher();
DispatcherQueue queue = new DispatcherQueue("read queue", dispatcher);
Arbiter.Activate(queue, Arbiter.FromIteratorHandler(CreateEnumerator));

使用这样方式来执行异步操作,不仅免去层层嵌套之苦,更在于它真真正正地使用了传统的开发方式——这意味着之前所谈到的各种缺陷,例如无法使用 try…catch…finally 和 using 的问题都不复存在了。异步世界一下子美好了许多。当然,CCR 的功能远不止如此,这里只是使用它的一小部 分功能而已,感兴趣的朋友们可以去之前给出的链接中更进一步了解 CCR 的强大功能。

使用 AsyncEnumerator 简化异步模型的使用

Wintellect's .NET Power Threading Library 是由 Jeffrey Richter 开 发的一套类库,包含了许多与多线程和异步编程相关的组件,而 AsyncEnumerator 则是其中之一。AsyncEnumerator 对于异步编程模 型的支持,在原理上与 CCR 相同,但是由于它是直接面向这种异步机制的辅助,因此在功能上更加完善,使用也较为方便。例如,之前的例子可以改写为:

static IEnumerator<int> GetEnumerator(AsyncEnumerator enumerator)
{
 // 发起 GetComments 异步操作

Service.BeginGetComments(1, enumerator.End(), null); // 中断,等待 GetComments 操作完成

yield return 1; // GetComments 操作完成,获取结果

var comments = Service.EndGetComments(enumerator.DequeueAsyncResult()); // 发起 GetUsers 异步操作

Service.BeginGetUsers(comments.Select(c => c.UserID), enumerator.End(), null); // 中断,等待 GetUsers 操作完成

yield return 1; // GetUsers 操作完成,获取结果

var users = Service.EndGetUsers(enumerator.DequeueAsyncResult()); // 处理数据

ProcessData(comments, users); }

在使用时,开发人员需要构造一个 IEnumerator<int> 对象来指引 AsyncEnumerator 的调度。 AsyncEnumerator 的 End 方法会返回一个 AsyncCallback 对象,需要交给每个发起异步操作的方法,用于在一个异步操作完成时进行 通知。在 AsyncEnumerator 中会维护一个队列,某个异步操作完成后,它的 IAsyncResult 对象就会放入这个队列中,而 DequeueAsyncResult 方法便可将 IAsyncResult 对象从队列中取出。每次 yield return 的值,则表明需要等 AsyncEnumerator 中存在“多少个”未出队列的 IAsyncResult 对象才继续执行下一行代码。利用这个 特性,我们可以在 yield return 语句之前发起多个异步操作,并且使用一句 yield return 来“等待”多个异步操作完成。例如在以下的代码中,只有在所有异步操作(即所有的 GetResponse 操作)完成之后才能从 yield return 的下一条语句开始继续执行:

static IEnumerator<int> GetEnumerator(AsyncEnumerator enumerator, IEnumerable<string> urls)
{
 int count = 0;
 foreach (string url in urls)
 {
 count++;
 WebRequest request = HttpWebRequest.Create(url);
 request.BeginGetResponse(enumerator.End(), request);
 }

 yield return count;

 for (int i = 0; i < count; i++)
 {
 IAsyncResult asyncResult = enumerator.DequeueAsyncResult();
 WebRequest request = (WebRequest)asyncResult.AsyncState;
 WebResponse response = request.EndGetResponse(asyncResult);

 ProcessResponse(response);
 }
}

在构建完 IEnumerator 之后,您可以使用 AsyncEnumerator 的 Execute 方法执行整个异步操作:

AsyncEnumerator asyncEnumerator = new AsyncEnumerator();
asyncEnumerator.Execute(GetEnumerator(asyncEnumerator, ...));

不过 Execute 方法会阻塞调用线程,因此,AsyncEnumerator 也同样提供了 BeginExecute 和 EndExecute 方法组成了一个标准的 APM 模式。

如果您希望对 AsyncEnumerator 有更多了解,可以参考 Jeffrey Richter 在 MSDN Magazine 上的 Concurrent Affairs 专栏里的文章:《 Simplified APM with C#》、《 Simplified APM with the AsyncEnumerator 》以及《 More AsyncEnumerator Features 》。

CCR 或 AsyncEnumerator 的局限

有了 CCR 或 AsyncEnumerator 的支持,开发由多个异步操作组合而成的异步调用并非难事,因为现在的异步开发从编码方式上就已经与普通 的方法非常接近了。无论从逻辑控制还是资源管理,都可以使用传统的手段进行开发,异步操作似乎从来没有那么容易过。但是光靠这样的辅助并不能够在某些场景 下得到最好的解决方案。试想您在开发一个 ASP.NET 页面用于展示一篇文章,其中需要显示各种信息:

  1. 文章内容
  2. 评论信息
  3. 对评论内容进行打分的用户
  4. 打分者的收藏

由于程序架构的原因,数据需要从各个不同服务或数据源中获取(这是个很常见的情况)。因此,程序中已经准备了如下的数据读取接口:

  1. Begin/EndGetContent:根据文章 ID(Int32),获取文章内容(String)
  2. Begin/EndGetComments:根据文章 ID(Int32),获取所有评论(IEnumerable<Comment>)
  3. Begin/EndGetUsers:根据多个用户 ID(IEnumerable<int>),获取一批用户(Dictionary<int, User>)
  4. Begin/EndGetCommentRaters:根据多个评论 ID(IEnumerable<int>),获取所有打分者(IEnumerable<User>)
  5. Begin/EndGetFavorites:根据多个用户 ID(IEnumerable<int>),获取所有收藏(IEnumerable<string>)

如果使用 AsyncEnumerator 辅助开发,您可能会写出如下的代码:

private IEnumerator<int> GetSerialEnumerator(AsyncEnumerator enumerator, int articleId)
{
 // 获取文章内容

Service.BeginGetContent(articleId, enumerator.End(), null); yield return 1; this.Content = Service.EndGetContent(enumerator.DequeueAsyncResult()); // 获取评论

Service.BeginGetComments(articleId, enumerator.End(), null); yield return 1; var comments = Service.EndGetComments(enumerator.DequeueAsyncResult()); // 获取评论者信息,并结合评论绑定至控件

Service.BeginGetUsers(comments.Select(c => c.UserID), enumerator.End(), null); yield return 1; var users = Service.EndGetUsers(enumerator.DequeueAsyncResult()); this.rptComments.DataSource = from c in comments select new

{ Comment = c, User = users[c.UserID] }; this.rptComments.DataBind(); // 获取评论的打分者,并绑定至控件

Service.BeginGetCommentRaters(comments.Select(c => c.CommentID), enumerator.End(), null); yield return 1; var raters = Service.EndGetCommentRaters(enumerator.DequeueAsyncResult()); this.rptRaters.DataSource = raters; this.rptRaters.DataBind(); // 获取打分者的收藏,并绑定至控件

Service.BeginGetFavorites(raters.Select(u => u.UserID), enumerator.End(), null); yield return 1; this.rptFavorites.DataSource = Service.EndGetFavorites(enumerator.DequeueAsyncResult()); this.rptFavorites.DataBind(); }

似乎一切正常,不是吗?为了发现问题,我们做一个略为夸张的假设:“每个操作都会使用 2 秒钟实现才能完成”,并且使用示意图来表现所有操作的运行时段:

诚然,由于充分地并且合理地利用了异步操作,因此在整个执行过程中只有极小部分时间才会占用线程进行运算。但是,相信您也已经发现了问题:由于 所有操作都是串行的,因此总共需要 10 秒钟时间才能完成全部操作。这大可不必。导致所有操作串行的原因往往是它们的之间存在着的依赖关系(自然也有可能是 其他原因,例如“资源的竞争”,但是我们这里暂时不考虑这些因素),在我们的示例中最明显的依赖关系,便是一个操作的输出将作为另一个操作的输入。如果我 们将这种关系绘制成图示,那么操作之间的依赖便一目了然了:

这五个操作刚好可以分为三个阶段,其中 A 和 B,C 和 D 均可同时运行。因此,在理想情况下,五个操作的执行阶段应该如下图所示:

在资源充足的情况下,并行的性能往往优于串行,这是不争的事实,但是要做到这一点其实并不容易。CCR 或 AsyncEnumerator 的优势 在于把各异步操作使用普通编程方式串连了起来,因此我们才能使用 try…catch…finally 和 using 等关键字来简化我们的逻辑实现。如果一旦 要求并行,那用传统的编程方式则又无法实现了。如果看了之前的内容,您可能会觉得使用 AsyncEnumerator 也可以实现并行,只要“在 yield return 语句之前发起多个异步操作”不就可以了吗?其实不然,因为无论是 CCR 还是 AsyncEnumerator 都有个“硬伤”:在获取一个 IAsyncResult 对象之后,必须由开发人员来指定这个对象的归属问题,这样才能将它交由合适的 End 方法来完成一个异步操作。在“串行的异步”中 做到这点并不困难,因为 yield return 语句后获得的 IAsyncResult 对象必然属于之前发起的异步操作。如果在“并行”的情况下,则需要通过额外的机制来保持这种异步关系。

例如,在之前 WebRequest 的示例中,我们使用 asyncState 来保存 IAsyncResult 对象所对应的 WebRequest。 但是我们这么做需要一个前提:并行的操作完全相同,只是从不同的对象发起。也只有如此,才能让开发人员确定 IAsyncRequst 对象的操作方式,否则 繁琐的 if…else 无可避免。就拿上例五个异步操作来说,虽然操作 A 肯定比操作 E 要提前开始,但是我们很可能无法保证 A 比 E 要提前完成。此外,一个异步 操作可能会依赖于其他多个异步操作,因此一个异步操作完成之后,并不能说明依赖于它的异步操作已经能够开始。我们几乎可以肯定,直接使用 AsyncEnumerator 会编写出混乱而难以维护的“并行”代码。

在下一片文章中,我们将构建一个组件来解决这方面的问题,使多个有依赖关系的异步操作之间的协作调用得以尽可能的简化。

总结

对于 IO 密集型操作来说,异步执行对于应用程序的响应能力和伸缩性有非常关键的影响。正确使用异步编程能够使用尽可能少的线程来执行大量的 IO 密集型操作。可惜的是异步模型在编程上较为困难,导致许多开发人员不愿意去做。微软推出的 CCR,以及 Wintellect's .NET Power Threading Library 中的 AsyncEnumerator 都能够在一定程度上简化异步程序的开发。不过,现有的辅助还不足以面对一些复杂的场景。例如,要使多个 有依赖的异步操作尽可能的“并行”,我们还需要构建额外的解决方案。


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

评论

发布