消息队列基础知识
一、消息队列应用场景
1. 异步处理 如:秒杀功能
通常秒杀功能包括:风险控制、库存锁定、生成订单、短信通知、数据统计等,实际上只要用户请求通过风险控制,并完成库存锁定,就可以返回秒杀结果了,对于后续的生成订单、短信通知等步骤,并不一定要在秒杀请求中完成。可以把请求数据放入请求队列,由队列异步地进行后续的操作
2. 流量控制 - 达到雪峰填谷的作用
如何避免过多的请求压垮我们的系统? 一个健壮的程序应该可以在海量的请求下,在自身能力范围内尽可能地处理请求,拒绝处理不了的请求,而正常的运行。
有两种方式:
使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的
加入消息队列后,整个秒杀流程变为:
网关在收到请求后,将请求放入请求消息队列;
后端服务从请求消息队列中获取 APP 请求,完成后续秒杀处理过程,然后返回结果
当大量请求到达网关时,不会直接冲击后端服务,而先堆积在消息队列中,后端服务根据自己最大处理能力,从队列中进行消费请求。对于超时的请求可以直接丢弃,返回调用者失败即可。运维人员还可以随时增加后端服务的实例数量,进行水平扩容,而不需要对其他服务进行更改。
优点: 能根据下游的处理能力自动调节流量,达到“削峰填谷”的作用
缺点: 增加了系统调用链环节,导致总体的响应时延变长。上下游系统都要将同步调用改为异步消息,增加了系统的复杂度。
令牌桶的方式
令牌桶的原理是: 单位时间内只发放固定数量的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就保证单位时间内,能处理的请求不超过发放令牌的数量,起到了流量控制的作用。
令牌桶可以简单地用一个有固定容量的消息队列加一个“令牌发生器”来实现:令牌发生器按照预估的处理能力,匀速生产令牌并放入令牌队列(如果队列满了则丢弃令牌),网关在收到请求时去令牌队列消费一个令牌,获取到令牌则继续调用后端秒杀服务,如果获取不到令牌则直接返回秒杀失败
3. 服务解耦
在没有使用消息队列时,上游系统需要应对下游系统化的变化,任何一个下游系统变更都需要尚有系统重新上线一次。所以引入消息队列来解决类似系统耦合过于紧密的问题。引入消息队列后,上游系统在变化时发送一条消息到某主题,所有下游系统都订阅该主题,这样每个下游系统都可以获取一份实时完整的数据。
无论增加、减少或是下游系统需求变化,上游服务都无需做任何更改,实现服务间的解耦
消息队列带来的问题和局限性
- 引入消息队列带来延迟问题
- 增加系统的复杂度
- 可能产生数据不一致问题
二、如何选择消息队列
RabbitMQ
优点
- 轻量级的消息队列,容易部署和使用
- 拥有灵活的路由配置,提供
Exchange
模块,根据配置的路有规则将生产者消息分发到不同的队列中,支持自己实现路有规则,扩展容易
缺点
- 对消息堆积支持不好,在它设计里面,消息队列是一个管道,当大量消息积压的时候,会导致
RabbitMQ
性能急剧下降 - 性能相较于其它消息队列是最差的,大概每秒可以处理几万到十几万消息,如果对性能要求非常高,那就不要选择
RabbitMQ
RabbitMQ
使用Erlang
开发,如果你想基于它做一些扩展和二次开发什么的,建议慎重考虑
- 对消息堆积支持不好,在它设计里面,消息队列是一个管道,当大量消息积压的时候,会导致
RocketMQ
- 优点
RocketMQ
中文社区活跃,大多数问题可以找到中文的答案。另外使用Java
语言开发,贡献者大多都是中文人,源码比较容易读懂,容易对其进行扩展或二次开发- 对响应做了很多优化,大多数情况下可以做到毫秒级响应
- 性能要比
RabbitMQ
高一个量级,每秒大概能处理几十万条消息
- 缺点
- 作为国产消息队列,在国际上还没那么流行,与周边生态系统集成和兼容程度要略逊一筹
Kafka
- 优点
- 与周边生态系统的兼容性是做好的,尤其在大数据和流计算领域,几乎所有相关的开源系统都会优先支持
Kafka
- 设计上大量的使用了批量和异步的思想,使之有着超高的性能。但与
RocketMQ
并没有量级上的差异,大约每秒可以处理几十万条消息
- 与周边生态系统的兼容性是做好的,尤其在大数据和流计算领域,几乎所有相关的开源系统都会优先支持
- 缺点
- 同步收发消息延迟比较高,因为收到一条消息时,
Kafka
并不会立即发出去,而是要攒一会,一批一起发送。如果每秒消息数量没那么多,延迟反而会比较高
- 同步收发消息延迟比较高,因为收到一条消息时,
Pulsar
如何选择?
选择中间件的考量维度:可靠性
,性能
,功能
,可运维性
,可拓展性
,是否开源
及 社区活跃度
如果说,消息队列并不是你将要构建系统的主角之一,你对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,建议你使用
RabbitMQ
。如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那
RocketMQ
的低延迟和金融级的稳定性是你需要的。如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列
三、消息模型
主题和队列有什么区别?
队列
就是按照“队列”的数据结构来设计的。生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作。这里面隐含着的一个要求是,在消息入队出队过程中,需要保证这些消息严格有序,按照什么顺序写进队列,必须按照同样的顺序从队列中读出来
- 如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到
- 如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。显然这是个比较蠢的做法
- 同样的一份消息数据被复制到多个队列中会浪费资源
- 更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷
主题
为了解决上述问题,演化出了另外一种消息模型:发布 - 订阅模型(Publish-Subscribe Pattern)。消息的发送方称为发布者(Publisher
),消息的接收方称为订阅者(Subscriber
),服务端存放消息的容器称为主题(Topic
)。发布者将消息发送到主题中,订阅者在接收消息之前需要先订阅主题。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息
两者的区别
生产者就是发布者,消费者就是订阅者,队列就是主题,并没有本质的区别。它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。 实际上,在这种发布 - 订阅模型
中,如果只有一个订阅者,那它和队列模型就基本是一样的了。也就是说,发布 - 订阅模型
在功能层面上是可以兼容队列模型
的
RabbitMQ消息模型
RabbitMQ
是坚持使用队列模型的产品之一。在 RabbitMQ
中,有一个 Exchange
模块,位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange
,由 Exchange
上配置的策略来决定将消息投递到哪些队列中
同一份消息如果需要被多个消费者来消费,需要配置 Exchange
将消息发送到多个队列,每个队列中都存放一份完整的消息数据,变相地实现 发布 - 订阅模型
RocketMQ消息模型
RocketMQ
使用的消息模型是标准的 发布 - 订阅模型
请求 - 确认机制
- 生产端
生产者先将消息发送给服务端,也就是 Broker
,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息
- 消费端
消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认
模型
这个确认机制很好地保证了消息传递过程中的可靠性,但是,为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则
也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ
在主题下面增加了队列的概念:每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的
RocketMQ
中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息
在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一
Kafka消息模型
Kafka
的消息模型和 RocketMQ
是完全一样的,我刚刚讲的所有 RocketMQ
中对应的概念,和生产消费过程中的确认机制,都完全适用于 Kafka
。唯一的区别是,在 Kafka
中,队列这个概念的名称不一样,Kafka
中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的
四、如何确保消息不会丢失
检测消息是否丢失
分布式链路追踪
可以使用此类系统来追踪每一条消息
利用消息队列的有序性来验证
在 生产者
端给每个发出的消息加上一个连续递增的序号,在 消费者
端来检查这个序号的连续性
大多数消息队列都支持 拦截器机制
,可以 在生产者的拦截器中注入消息序号 , 在消费者的拦截器中检测序号的连续性 ,这样的好处是检测代码不会侵入到业务代码中,系统稳定后也方便关闭或删除
分布式系统中需要注意的问题
- 像
Kafka
和RocketMQ
这样的消息队列,它不能保证在Topic
上的严格顺序的,只能保证分区/队列上的消息是有序的,所以我们在发消息的时候必须要指定分区/队列,并且,在每个分区/队列单独检测消息序号的连续性 - 如果系统中生产者是多实例,由于并不好协调多个
Producer
之间的发送顺序,所以需要每个Producer
分别生成各自的序号,在Consumer
端按照每个Producer
分别来检测序号的连续性 Consumer
实例的数量最好和分区/队列数量一致,做到Consumer
和分区/队列一一对应,这样会比较方便地在Consumer
内检测消息序号的连续性
2.如何确保消息可靠传递
消息传递主要分为三个阶段:生产阶段
,存储阶段
,消费阶段
- 生产阶段: 消息从
Producer
创建出来,经过网络传输到Broker
端 - 存储阶段: 消息在
Broker
存储,如果是集群,消息会在这个阶段被复制到其他的副本上 - 消费阶段: 在这个阶段,**
Consumer
从Broker
上拉取消息,经过网络传输到Consumer
上**
生产阶段
在生产阶段,消息队列通过 请求确认机制 ,来保证消息的可靠传递:当调用发消息方法时,消息队列客户端会将消息发送到 Broker
,Broker
收到消息后,会给客户端返回一个确认的响应,表明消息已经收到了。客户端收到响应后,完成一次正常的消息发送
只要 Producer
收到了 Broker
的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户
在编写发送消息代码时,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失
存储阶段
在存储阶段正常情况下,只要 Broker
在正常运行,就不会出现丢失消息的问题,但是如果 Broker
出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的
如果对消息的可靠性要求非常高,一般可以通过配置 Broker
参数来避免因为宕机丢消息
单个节点Broker
对于单个节点的 Broker
,需要配置刷盘策略,将消息写入磁盘后再给 Producer
返回确认响应,这样即使宕机,由于消息已经写入磁盘,就不会丢失消息,恢复后还可以继续消费
在
RocketMQ
中,将刷盘方式 flushDiskType 配置成 SYNC_FLUSH 同步刷盘
多节点集群Broker
如果是由多个节点组成的集群,可以将 Broker
集群配置成:至少将消息发送到2个以上的节点,再给客户端恢复确认响应。这样即使某个 Broker
宕机时,其它的 Broker
可以替代宕机的,也不会发生消息丢失
消费阶段
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker
拉取消息后,执行业务逻辑,成功后才给 Broker
发送消费确认响应 ,如果没有收到响应下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失
在编写消费代码时,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认
3.总结
- 在生产阶段,你需要捕获消息发送的错误,并重发消息
- 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失
- 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认
五、如何处理重复消息
消息重复的情况必然存在
在 MQTT
协议中,给出了三种消息传递的标准:
- At most once:至多一次。 消息至多会被送达一次,换个说法也就是允许消息丢失,但不允许消息重复
- At lease once:至少一次。 消息至少被送达一次,也就是不允许丢消息,但有可能重复