在 Celery 中,除了远程控制之外,还有一个元素可以让我们对分布式中的任务的状态有所掌控,而且从实际意义上来说,这个元素对 Celery 更为重要,这就是在本文中将要说到的 Event。

在 Celery 中,注册了很多的 Event,这些 Event 将会在 Task/Worker 的状态发生变化的时候被发出,然后被绑定的 Event 消费者(Receiver)所接受,绑定的 Event 消费者可以是一连串的回调函数,相信细心的同学在前面的源码解析过程中也有发现一些关于 event 的蛛丝马迹,但是,我都是忽略了先,下面就正式得给大家介绍 Event。

Event 有什么用

前面说了,Celery 在 Task/Worker 的状态发生变化的时候就会发出 Event,所以,一个很明显的应用就是监控 Event 的状态,例如 Celery 大家所熟知的基于 WebUI 的管理工具 flower 就用到了 Event,但是,这也是一个比较明显的应用,除此之外,我们还可以利用 Event 来给 Task 做快照,甚至实时对 Task 的状态转变做出响应,例如任务失败之后触发报警,任务成功之后执行被依赖的任务等等,总结一下,其实就是:

  1. 对 Task 的状态做快照
  2. 对 Task 的状态做实时处理
  3. 监控 Celery(Worker/Task) 的执行状态

Event 的实现

了解完 Event 的功能之后,我们这里直接跳过了 Event 的使用实例,因为这个可以不用实例,而是我们根据前面的介绍,然后我们就明白了需要了解一下:

  1. Event 是如何产生的
  2. Event 的传递机制是如何实现的
  3. Event 的处理机制如何

我也将遵循这几个问题的顺序对 Celery 的实现进行一个总结。

Event 是如何产生的

现在我们已经知道了 Event 是 Task/Worker 产生的,所以出处必然在这些实现中,这就毫无难度了。不妨,我们就从最简单的地方出发,看看 Worker 的 Event 是如何产生的,据我所知,现在 Worker 拥有的 Event 有三个:

对于 worker-online 那么应该就是在 Worker 的启动过程中,所以我们还是回到第一篇文章中的介绍,看看里面有什么可以参考的。如果你回去看了的话,肯定会发现 Consumer 这个 Blueprint 里面有一个叫做 EventBootstep,这里很可疑,不妨去看看:

well,这里看上去没啥有意思的,但是,看 Line 26 我们可以肯定的一点是 Event 是否可用还会取决于我们是否允许 gossip,这个是啥我们还不知道,但是无妨,先继续看下去,这里还有一个东西值得我们关注,那就是 event_dispatcher,但是这里还没啥可看的,毕竟是 None 嘛。

我们只是看到了冰山一角,继续看看 start 又在干嘛:

这里第一句上来就是 close,有点蒙蔽啊,啥都不知道你就先上来 close 了,是不是很被动,没关系,我告诉你这里是干啥的,这里就是清除 Celery 之前的 event_dispatcher,然后将之前的 event_dispatcher 返回回来,返回来干啥?在 Line 47 会根据之前的配置设置新的 event_dispatcher 啊,至于你先知道 event_dispatcher 是啥,看 Line 36 就知道啦,可以看到这就是一个 Dispatcher 的对象,所以我们需要关注一下这个对象了。

但是由于 Dispatcher 这个类太复杂,我就不一一摊开讲了,不妨看看我们需要面对的几个方法,第一个是 extend_buffer,看看:

这里的 _outbound_buffer 是一个 deque,所以我们可以知道其实就是将旧的 event 继承过来,替别人背一下锅。继续看看 flush 在干些啥:

哟,这个稍微复杂点了,但是无妨,还是要看看,Line 204 这里只是简单得将 deque 转换为 list,然后 Line 207 、208 这里有点意思啦,这里就是发送 Event 了!!!难道我们已经完成任务了?已经发现了如何产生消息了?但是,马上我们在后面又发现了还有一个 groups 的东西,这里发送消息又不一样?不管了,先来看看 _publish 干啥:

看一下 _publish 的代码,感觉没了意思,又是使用 AMQP,Celery 这是讲 MQ 贯彻到底啊!那似乎没办法了,这里就算完了,但是,我们的事情却还没完,因为这里都是针对的旧的任务,我们希望看到的 worker-online 还没看到呢,但是 Bootstep 的工作却是完成了,似乎这里线索就断了。

但是,同样细心的同学可能会记得,我们之前曾经说过一个叫做 HeartBootstep,它的职责是干啥来着?如果忘记了,不妨回到第一篇回顾一下,记得的话,我们进代码看看,哈哈哈

nice,你会发现,这个 Bootstep 是依赖于 Events 的,同时在 Line 29 中给你会发现就用到了我们刚刚初始化的 event_dispatcher,然后就调用 start 了,所以不妨一起看看:

嘿嘿,看到没,这里就是 worker-online 的发生地了,而且我们还顺便捕捉到了 worker-heartbeat 这个 Event,so lucky,但是有个地方我们不明的,那就是这个 _send 干了什么,如果不出意料的话,应该是调用的 dispatcher._publish,走,看看去:

好,并没有按照我的套路来,调用的居然是 event_dispatchersend,那么它和 _publish 有什么区别呢?不妨一起看一看:

这里和 _publish 的唯一不一样的地方就是做了缓存处理,就是在 Line 185 这里,如果需要缓存,那么缓存一波,在 Line 192 这里如果缓存满了,那么就发送呗。有一点值得注意的就是在 Line 198,这里调用的是 publish 而不是 _publish,搞那么多事,那么这里有有什么不一样?

好呗,从这里看似乎除了对 clock 进行一个操作之外,没有其他特殊之处,那么这个 clock 又是什么,起到什么作用呢?略微查找就知道了,这又是 Kombu 的东西,然后看解释就知道了这是一个 Counter,可以用来给 Consumer 判定是否接受这个 Event 用的,所以我们可以先 pass。所以,总得来说,我们可以发现,这里已经对 Event 的产生有了一定了解了,这里可以产生的一个比较明显的问题点就是:Celery 中 Event 的 send、publish 和 _publish 的区别是啥?

消息的传递机制

在跟踪 Event 的产生的过程中我们已经顺便将 Event 的发送给看了,其实还是 Kombu 的 AMQP 在作用,然后通过 Connection 发送到对应的 MQ 中,再后面就被 Consumer 接收,全链路就是这样:

Event<Producer> ------> MQ ---------> Event<Consumer>(Receiver)

前半部分我们已经清楚了,但是后半部分还不清楚,所以我们的重点就是看看后半部分具体是怎么做的。但是后半部分要从何处入手这是个问题,我这里省去了查找的过程,直接说一下入口吧,位置就在 celery/bin/events.py,对于任一一种 Events,我们需要关注的是 run_evtop 这个函数,所以先来看看:

这里很简短,继续跟下去看看咯:

这里有点意思了,但是还是可以比较简单得看到 Line 529 是关键所在:

看到这里我们就该偷笑了,看到 while 1 就意味差不多到最后了,哈哈,Line 508 使用的是 read 的 Connection,然后 Line 512 创建了一个 Receiver,在 Line 515 进行 capture,所以我们可以断定,我们想找的就在这两句里面了,直接看 Line 515 吧:

这里有点意思的就是又是遇到 Kombu 的锅:

class kombu.mixins.ConsumerMixin[source] Convenience mixin for implementing consumer programs.

It can be used outside of threads, with threads, or greenthreads (eventlet/gevent) too.

The basic class would need a connection attribute which must be a Connection instance, and define a get_consumers() method that returns a list of kombu.Consumer instances to use. Supporting multiple consumers is important so that multiple channels can be used for different QoS requirements.

这里其实是有多个 EventReceiver 绑定了这个 Connection,然后 ConsumerMixin 帮助协调这些 Receiver,每个 Receiver 都可以收到这些 Event,但是能不能处理就看他们的 routing_key 设置得好不好了。

Event 的处理机制

看完 Event 的接收机制我们知道了 Event 是以 AMQP 的形式接收的,那么毫无以为,处理逻辑应该在回调机制上回调的,所以关键还是在于 Line 512 中的 handlers,我们来看着:

Receiver 中的 process 我们发现了他是这么用 handlers 的,那么没问题,state.event 才是最后的关键,state 中间做了两层的封装,到最后就成了 _create_dispatcher,但是同样得,这个函数也是比较复杂,所以我这里对他进行精简一下,概括起来是这样的:

  1. 先找 group 的 handler,有的话就用这个了,否则看下面;这个默认是没东西的,所以可以先pass
  2. 如果是 worker 的 Event,就执行 worker 对应的处理
  3. 如果是 task 的 Event,就执行 task 的对应处理

OK,这差不多就是 Event 的内容啦,关于 Event,后面有更精彩的应用会说到,不知道用 Celery 的同学平时对这个特性有没有使用的场景?