【AICon】AI 基础设施、LLM运维、大模型训练与推理,一场会议,全方位涵盖! >>> 了解详情
写点什么

如何在 Python 中实现异步执行?

  • 2019-10-02
  • 本文字数:5374 字

    阅读完需:约 18 分钟

如何在Python中实现异步执行?

轻量级定时任务调度库 Schedule 是一串可用性非常棒的代码。在计算机系统中,他们和操作系统拥有同样久远的历史。在真实世界中,他们也和我们的闹钟一样古老。在计算机世界里,程序员通常需要和操作员(一种职业)预约运行他们的代码。后来,程序员们认为这样的操作太过繁琐,因此就编写了一套调度算法(scheduling algorithms)来代替操作员的工作。


当计算机操作系统出现后,调度算法取代原先的计算机系统操作员将程序指令输入 CPU 内自动执行。但最近几年,随着 CPU 核心数量的增加,计算机的调配算法的复杂性也不断提升。硬件缓存层、RAM 以及硬盘为长期、中期、短期的调度算法提出了不同的需求。


在当前云计算与分布式架构的时代,在软件体系中,调配器逐渐变为一次性甚至不可用的体系结构组件。这些异步任务执行器,往往隐藏于电子邮件、登陆、通知、电子邮件的发送报告等窗口中。


CeleryRabbitMQRedisGoogle Task Queue API 以及 Amazon 的 SQS 都是分布式环境中任务调度的主要参与者。这篇文章将会介绍传统的任务队列系统、异步执行所处的位置以及上述几款调用工具的优缺点。


传统任务调度

传统的任务队列在使用数据库时通常拥有两个进程(分为生产者和消费者)。每一个任务被生产者创建后,在数据库里都有相应的状态,比如 NotStarted、Running、Completed、Failed 等等。无论什么时候,守护者任务(如一个从不停止的 Python 程序)都会不间断地对数据库进行查询,以期主动找到未完成的任务并开始运行它。尽管这种方式让队列系统变得更加简单,但依然存在一定的缺陷。


  • 在数据表上进行数据维护,意味着你的数据表单会随着任务的数量不断增长。当数据体量增长得如此之快,并且运维人员不得不处理因数据增长所带来的额外问题时,这种情况就变得非常复杂。

  • 每一个使用者都可以用任务调度来查询数据库,以获取可以运行的调度任务列表。当然,随着数据体量的增长,这样的每一次查询将会变得愈发昂贵。

Cron

Cron 是最简便的,能够设定时间执行异步任务的工具。这个工具专门用来维护一个叫做“crontab”的表。该工具以每分钟调用一次的规律来运作,会自动记录并运行在当前时间段下的每条命令。Cron 具备多种功能,如具备数据备份、临时文件清理以及提醒等。


依下图所示,这里编写了一个 Python 脚本来自动触发 Mac 的通知功能,该通知内容为每 20 分钟休息一下。


 import os
def notify(title, text): os.system(""" osascript -e 'display notification "{}" with title "{}"' """.format(text, title)) notify("Take a break", "You are sitting for too long")
复制代码


# > crontab -e*/20 * * * * python /<path_to_script>/notfication.py
复制代码


注意事项:


1、如果存在跨时区的用户怎么办?每当我们处理时间问题时,也等同于在处理跨时区的问题。Cron 被设置默认运行在系统所属的时区下,我们可以使用 TZ 环境变量来修改。但是如果我们在不同的时区下运行不同的表,则 Cron 并不适合这样的应用场景。


2、如果 Cron 挂掉了怎么办?当脚本执行 Cron 失败时,这个错误会被记录并等待 Cron 的下一次调度来运行。当然这并非处理错误最可靠的方法,我们通常会选择重启这个调度程序,直到达到了某个阈值。


3、伸缩性。如何在固定时间内处理大量的任务?如果其中的某个任务在被调用时占用了大量的内存怎么办?

代码示例——寻宝

这里将会以一个寻宝猎人程序为例去解释上文提到的一些概念。问题和背景都很简单,就是现在有 10000 个文件,其中一个文件包含一个单词宝藏,需要用这个寻宝程序从这些文件中找到宝藏。代码实例如下:


# Create treasuredef creating_treasure():      """      Creates N files with treasure randomly set in one of the files           """      treasure_in = randint(1, N)      for i in range(0, N):          logging.debug(i)          with open(f"treasure_data/file_{i}.txt", "w") as f:              if i != treasure_in:                  f.writelines(["Not a treasure\n"] * N)              else:                  f.writelines(["Treasure\n"] * N)      print (f"treasure is in {treasure_in}")  creating_treasure()
复制代码

Python 的异步执行

Python 的异步执行在 asyncio 标准库发布后变得越来越流行,尽管这与任务调度程序无关,但是了解它的作用和历史是十分重要的。

线程

Python 线程拥有一个很老的历史,尽管它提出了同时运行多条线程的想法,但实际上却无法实现。为什么?因为 CPython 中存在 GIL(Global Interpreter Lock,防止多线程并发执行机器码的全局锁)。除非你的程序有很多外部事件在等待,否则就无法显现出线程的作用。即使你的笔记本电脑拥有多个核心,由于 GIL 的存在,你会发现它们在 CPU 密集型任务上经常处于空闲状态。


使用线程实现的寻宝猎人程序,你可以通过线程的功能来查看任意范围的文件。


N = 10000   treasure_found = False  num_of_threads = 10  count = int(N/num_of_threads)
复制代码


将需要设置的线程统一以 treasure_found 为标志。


def find_treasure(start, end):      logging.debug(f"{start}, {end}")      global treasure_found      for i in range(start, end):          if treasure_found:              return          with open(f"treasure_data/file_{i}.txt", "r") as f:              if f.readlines()[0] == "Treasure\n":                  treasure_found = i                  return    start_time = time.time()  
threads = [threading.Thread(target=find_treasure, args=[i, i+count]) for i in range(0, N, count)] [thread.start() for thread in threads] [thread.join() for thread in threads] print("--- %s seconds ---" % (time.time() - start_time)) print (f"Found treasure {treasure_found}")
复制代码

异步多重处理

多重处理模块可以很好地解决现成的一些缺陷。有一点很容易理解,GIL 只适用于线程,而不适用于进程,这种特性为实现并行性提供了很好的思路。


Multiprocessing 多处理通常也适用于 CPU 的密集型任务,因为我们可以使用所有的可用内核。当设计多处理程序时,往往会出现多个进程共享同一个队列,并且每一个进程都能读取任务并用于下次的执行加载。


num_of_process = 100  
start_time = time.time() processes = [Process(target=find_treasure, args=[i, int(i+N/num_of_process)]) for i in range(0, N, int(N/num_of_process))] [process.start() for process in processes] [process.join() for process in processes] print("--- %s seconds ---" % (time.time() - start_time)) print (f"Found treasure {treasure_found}")
复制代码

Asyncio–异步 I/O

在上述两个案例中,可以看到线程和进程之间的切换是由 CPU 处理的。在某些情况下,开发者可能更加熟悉什么时候应该通过 CPU 进行上下文切换。Asyncio 让开发者决定一个应用程序在即将停止时如何保留其它的任务及进程,而不是变更进程或线程。


async def find_treasure(start, end):      global treasure_found      for i in range(start, end):          if treasure_found:               return          # Await until file is read        await read_file(i)      async def main():      tasks = [find_treasure(i, i+count)              for i in range(0, N, count)]      await asyncio.gather(              *tasks      )  
asyncio.run(main())
复制代码

Celery - Python 分布式任务队列

Celery 是一个基于分布式消息传递的异步任务调度工具,它专注于实时操作,同时也支持调度操作。


Celery 主要工作于队列和独立的职程周围。队列通常也被称为消息中间人(Broker),是一个可以异步执行任务的队列系统。通过职程(Worker)ping 向队列来执行任务。

Message Broker—消息中间人

不熟悉的开发者可能经常会混淆 Redis、Celery 以及 RabbitMQ 这三者。上文所提到的队列(Queue)并不是内置在 Celery 中的。并且,由于 Celery 使用 RabbitMQ 或 Redis 来构建队列系统,这就是为什么你会经常在文章中发现这些信息。

Worker—职程

开启一个 Celery 职程时,它会自动创建一个监督进程,会在执行池内推动一系列其它的进程执行。另外,Celery 职程可执行的任务数量取决于执行池中的进程数量。


Celery 的任务调度

与 Linux Crontab 不同的是,Celery 在默认的情况下不会安排在特定时间执行某个任务。Celery 使用自己的 Celery Beat 来支持作业调度。那么当任务执行失败时怎么样呢?当特定任务失败时,可以将 Celery 设置为重试状态直到发生特定的异常,或者通过设置 max_retries 参数,在启用之前重试 N 次。

Idempodency

假设一下,当需要将 N 项数据备份至数据库中,如果需要两个职程来执行,那么就需要调用函数来确保在数据库中不会执行两次相同的条目,职程就不会感知到运行特定任务所带来的副作用。

Redis Queue

通常情况下,Redis 只是一个内存数据库。RQ(Redis queue)是一个执行异步任务的任务调度程序,通过使用 Redis 的队列数据结构,并通过内置的职程实现,其结构与工作原理与 Celery 大致相同。


N = 10000  num_of_process = 10  count = int(N/num_of_process)    start_time = time.time()  q = Queue(connection=Redis())  results = [q.enqueue(find_treasure, i, i+count)             for i in range(0, N, count)]
复制代码

RQ vs Celery

  1. 如果有一天你醒来后决定要重构你的队列系统,虽然 RQ 仍然是建立在 Redis 的基础上,但 Celery 支持了消息代理广泛排列功能;

  2. Celery 支持子任务,RQ 不支持;

  3. RQ 通过让开发者将特定任务配置为优先级的方式来实现优先级队列的处理。Celery 是通过将任务路由到不同的服务器上来实现;

  4. RQ 只支持 Python,Celery 还支持其它语言;

  5. Celery 支持任务计划与调度;

  6. RQ 的开发者只能运行在能够实现 fork()的系统上。这意味着它无法在 Windows 系统下使用,当然如果你使用的是基于 Linux 的 Windows 子系统来运行这个 bash shell 的话,是完全可以的。

RabbitMQ

RabbitMQ 是一款基于 AMQP 协议构建的队列系统,与 Celery(只有职程)和 RQ(职程和队列都有)相比,RabbitMQ 只有一个队列系统,当然这个队列系统非常强壮。



RabbitMQ 运行在订阅发布模型上,它由多部分组成,其中有 4 个最重要的部分,分别是 Producers、Exchangers、Queues 以及 Consumers。


  • Producers,一串将任务部署在队列中的代码;

  • Exchangers,决定这个消息应该进入到哪个队列中;

  • Direct,用各自的表向队列发送信息;

  • Topic,向队列发送与特定路由模式匹配的消息;

  • Fan-out,向所有队列发布消息;

  • Queues,队列为使用人员提供了任务列表;

  • Consumers,负责执行实际任务,任何时间都可以将职程配置在特定队列上。


RabbitMQ 没有独立的职程,依赖于 Celery 的任务职程。


RabbitMQ 虽然支持多个队列,但当与 Celery 一起使用时,创建一个队列、绑定相应的键值并与一个带有标签的 Celery 交换,就会隐藏 RabbitMQ 的所有高级配置。


并且,RabbitMQ 还具备优美的文档格式、支持多种语言、消息执行后可被确认等诸多优势。同样,也存在难以追踪和 debug、错误队列往往被存储在不同的队列中、不支持优先级等问题。

Google Task Queue - 谷歌任务队列

Google 的任务队列系统提供了两种队列机制类型:


  1. Push Mode - 开发者主动将任务配置在队列中;

  2. Pull Queue - 由代码自动化将任务退出队列并执行。

Google Pub/Sub

虽然谷歌的发布订阅在运作机制上十分接近异步执行,但是这两者主要关注的是消息队列和消息流。如果在 Facebook 的群组中发送消息或内容,每一位在该群组队列内的用户都会得到警告。这也是 Google Pub/Sub 的经典用法。


Pub/Sub 都是异步的,并且都能够正确无误的执行任务,那为什么我们不能直接用任务队列来实现 Pub/Sub 的模型发布呢?针对此问题,Google 为我们提供了一个很清晰的解释。

Google Cloud 的选择:Cloud Tasks 和 Cloud Pub/Sub

Cloud Tasks 和 Cloud Pub/Sub 都可以被用来实现消息订阅和异步集成。尽管他们两者的概念十分相似,但是他们都是为不同的用例所设计出来的。点击这个页面可以看到 Google Cloud 如何在 Cloud Tasks 和 Cloud Pub/Sub 之间进行选择。

核心区别

这两者的核心区别在于隐式调用和显示调用的概念。


Cloud Pub/Sub 的发布者无法控制消息的最终交付,除非保证交付。通过这种方式,Cloud Pub/Sub 支持隐式调用,发布者通过隐式发布事件来执行订阅服务器。相反,Cloud Tasks 则是显式调用,其中发布者保留对执行的完全控制。值得注意的是,发布者可以指定每个消息所发送的端点。总之,云任务适用于任务生成器延迟控制特定的 webhook 或远程调用用例的情况。Cloud Pub/Sub(云发布/订阅)更加适用于数据摄取和分布模式的情况,通过牺牲对执行的控制来实现效果的保证。


参考文档 & 博客推荐:


  1. Asyncio in Python

  2. Python Concurrency

  3. Celery Execution Pool

  4. Celery and Celery beat Euro Python

  5. https://www.youtube.com/watch?v=ztyyn7hmcJo

  6. Celery Vs RQ

  7. https://www.fullstackpython.com/task-queues.html

  8. https://www.youtube.com/watch?v=nrzLdMWTRMM

  9. Google Task Queue vs Pub/Sub


原文链接:


https://bhavaniravi.com/blog/asynchronous-task-execution-in-python


2019-10-02 11:405997
用户头像
佘磊 策划编辑

发布了 50 篇内容, 共 20.3 次阅读, 收获喜欢 76 次。

关注

评论

发布
暂无评论
发现更多内容

60 K8S之EFK日志管理系统

穿过生命散发芬芳

k8s 28天写作 12月日更

【转】java开发之MyBatis 原理与核心组件

@零度

mybatis JAVA开发

网络安全好学吗?手把手教你学主动信息收集,网络安全基础教程

学神来啦

网络安全 信息安全 渗透测试· kali kali Linux

QCon-oCPX多目标多场景联合建模在OPPO的实践

安第斯智能云

算法

在线JSON转PHP Array工具

入门小站

工具

Spring框架基础知识(03)

海拥(haiyong.site)

28天写作 12月日更

技术“开源”对于金融业软件发展的影响

Speedoooo

安全 ios开发 APP开发 Andriod开发 小程序容器

【2021废钢铁大会】拾起卖旗下天津城矿再生资源回收有限公司三获“全国优质废钢加工配送企业”称号

InfoQ 天津

2021 优秀开源项目公布,Apache APISIX 位列其中!

API7.ai 技术团队

api 网关 Apache APISIX 优秀开源项目

HDFS源码解析:教你用HDFS客户端写数据

华为云开发者联盟

hdfs block appendChunk

尚硅谷大数据之Canal视频教程发布!

@零度

大数据

☕【权限设计系列】「认证授权专题」微服务架构的登陆认证问题

洛神灬殇

微服务架构 12月日更 权限认证机制 授权设计

架构训练营模块三作业

zhongwy

架构实战营 「架构实战营」

使用亚马逊云科技DevOps 工具构建 InnerSource 生态系统

亚马逊云科技 (Amazon Web Services)

开源 InnerSource

Kafka 消息存储与索引设计

编程江湖

kafka

固定资产管理平台系统解决方案

低代码小观

企业管理 资产管理 CRM 企业管理系统 CRM系统

填问卷抽大奖,中奖绝缘体的跨年福利快来领取!

InfoQ写作社区官方

热门活动

通过一个实际例子理解Kubernetes里pod的自动scale - 水平自动伸缩

Jerry Wang

Kubernetes k8s 28天写作 docker build 12月日更

面试官:方法重写时需要注意哪些问题?

王磊

实践解析可视化开发平台FlinkSever优势

华为云开发者联盟

flink kafka 流计算 华为FusionInsight MRS FlinkSever

大型集团企业云管平台建设参考架构

华为云开发者联盟

架构 运维 IT治理 分布式部署 ManageOne

带你认识三种kafka消息发送模式

华为云开发者联盟

kafka 时间 异步 消息发送 producer

基于磁盘量身定制,十亿规模高效向量检索方案

Zilliz

向量检索 anns 向量计算

28天写作感想

Tiger

28天写作

兄弟要盘吗?

为自己带盐

爬虫 dotnet 28天写作 12月日更

【转】大数据开发之Spark面试八股文

@零度

大数据 spark

前端React 开发中必须知道的5个技巧

@零度

前端开发 React

助车企升级,旺链科技与南方电网、联想等名企同斩获「创新案例奖」

旺链科技

区块链 产业区块链 供应链金融

平凯星辰获评 《金融电子化》2021 金融业新技术应用创新突出贡献奖

PingCAP

梦想起航

向往

盘点2021

Linux之atime,ctime,mtime的区别

入门小站

Linux

如何在Python中实现异步执行?_编程语言_Bhavani Ravi_InfoQ精选文章