11 月 19 - 20 日 Apache Pulsar 社区年度盛会来啦,立即报名! 了解详情
写点什么

以 Python 为例的 Async/Await 的编程基础

  • 2019-12-09
  • 本文字数:3514 字

    阅读完需:约 12 分钟

以 Python 为例的 Async/Await 的编程基础

近年来,许多编程语言都在努力改进它们的并发原语。Go 语言有 goroutines,Ruby 有 fibers,当然,还有 Node.js 帮助普及的 async/await,这是当今使用最为广泛的并发操作类型。在本文中,我将以 python 为例讨论 async/await 的基础知识。我选择 python 语言,是因为这个功能在 python 3 中比较新,很多用户可能对它还不是很熟悉。


使用 async/await 的主要原因是通过减少 I/O 执行时的空闲时间来提高程序的吞吐量。使用这个操作符的程序通过隐式地使用一个称为事件循环的抽象来同时处理多个执行路径。在某些方面,这些事件循环类似于多线程编程,但是事件循环通常存在于单个线程中,因此,它不能同时执行多个计算。正因为如此,单独的事件循环不能提高计算密集型应用程序的性能。但是,对于进行大量网络通信的程序,比如连接到 Redis 数据库的应用程序,它可以极大地提高性能。


每次程序向 Redis 发送一个命令时,它都会等待 Redis 的响应,如果 Redis 部署在另一台机器上,就会出现网络延迟。而一个不使用事件循环的单线程应用程序在等待响应时处于空闲状态,会占用大量的 CPU 周期。需要注意的是,网络延迟是以毫秒为单位的,而 CPU 指令需要纳秒来执行,这两者相差六个数量级。


这里举个例子,下面的代码样例是用来跟踪一个游戏的获胜排行榜。每个流条目都包含获胜者的名字,我们的程序会更新一个 Redis 的有序集合(Sorted Set),这个有序集合用来作为排行榜。这里我们主要关注的是阻塞代码和非阻塞代码的性能。


import redis
# The operation to perform for each eventdef add_new_win(conn, winner): conn.zincrby('wins_counter', 1, winner) conn.incr('total_games_played')
def main(): # Connect to Redis conn = redis.Redis() # Tail the event stream last_id = '$' while True: events = conn.xread({'wins_stream': last_id}, block=0, count=10) # Process each event by calling `add_new_win` for _, e in events: winner = e['winner'] add_new_win(conn, winner) last_id = e['id']
if __name__ == '__main__':main()
复制代码


我们使用 aio-libs/aioredis 实现与上面代码有相同效果的异步版本。


aio-libs 社区正在重写许多 Python 网络库,以包括对 asyncio 的支持,asyncio 是 Python 事件循环的标准库实现。下面是上面代码的非阻塞版本:


import asyncioimport aioredis
async def add_new_win(pool, winner): await pool.zincrby('wins_counter', 1, winner) await pool.incr('total_games_played')
async def main(): # Connect to Redis pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8') # Tail the event stream last_id = '$' while True: events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) # Process each event by calling `add_new_win` for _, e_id, e in events: winner = e['winner'] await add_new_win(pool, winner) last_id = e_id
if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
复制代码


这段代码与上面那段代码相比,除了多了一些 await 关键字之外,其他的几乎是相同的。最大的不同之处在最后两行。在 Node.js 中,环境会默认加载事件循环,而在 Python 中,必须显示地开启。


重写之后,我们可能会认为这么做就可以提高性能了。不幸的是,我们代码的非阻塞版本还没有提高性能。这里的问题在于我们编写代码的细节,而不仅仅是使用 async / await 的一般思想。


Await 使用的限制

我们重写代码后的主要问题是我们过度使用了 await。当我们在异步调用前面加上 await 时,我们做了以下两件事:


  1. 为执行做相应的调度

  2. 等待完成


有时候,这样做是对的。例如,在完成对第 15 行流的读取之前,我们不能对每个事件进行迭代。在这种情况下,await 关键字是有意义的,但是看看 add_new_win 方法:


async def add_new_win(pool, winner):    await pool.zincrby('wins_counter',  1, winner)    await pool.incr('total_games_played')
复制代码


在这个函数中,第二个操作并不依赖于第一个操作。我们可以将第二个命令与第一个命令一起发送,但是当我们发送第一个命令时,await 将阻塞执行流。我们其实更想要一种能立即执行这两个操作的方法。为此,我们需要一个不同的同步原语。


async def add_new_win(pool, winner):    task1 = pool.zincrby('wins_counter', 1, winner)    task2 = pool.incr('total_games_played')    await asyncio.gather(task1, task2)
复制代码


首先,调用一个异步函数不会执行其中的任何代码,而是会先实例化一个“任务”。根据选择的语言,这可能被称为 coroutine, promise 或 future 等等。对我们来说,任务是一个对象,它表示一个值,该值只有在使用了 await 或其他同步原语(如 asyncio.gather)之后才可用。


在 Python 的官方文档中,你可以找到更多关于 asyncio.gather 的信息。简而言之,它允许我们在同一时间执行多个任务。我们需要等待它的结果,因为一旦所有的输入任务完成,它就会创建一个新的任务。Python 的 asyncio.gather 相当于 JavaScript 的 Promise.all,C# 的 Task.WhenAll, Kotlin 的 awaitAll 等等。


改进我们的主循环代码

我们对 add_new_win 所做的事情也可以用于主流事件处理循环。这是我所指的代码:


last_id = '$'while True:    events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)    for _, e_id, e in events:        winner = e['winner']        await add_new_win(pool, winner)        last_id = e_id
复制代码


到目前为止,你会注意到我们是顺序地处理每个事件。因为在第 6 行中,使用 await 既可以执行又可以等待 add_new_win 的完成。有时这正是你希望发生的情况,因为如果你不按顺序执行,程序逻辑就会中断。在我们的例子中,我们并不真正关心排序,因为我们只是更新计数器。


last_id = '$'while True:    events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)    tasks = []    for _, e_id, e in events:        winner = e['winner']        tasks.append(add_new_win(pool, winner))        last_id = e_id    await asyncio.gather(*tasks)
复制代码


我们现在也在并发地处理每一批事件,并且对代码的改动是最小的。最后要记住,有时即使不使用 asyncio.gather,程序也可以是高性能的。特别是,当你为 web 服务器编写代码并使用像 Sanic 这样的异步框架时,该框架将以并发的方式调用你的请求处理程序,即使你在等待每个异步函数调用,也能确保巨大的吞吐量。


总结

下面是我们进行上面两个更改之后的完整代码示例:


import asyncioimport aioredis
async def add_new_win(pool, winner): # Creating tasks doesn't schedule them # so you can create multiple and then # schedule them all in one go using `gather` task1 = pool.zincrby('wins_counter', 1, winner) task2 = pool.incr('total_games_played') await asyncio.gather(task1, task2) async def main(): # Connect to Redis pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8') # Tail the event stream last_id = '$' while True: events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) tasks = [] for _, e_id, e in events: winner = e['winner'] # Again we don't actually schedule any task, # and instead just prepare them tasks.append(add_new_win(pool, winner)) last_id = e_id # Notice the spread operator (`*tasks`), it # allows using a single list as multiple arguments # to a function call. await asyncio.gather(*tasks)
if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
复制代码


为了利用非阻塞 I/O,你需要重新考虑如何处理网络操作。值得高兴的是这并不是很困难,你只需要知道顺序性什么时候重要,什么时候不重要。尝试使用 aioredis 或等效的异步 redis 客户端,看看可以在多大程度上提高应用程序的吞吐量。


本文转载自公众号中间件小哥(ID:huawei_kevin)。


原文链接:


https://mp.weixin.qq.com/s/0E6QJDcyOPOY2Cgj3XcUtg


2019-12-09 13:48738

评论

发布
暂无评论
发现更多内容

什么是RESTful,REST api设计时应该遵守什么样的规则?

wljslmz

RESTful 6月月更

web技术分享| 【高德地图】实现自定义的轨迹回放

anyRTC开发者

前端 Web 音视频 地图 轨迹回放

企业级软件开发新模式:低代码

力软低代码开发平台

活动预约|阿里云如何搭建云服务 SRE 与可观测体系

阿里巴巴云原生

阿里云 云原生 可观测 峰会

如何为政企移动办公加上一道“安全锁”?

WorkPlus Lite

视频爆炸时代,谁在支撑视频生态网高速运行?

郑州埃文科技

flow IP地址 NetFlow

学习 | 写论文看这一篇就够了~

写程序的小王叔叔

学习笔记 论文阅读 论文写作 6月月更

洞见科技牵头的全球「首个」IEEE隐私计算「互联互通」国际标准正式启动

洞见科技

隐私计算 IEEE 互联互通

关河因果将机器学习融合逻辑规则,突破黑盒壁垒

6979阿强

数据分析 大数据分析 关河因果 关河智图 因果分析

VHEDT业务发展框架

凌晞

框架 构架

Vue-17-组件

Python研究所

6月月更

机器学习实践:基于支持向量机算法对鸢尾花进行分类

华为云开发者联盟

人工智能 模型 华为云

万字攻略,详解腾讯面试(T1-T9)核心技术点,面试题整理

C++后台开发

后台开发 面试题 Linux服务器开发 C++后台开发 腾讯面试

IntelliJ IDEA中有什么让你相见恨晚的好用插件?

Jackpop

如何做好研发效能度量及指标选取

思码逸研发效能

研发效能

[译]关于 Python 中的数字你可能不知道的 3 件事

宇宙之一粟

Python 6月月更

详解openGauss多线程架构启动过程

华为云开发者联盟

数据库 后端

Java不支持协程?那是你不知道Quasar!

码农参上

协程 Java后端

2022年中国重卡智能化升级专题研究

易观分析

智能汽车

短视频源码开发,优质的短视频源码需要做好哪几点?

开源直播系统源码

软件开发 短视频源码

网页制作存在的一些难点

源字节1号

华为云招募工业智能领域合作伙伴,强力扶持+商业变现

华为云开发者联盟

云计算 华为云 工业数据智能

如何给研发团队分钱?

菜根老谭

研发体系 绩效管理 激励体系

"不敢去怀疑代码,又不得不怀疑代码"记一次网络请求超时分析

华为云开发者联盟

前端 开发 HTTP 华为云

预约直播|机器学习PAI:AI加速计划

阿里云大数据AI技术

AI 模型开发训练

再读凤凰架构-分布式架构更清晰

小翁牌坦克

分布式 凤凰架构

DAP事实表加工汇总功能应用说明

agileai

数据分析 数据集成 数仓建设 基础事实表 汇总事实表

以 Python 为例的 Async/Await 的编程基础_文化 & 方法_中间件小哥_InfoQ精选文章