看完前边这么多篇,上一篇总算看到一些比较有意思的东西了,但是,显然这并不能满足我们对于 Kombu 的期待,因为上篇说到,获取 queues 里面消息的时候居然是通过轮训每一个 queue 来实现的,这得多挫啊,难道 kombu 就这么没有技术水平?事实上可能前期是这样的,但是,发展到了后面,kombu 不满足于此,于是乎就出现了一些看上去更高级一些的技术,今天就来看看多了什么实现。

eventloop

还是江湖老规矩,先上一个例子,看看效果,这是一个实现消费者的例子,因为从我们前边的解析中可以发现,生产者其实很简单,没有太多的技术含量,但是,消费者就不一样啦,它具有十足的技术难度,比如,我们之前吐槽的轮训遍历就是出自于此,今天来个不一样的 Consumer:

前边基本上差异不大,还是创建 ConnectionConsumer,不过这个 Consumer 的创建和我们之前的稍有一点区别,之前是通过 SimpleQueue 创建的,这次我们手动创建,而且,看 Line 16 可以看到,我们居然没有进行类似于 consumer.get() 这样的操作,而是写了一句:

for _ in eventloop(connection)

然后就啥也不干了,这就神奇了,这里到底发生了什么事情,我们进 eventloop 看看,文件在 kombu/common.py Line 177:

乍一看怎么滴又是一个循环,但是细细看一下,有点不一样,这里是一个 generator,每次返回的都是 conn.drain_events,我去,老熟人啦,底层那不是和我们之前看的是一样的,里面还是一个死循环啊,只不过调用的方式变了,以前我们调用的方式是:queue.get(),现在却是直接放进 eventloop 里头即可。

Hub

eventloop 的实现看上去没什么惊艳的,那我们就来看另外一种有意思的实现,首先还是先来一个 Sample:

这里和前面 eventloop 不同之处在于 Line 17,之前 eventloop 是直接将 Connection 的对象作为参数,但是这里确实将 hub 作为参数,我们看一下这里又是怎么回事:

ok,这里已经将 Connection 具体化到 Transport 了,那我们直接找 RedisTransport 看看怎么一回事:

我们先来看看 _register_BRPOP,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么久放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了,但是,这个 connection 还没有对 epoll 其效果,所以发送一个 _brpop_startLine 301

这里可以看到,是对所有的 queue 都发起了监听请求,也就是说任一个队列有消息过来,那么都会被响应到,那么响应给谁呢?还得回到 kombu/transport/redis.py Line 1034,然后看看 add_reader 这个函数做了啥:

其实又跳回到我们之前哪里去了,这时候我们需要关注的是 on_readable 这个函数做了啥:

这里听说 Redis 已经准备好了,所以就来获取拿到的结果,然后就解析起来了,解析成功之后,自然要处理这个消息呀,于是乎又回到了这里 redis.py

回顾

回顾一番,虽然前面一顿刺激得寻根问底,但是,我们却没有停下脚步欣赏一下路边的风景,所以这里我们慢下来看看一些应该记住的东西:

Hub 是什么

我们走了一圈,但是,却没有理清 Hub 是什么这个概念。从 Hub 的注释上,我们知道它的职责是 Event Loop,既然是这样,那我就可以猜测出它内部肯定是有 EPoll 模型 了,这样也就可以大概了解一些东西了。

看到代码中的 _create_poller_close_poller 之类的就更加确定了我的猜测,但是,这里有一点复杂的就是它兼容好几种模型:Geventkqueueepoll and select,但是我不管,我喜欢 epoll 所以我就当 epoll 看,但是,似乎更多人喜欢的是 gevent

既然是 Event Loop 那么我们就要关注他的 FD 了,所以这个 add 函数的出现就对我的走读带来很大的帮助:

但是看到这个 run_forever 的实现又让我有点不适应,因为这和我熟悉的 Epoll 模型有点不一样啊:

那我得关注一下这个 loop 是啥,并且关注一下是如何实现的了,结果看到了 eventio.py 中的 epoll:

然后再对比 create_loop,那么一切都了然于胸了

感觉一些问题就得到了解答,hub 就是一个 event loop!

Connection 注册到 Hub

既然 Hub 是一个 event loop,那么注册 connnection 也是情理之中,但是又是怎么影响到 Consumer 的呢?通过走读 Redis 的 Connection 代码可以发现,register_with_event_loop 的时候没做啥注册的工作,但是我们可以清楚:一个 Connection 对应一个 Hub,它们之间的枢纽是 MultiChannelPoller,它负责找出哪个 Channel 是可用的,但是这些 Channel 都是来自同一个 Connection。

Consumer 和 Connection 是怎么联系起来的

看到现在我们可能都还没弄懂 Consumer 是怎么和 Connection 连接起来的,其实我们可以猜测,Consumer 其实就是绑定了消息的处理函数,我们只需要将对应的 Channel 和 Consumer callback 对应起来应该就可以了。

其实从最开始的 Sample 中就可以看到,其中的枢纽就是 Queue!每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!

Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel

毫无疑问,Channel 又会甩锅给 connection,然后都到了 connection 了,只能叫 EPoll 上场啦。

Hub run_forever 在干什么

这个我们可以先行预测一波,既然是 run_forever,而且对外表示是 block 的状态,那么毫无疑问,我们可以猜测它是 EPoll 里面的 poll 咯,眼见为实去:

这个 loop 是啥,我们看过的咯,在 hub 里面他是 create_loop(),但是这是一个 generator,调用 next 之后并不会返回东西,我们在 Line 187 上也可以发现这里根本不接收返回值,而是让 genrator 迭代一次而已。

事实上,内部就轮训 connection,然后将可用的 connection 和 callback 对应起来,有消息来到之后就调用 callback,这样似乎一切就已经连接起来了,但是,connection 和 callback 的关系似乎我们还没理清。。。

Connection 中的 callback

从使用上可以看到,callback 最开始是在 Consumer 里面指定 on_message 传递的,那么又是如何就到了 Connection 中呢?看一下:

其实 Hub 中的 callback 只管到 fd,他不知道这个 fd 对应哪个 connection 和 channel,而这个工作是又:MultiChannelPoller 来管理的,它对应了 fd 和 Channel。

Channel 和 Connection 是什么关系