• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

RocketMq之Consumer原理浅析

消息队列 winrains 来源:大王叫下 1周前 (06-28) 8次浏览

Consumer是怎么启动的

源码很长,这里就不仔细看了,其实主要就是初始化了三个组件,然后启动后台定时任务

  • RebalanceImpl

均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。

  • PullAPIWrapper

拉取消息组件

  • offsetStore

消费进度组件


  • PullMessageService

从阻塞队列pullRequestQueue中获取consumer的pull请求

  • RebalanceService

负载均衡定时任务,给 Consumer 分配可消费的 MessageQueue

  • fetchNameServerAddr

定时获取 NameSever 地址

  • updateTopicRouteInfoFromNameServer

定时更新Topic路由信息

  • cleanOfflineBroker

定时清理下线Broker

  • sendHeartbeatToAllBrokerWithLock

发送心跳

  • persistAllConsumerOffset

持久化消费进度 ConsumerOffset

启动流程图如下:

ConsumerGroup是怎么分配MessageQueue的

当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是消费组中,此时一个消费组中多个Consumer消费一个Topic,而一个Topic中会有多个MessageQueue。
那么就会有一个问题,比如有2个Consumer,3个MessageQueue,那么这3个MessageQueue怎么分配呢?这就涉及到Consumer的负载均衡了。
首先 Consumer 在启动时,会把自己注册给所有 Broker ,并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。
然后 Consumer 在消费时,会随机链接一台 Broker ,获取消费组中的所有 Consumer 。
主要流程如下:

注意这里会对Consumer集合做一个排序,为什么要这样做呢?因为每个 consumer 都是在本地负载均衡,所以要排序,否则多个Consumer之间会有冲突。

Consumer是怎么从Broker获取消息的

消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:

  • Pull即消费者每隔一定时间主动去 Broker 拉取消息
    • 优点消费速度、数量可控
    • 缺点如果间隔时间短,可能会拉空,并且频繁 RPC 请求增加网络开销 如果间隔时间长,则可能会有消息延迟 消费进度offset需要consumer自己来维护
  • Push即 Broker 主动实时推送消息给消费者
    • 优点消息实时,保持长链接,不会频繁建立链接
    • 缺点如果消息数量过大,消费者吞吐量小,肯能会造成消费者缓冲区溢出

Push 本质上也是基于消费者主动拉取实现的,只不过名字叫push,意思是 Broker 会尽可能实时的把消息给消费者处理。

Push消费模式流程简析

  • 后台独立线程RebalanceServic根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest实例放入阻塞队列pullRequestQueue中。
  • Consumer端开启后台独立的线程PullMessageService不断地从阻塞队列pullRequestQueue中获取PullRequest请求并通过网络通信模块异步发送Pull消息的RPC请求给Broker端。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。
  • PullMessageService异步拉取到消息后,通过PullCallback进行回调处理,如果拉取成功,则更新消费进度,putPullRequest到阻塞队列pullRequestQueue中,接着立即进行拉取
  • PullCallback会把拉取到的消息交给Consumerrequest进行处理,Consumerrequest会调用消费者业务方实现的consumeMessage()接口处理具体业务,消费者业务方处理完成后返回ACK给Consumerrequest,如果消费者ACK返回的失败,则在集群模式下把消息发回 Broker 进行重试(广播模型重试的成本太高),最后更新消费进度offsetTable
  • 在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService进行二次处理。

Push消息流程图:

RocketMQ消息消费的长轮询机制

  • 普通轮询比较简单,就是定时发起请求,服务端收到请求后不论数据有没有更新都立即返回优点就是实现简单,容易理解。
    缺点就是服务端是被动的,服务端要不断的处理客户端连接,并且服务端无法控制客户端pull的频率以及客户端数量
  • 长轮询是对普通轮询的优化,依然由客户端发起请求,服务端收到后并不立即响应而是hold住客户端连接,等待数据产生变更后(或者超过指定时间还未产生变更)才回复客户端说白了,就是对普通轮询加了个控制,你客户端可以随时请求我,但是回不回复我说了算,这就保证了服务端不会被客户端带节奏,导致自己的压力不可控

在 RocketMq 中消费者主动发起pull请求,broker在处理消息拉取请求时,如果没有查询到消息,将不返回消费者任何信息,而是先hold住并且挂起请求,使其不会立即发起下一次拉取请求,会将请求信息pullRequest添加到pullRequestTable中,等待触发通知消费者的事件。
(pullRequestTable表示待处理的消息拉取请求集合,它的key是Topic+queueId,value中包含了消费者信息(与该消费者的长连接channel),以及其想要拉取的消息位置,后面需要根据这些信息来将对应的新消息返回给对应的消费者)。

然后在Broker端,通过后台独立线程PullRequestHoldService遍历所有挂起的请求pullRequestTable,如果有消息,则返回响应给消费者。
同时,另外一个ReputMessageService线程不断地构建ConsumeQueue/IndexFile数据,不断的检测是否有新消息产生,如果有新消息,则从pullRequestTable通过Topic+queueId的key获取对应hold住的请求pullRequest,再根据其中的长链接channel进行通信响应。
通过这种长轮询机制,即可解决Consumer端需要通过不断地发送无效的轮询Pull请求,而导致整个RocketMQ集群中Broker端负载很高的问题。

长轮询流程

资料引用:
http://www.jianshu.com/p/fac642f3c… blog.csdn.net/wb_snail/ar… segmentfault.com/a/119000002…

作者:大王叫下

来源:https://juejin.im/post/5eca204bf265da76ef2867f6


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)