遇见 PPL:C++ 的并行和异步

阅读数:13558 2012 年 7 月 25 日

并行计算正弦值

假设我们有一个数组,里面包含一组随机生成的浮点数,现在要计算每个浮点数对应的正弦值,如果你看过我的《遇见 C++ Lambda》,你可能会想到用 for_each 函数,如代码 1 所示。为了可以把数组里的浮点数替换成对应的正弦值,我们需要把 Lambda 的参数声明为引用,如果你想保留那些浮点数,可以创建一个新的数组存放计算结果。

代码 1

值得提醒的是,这里使用 begin 和 end 两个函数分别获取数组的起止位置,这是 C++ 11 的推荐写法。此前,我们使用 STL 容器的 begin 和 end 两个成员函数分别获取起止位置,但这种做法无法覆盖 C 风格数组;如今,C++ 11 通过 begin 和 end 两个函数把获取 C 风格数组和 STL 容器的起止位置的写法统一起来,不难想象,遵循新的写法可以提高代码的一致性。

STL 提供的 for_each 函数是串行执行的,如果你想充分利用多核的优势,可以考虑换用 PPL(Parallel Patterns Library)提供的 parallel_for_each 函数,整个改造过程只需三步:

  1. #include <ppl.h>
  2. using namespace concurrency;
  3. 把 for_each 改为 parallel_for_each,如代码 2 所示

代码 2

需要说明的是,如果你在 Visual C++ 2010 上使用 PPL,你需要引用 Concurrency 命名空间(首字母大写),这里引用的 concurrency 命名空间(全小写)是 Visual C++ 2012 的 PPL 为了和其他常见的全小写命名空间(如 stl)保持一致而创建的命名空间别名。

如果你不想影响那些浮点数,可以创建一个新的数组,然后通过 parallel_for 函数把计算结果对应地存到新的数组里,如代码 3 所示。这里选择 parallel_for 函数主要是为了借助索引管理两个数组的元素的对应关系,如果你要在多个数组之间周旋,比如说,你要为 A、B、C 和 D 四个集合实现对应元素的 (A + B) / (C - D) 操作,那么使用 parallel_for 函数就会非常直观。

代码 3

对于我们这里的简单需求,如果你不想自己管理元素的对应关系,可以考虑 parallel_transform 函数,如代码 4 所示。

parallel_transform 函数的前两个参数指定输入容器的起止位置,第三个参数指定输出容器的开始位置,前两个参数指向的位置之间的元素个数必须小于或等于第三个参数指向的位置和输出容器的结束位置之间的元素个数,否则将会出错。

代码 4

并行数奇数个数

《遇见 C++ Lambda》里,我们通过 for_each 函数数一下随机生成的整数里有多少个奇数,这个过程可以并行化吗?可以的,一般的做法是声明一个变量存放个数,在迭代的过程中一旦发现奇数就递增一下这个变量,由于涉及到多线程,可以通过系统提供的 InterlockedIncrement 函数确保递增操作的安全,如代码 5 所示。

代码 5

上面的代码可以得到正确结果,但存在一个问题,每次发现奇数都要调用 InterlockedIncrement 函数,如果 nums 数组里的奇数占大多数,那么调用 InterlockedIncrement 函数带来的开销可能会抵消并行带来的好处,最终导致执行效率甚至比不上串行版本。为了避免这种影响,我们可以把 volatile 变量和 InterlockedIncrement 函数的组合写法替换成 PPL 提供的 combinable 对象,如代码 6 所示。

代码 6

combinable 对象是如何协助 parallel_for_each 函数提高执行效率的呢?这个需要稍微了解一下 parallel_for_each 函数的工作方式,简单的说,它会把我们传给它的数据分成 N 块,分别交给 N 个线程并行处理,但同一块数据会在对应的线程里串行处理,这意味着处理同一块数据的代码可以直接实现同步,combinable 对象正是利用这点减少不必要的同步,从而提高 parallel_for_each 函数的执行效率。

combinable 对象会为每个线程提供一个线程局部存储(Thread-Local Storage),每个线程局部存储都会使用创建对象时提供的 Lambda 进行初始化。我们可以通过 local 成员函数访问当前线程的线程局部存储,因为 combinable 对象保证 local 成员函数返回的对象一定是当前线程的,所以我们可以放心的直接操作。当每个线程的操作都完成之后,我们就可以调用 combine 成员函数把每个线程局部存储的结果汇总起来,这个时候会产生线程之间的同步,但同步工作由 combinable 对象负责,无需我们费心,我们只需告诉它汇总的方法就行了,在我们的示例里,这个逻辑是 STL 提供的 plus 函数对象。

parallel_for_each 函数和 combinable 对象的组合写法本质上就是一个 Reduce 过程,PPL 提供了一个 parallel_reduce 函数专门处理这类需求,如代码 7 所示,它非常直接地展示了 parallel_for_each 函数和 combinable 对象隐藏起来的二段处理过程。

代码 7

第一个阶段,parallel_reduce 函数会把我们传给它的数据分成 N 块,分别交给 N 个线程并行处理,每个线程执行的代码由第四个参数指定。在我们的示例里,这个参数是一个 Lambda,parallel_reduce 函数会通过 Lambda 的参数告诉我们每块数据的起止位置,以及计算的初始值,这个初始值其实来自 parallel_reduce 函数的第三个参数,而 Lambda 的函数体则是不折不扣的串行代码。所有线程执行完毕之后就会进入第二个阶段,汇总每个线程的执行结果,汇总的方法由第五个参数指定。

parallel_reduce 函数和前面提到的 parallel_transform 函数可以组合起来实现并行 MapReduce 操作,而 STL 提供的 transform 和 accumulate 两个函数则可以组合起来实现串行 MapReduce 操作。

同时执行不同任务

假设我们现在的任务是计算一组随机整数里的所有奇数之和与第一个素数的商,一般的做法是按顺序执行以下步骤:

  1. 生成一组随机整数
  2. 计算所有奇数的和
  3. 找出第一个素数
  4. 计算最终结果

由于第二、三步是相互独立的,它们只依赖于第一步的结果,我们可以同时执行这两步提高程序的整体执行效率。那么,如何同时执行两个不同的代码呢?可以使用 parallel_invoke 函数,如代码 8 所示。

代码 8

parallel_invoke 函数最多可以接受十个参数,换句话说,它可以同时执行最多十个不同的代码,如果我们需要同时执行超过十个代码呢?这个时候我们可以考虑创建一个 Lambda 数组,然后交给 parallel_for_each/parallel_for 函数去执行,如代码 9 所示。

代码 9

这些代码都能得到正确的结果,但它们都有一个缺点——阻塞当前线程。想想看,一般需要动用并行编程的地方都是计算量比较大的,如果要等它们算好才能继续,恐怕会把用户惹毛,但是,如果不等它们算好,后面的步骤可能没法正常运作,怎么办呢?

async + continuation

我们可以通过 task 对象异步执行第一步,然后通过 continuation 把后续步骤按照既定的顺序连结起来,这样既可避免阻塞当前线程,又能确保正确的执行顺序。

首先,把各个步骤需要共享的变量挪到前面,如代码 10 所示,这些变量将被对应的步骤捕获并使用。

代码 10

然后,通过 create_task 函数创建一个 task 对象,异步执行第一步,如代码 11 所示。create_task 函数负责用我们传给它的 Lambda 创建 task 对象,这个 Lambda 可以有返回值,但不能接受任何参数,否则将会编译出错。当我们需要从外部获取输入时,可以借助闭包或者调用其他函数。

代码 11

接着,在 create_task 函数返回的 task 对象上调用 then 函数创建一个 continuation,如代码 12 所示。这个 continuation 会在前一个 task 结束之后才开始,从而确保执行第二、三步所需的数据在执行之前准备好。

代码 12

最后,在 then 函数返回的 task 对象上调用 then 函数创建一个 continuation,执行第四步,如代码 13 所示。理论上,你可以通过 then 函数创建任意数目的 continuation。值得提醒的是,在 Metro 风格的应用程序里,continuation 默认是在 UI 线程里执行的,因此可以在 continuation 里直接更新 UI 控件而不必使用 Dispatcher 对象,但是,如果你想在后台执行 continuation,你需要把 task_continuation_context::use_arbitrary 传给 then 函数的 _ContinuationContext 参数。

代码 13

如果你把这些代码组合起来放在 main 函数里执行,并且在最后放置一个 cin.get() 等待结果,那么一切都会运作正常。但是,如果你把它们放在一个 work 函数里,然后在 main 函数里调用这个 work 函数,你可能会碰到异常,大概是说我们读了不该读的地方。这是因为我们的 task 是异步执行的,执行的过程中 work 函数可能已经返回了,连带那些分配在栈上的变量也一并销毁了,如果此时访问那些变量就会出错。怎么解决这个问题?

前面曾经说过,我们传给 create_task 函数的 Lambda 可以有返回值,这个返回值将会通过参数传给后续的 continuation,我们可以通过这个机制把那些变量内化到 Lambda 里,如代码 14 所示。

代码 14

值得提醒的是,我们通过 tuple 对象把第二、三步的计算结果传给第四步,然后通过 tie 函数把 tuple 对象里的数据提取到两个变量里,这种写法类似于 F# 的“let sum_of_odds, first_prime = operands”。

另外,如果你担心在 task 之间传递 vector会带来性能问题,可以通过智能指针单独处理,如代码 15 所示。智能指针本身是一个对象,会随着 work 函数的返回而销毁,因此需要通过按值传递的方式捕获它。

代码 15

到目前为止,我们还没有任何异常处理的代码,如果其中一个 task 抛出异常怎么处理?我们可以在任务链的末端加上一个特殊的 continuation,如代码 16 所示,它的参数是一个 task 对象,任务链上的任何一个 task 抛出来的异常都会传到这里,这个异常可以通过调用 get 函数重新抛出,因此我们用一个 try…catch 语句把 get 成员函数的调用包围起来,然后处理它抛出来的异常。

代码 16

你可能会问的问题

1 使用 PPL 需要什么条件?

parellel_for、parellel_for_each 和 parallel_invoke 等函数可以在 Visual Studio 2010 上使用,使用时需要包含 ppl.h 头文件并引用 Concurrency 命名空间,而 parellel_transform 和 parallel_reduce 函数,以及和 task 相关的部分则需要 Visual Studio 2012,使用时需要分别包含 ppl.h 和 ppltask.h 头文件。

2 能否推荐一些 PPL 的参考资料?

关于本文提到的 PPL 函数和类型,可以参考 MSDN 的concurrency 类库。另外,MSDN 的Parallel Patterns Library (PPL)Parallel Programming with Microsoft Visual C++: Design Patterns for Decomposition and Coordination on Multicore Architectures也是很好的学习资料。

3 STL 是否提供 task 的替代品?

C++ 11 的 STL 提供了 std::future 类,结合 std::async 函数可以实现 task 的异步效果,如代码 17 所示,但 std::future 类目前不支持 contiuation,只能通过 get 成员函数获取结果,调用 get 成员函数的时候,如果相关代码还在执行,则会阻塞当前线程。

代码 17

4 PPL 能否在 Windows 以外的平台上使用?

PPL 目前只能在 Windows 上使用,如果你想在其他平台上进行类似的并行编程,可以考虑Intel Threading Building Blocks,它同时支持 Windows、Mac OS X 和 Linux,提供的 API 和 PPL 的类似。TBB 是开源的,Intel 为它提供商业和 GPLv2 两种许可协议。

5 能否推荐一些 TBB 的参考资料?

Intel Threading Building Blocks: Outfitting C++ for Multi-Core Processor Parallelism是一本不错的学习资料,另外,Intel 也提供了丰富的示例代码


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注我们,并与我们的编辑和其他读者朋友交流。