Celery 源码解析二: Worker 的执行引擎

之前在 Celery 的第一篇文章中我们已经快速得过了一遍 Celery 的 Worker 的 Bootstep,并且对他们的重要性都进行了一个评级。而本文就将继续跟着前面的步骤走,肯定是先来看看最重要的高优先级的几个 BootStep,今天就先来看看 Worker 的,因为 Worker 的职责是执行任务嘛,所以这一部分的核心应该就是任务的执行引擎了,话不多说,走起。

Hub 和 Pool

一上来我就想给大家展示的就是 Hub 和 Pool 了,相信跟着 Kombu 源码来的同学应该对这两个概念都不会陌生,事实上,当你看 Celery 的代码的时候你更会亲切感倍增:

哈哈,有没有发现,其实就是直接用的 kombu 的,是不是觉得不用看这个了。那就带你看看另外一个——Pool,这个 Bootstep 就比较厉害了,他是 worker 真正的执行引擎;这里的 Pool 之所以又做了一层封装,是因为它需要设定一个伸缩值,也就是所谓的 autoscaler,但是不用太担心,也就是设置了一个属性而已,事实上他自己并不做这个工作。这里其实还是蛮奇怪的,因为这个工作我觉得是应该交给 Autoscaler 这个 Bootstep 来实现的。

Pool 其实就做了这么些微小的工作:

然后就完了,不过也够了,因为我们已经构建了各种池,后面有 task 直接往 Pool 里头丢就行啦。

Connection 和 Task

看完 Worker Blueprint 里面比较重要的 Bootstep 之后,我们再来看看 Consumer 的,Consumer 中的 Bootstep 的任务更多是获取消息,所以你会发现和我们前面的 Kombu 耦合得非常深,但是,随着我们的深入,你会发现很有有趣的东西。。。

还是先看看 Connection 吧,看下这个实现里面有什么东西:

你会发现非常简单,因为它把事情都交给 Consumer 这个类做了,我们稍微得看一点 Consumer 的实现:

这里有一点值得注意的就是用的是:connection_for_read,也就是说还存在 connection_for_write,Celery 是将读写的连接独立分开的,好吧,Stop here,尽管记住,Connection 用的是 Kombu 的 Connection 就好啦,Line 408 和 409 是不是很熟悉,翻翻 Kombu 的文章就会发现一毛一样的。就这样,我们的连接已经建立了,那么怎么获取的消息呢?消息怎么处理的呢?再来看看下一个。

Task 承载的功能就是启动消息消费者了,它的实现看上去也是很简单,我们可以看一下:

这里的核心实现应该就是 Line 36 和 37 了,看到这里我们可能又回想起之前 Kombu 那里写的一个简单的 Consumer 的 Sample 了,你跟进去看看你就会发现这里就是那个 Consumer,好吧,我就不进去看了,这里提一下就好,那么 Task 的任务也完成啦。

消息的处理

可能看到这里,有同学就按耐不住内心的洪荒之力了,怎么感觉刚来点劲,就断了呢?消息是怎么被接收到,并且消费掉的没说呀...。别急,我们看看第一篇文章中的 Bootstep 的图,可以发现最顶部也就是最后一个启动的 Bootstep 是谁,EvLoop!

那我们就去看看它的实现:

哈哈,好简短,简短得我们都措手不及了,其实也可以看出它就是启动了一个 loop 来获取等待消息,只是说我们需要更清楚它是怎么做到的,所以还是的看看 loop 的代码咯:

这里用的就是 loops 里面的 asynloop 咯,那里面有什么秘密呢,去看看:

哈哈哈,有木有,这里已经对 consumer 进行一些设置,然后既然是作为最后一个 Bootstep,那么应该是等待消息的地方了吧,so:

这是异步的情况下的实现,哈哈,那么同步的又是怎么实现的呢:

看完你会发现,同步和异步的代码差太多了,好像也是同步的比较容易理解,你觉得呢?但是,值得说一点的就是,同步的效率肯定是不如异步的啦,毕竟消息多了同步会消化不良的。

所以呢,真正的消息处理(解析消息并执行)的逻辑应该是这个:

这里奇怪的可能就是 strategy 了,这个你大可以看这段代码就比较清晰了:

然而可能你并不知道 app.tasks 里面有啥,所以我给你准备了这个:

这是我的一个小 Demo 里面的 tasks,然后的然后,你就知道了这是什么了。其实你会发现还是交给 Task 这个类来完成,至于 Task 这个类是怎么实现的,下一篇文章会介绍,现在我们先来看下所谓的 strategy 是啥。

跟随 strategy 的实现,我们可以走到 celery/worker/strategy.py 中,但是这个继承了一贯的 Celery 尿性,很臭很长,所以我们抽取其中的核心部分来看看,也就是解析消息的那一部分,这里有好几处关键的要素,所以我们都将一一细致解析

不执行任务的情形

在 Worker 收到任务的时候,是有一定的脾气不执行任务的,例如任务过期了,执行过啦等等,代码中是这样体现的:

这里有一个逻辑是我们需要关注的,既然可以使用 req.id in revoked_tasks 就可以判断任务是否已经执行过,那么为什么还需要 req.revoked() 呢,所以值得我们看一看

可以发现原来除了正常结束会被记录在 revoked_tasks 中之外,还可能出现被终止执行失败 这些情况下也算作执行过啦,所以就忽略咯

延时执行任务

在 Celery 执行任务的时候,我们可以指定多久之后再执行任务,这和定时任务是有区别的,定时任务是定的时间点,但是延迟任务是定的时间段,例如 3 分钟后才执行,那么这个逻辑是怎么实现的:

前面的转换时间我们就不用管了,可以直接忽略,这里的 qos 我们也先不看,直接看最后,这个 call_at 名字就很有意思了,可以指定时间执行,看一下怎么做到的:

嚯,有意思哈,这里用的是 consumer 的 timer,而我们之前看过 Consumer 的代码,知道这个 timer 其实就是 Worker 的 Timer,而 Worker 的 Timer 就是 kombu 中的 kombu/async/timer.py 中的 Timer,那么 call_at 的实现就得追述到 Kombu 咯

这里代码没有很复杂,我们可以一目扫完,然后我们知道,这个任务其实还没被放进 amqp 的队列里,反而是被放进了 Timer 的定时队列里,然后等待得被执行。

限制频率执行任务

在 Celery 中,我们可以指定一个任务的执行限制,例如每秒钟最多执行 10 个相同的任务,但是,在文档中没有明确指明是单机 10 个还是集群 10 个,因为毕竟 Celery 号称自己是分布式消息系统嘛,所以我们试图通过代码来解释这些问题:

看来这里的关键就是 bucket 了,那么什么是 bucket,它的值是啥,其实这里没有太多得悬念,只是开发者把名词定义得比较高大上一些,这里其实就是一个 key/value 对,然后 key 自然就是你的 task_name 了,而 value 就是对应的 rate_limit 了,只不过 rate_limit 做了一层封装:

然后是该看看 limit_task 的执行是什么情况了,然而你会发现 Celery 又甩锅给 Kombu 了,十分有意思,所以我就说 Kombu 和 Celery 偶尔得如此之深嘛,一言不可就给 Kombu,然后 Kombu 会告诉你他是参考这个实现的:Token bucket,所以我不是很感兴趣,就不看了。

普通执行任务

最后一种执行就是普通的执行了,就是没有什么猫腻,worker 自己直接把任务吞下来,消费掉。同样得,在消费任务之前,worker 会先设置一番,比如说声明一下已经接收到任务了,然后在调用一下各个注册在 worker 里面的回调函数,然后才真正得执行任务。

我们想找到任务最终是在哪里执行的,结果找到最后你会发现又转回到 Worker 这个 Blueprint 中了,然后就成了

这里其实已经很明显了,但是,我们不怕麻烦得还是再看一点点:

预料之中咯,直接收工!

· EOF ·

作者: 行者酱油君
个人主页: 利强的博客
微信公众账号:后台架构
扫描左边二维码(或者长按识别二维码)关注 后台架构
本文版权归作者所有,未经作者同意不得转载,本人保留一切权利。
最近爬虫很是凶猛,标注下文章的原文地址: https://liuliqiang.info/post/celery-source-analysis-worker-execute-engine/