消息队列
1. 消息队列的作用?
异步处理
可以更快地返回结果;
减少等待,自然实现了步骤之间的并发,提升系统总体的性能。
流量控制
服务解耦
2. 消息队列选型
如果对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,建议使用 RabbitMQ。
如果消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性更优秀。
如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 更优秀。
3. 主题和队列有什么区别
这两个概念的背后实际上对应着两种不同的消息模型:队列模型和发布 - 订阅模型。这两种消息模型其实并没有本质上的区别,都可以通过一些扩展或者变化来互相替代。
常用的消息队列中,RabbitMQ 采用的是队列模型,但是它一样可以实现发布 - 订阅的功能。RocketMQ 和 Kafka 采用的是发布 - 订阅模型,并且二者的消息模型是基本一致的。
4. RabbitMQ 的消息模型
在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。
同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能
5. 如何利用事务消息实现分布式事务?
首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息包含的内容就是完整的消息内容,和普通消息的唯一区别是:在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。
RocketMQ 的事务反查机制通过定期反查事务状态,来补偿提交事务消息可能出现的通信失败。
6. 如何确保消息不会丢失
生产阶段
在生产阶段,消息从生产者发送到消息队列(Broker)。此阶段可能会因为网络问题导致消息丢失。为了确保消息不丢失,可以采取以下措施:
确认机制:使用消息确认机制,确保 Broker 成功接收并存储消息后,才返回成功响应给生产者。
重试机制:如果生产者长时间未收到确认响应,可以自动重试发送消息,直到达到最大重试次数或明确失败。
存储阶段
在存储阶段,消息被存储在 Broker 中,这个过程也可能导致消息丢失,特别是在 Broker 故障时。为确保消息的持久性,可以采取以下策略:
持久化存储:配置 Broker 使用同步刷盘(SYNC_FLUSH)策略,即在消息写入磁盘后再返回成功响应,这样即使 Broker 宕机,消息也不会丢失。
高水位设置:在集群模式下,确保消息被复制到多个 Broker 节点(主从复制),这样即使主节点出现故障,从节点也能提供备份,防止消息丢失。
消费阶段
在消费阶段,消费者从 Broker 拉取消息并进行处理。此阶段同样可能导致消息丢失,主要是因为消费者未能成功处理消息或未确认消费。为避免此类问题,可以采取以下措施:
手动确认 offset:消费者在成功处理消息后,手动提交 offset,确保 Broker 知道该消息已被成功消费。如果处理失败,消费者可以选择不提交 offset,从而在下次拉取时重新获取该消息。
重试机制:如果消费者处理消息失败,可以在处理逻辑中实现重试机制,确保消息最终被成功处理。
消息丢失的检测
为了检测消息是否丢失,可以利用消息的有序性。生产者在发送每条消息时附加一个递增的序号,消费者在接收消息时检查这些序号的连续性。如果发现序号不连续,则说明有消息丢失。
7. 检测消息丢失的方法
使用分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。
我们可以利用消息队列的有序性来验证是否有消息丢失。在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
8. 如何处理消费过程中的重复消息?
通过幂等消费来解决消息重复的问题
唯一约束:在数据库中设置唯一约束,确保重复的插入操作不会导致数据重复。
状态检查:在处理消息前检查状态,如果消息已处理则跳过。例如,使用状态标识(如已处理标志)来记录消息处理状态。
去重表:使用去重表记录已处理的消息 ID,处理前检查消息 ID 是否存在于去重表中。
9. 消息积压了该如何处理?
优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。
10. 如何保证消息的严格顺序?
使用单一消费者处理消息,确保消息按照发送顺序依次处理。
将消息按照某种规则(如消息键、用户 ID 等)分配到不同的分区,每个分区由一个消费者处理,确保同一分区内的消息顺序。可以在发送端使用账户 ID 作为 Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。这样可以保证相同 Key 的消息是严格有序的。
11. RabbitMQ 的缺点
系统可用性降低
系统复杂度提高
管理复杂性 —— RabbitMQ 的安装、配置和管理相对复杂
12. RabbitMQ 的工作模式
Simple 模式(即最简单的收发模式)
消息产生消息,将消息放入队列。
消息的消费者 (consumer) 监听消息队列,消息被拿走后,自动从队列中删除(隐患:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的 ack,但如果设置成手动 ack,处理完后要及时发送 ack 消息给队列,否则会造成内存溢出)。
Work 工作模式(资源的竞争)
消息产生者将消息放入队列,消费者可以有多个,消费者 1、消费者 2 同时监听同一个队列,共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关 (syncronize) 保证一条消息只能被一个消费者使用)。
Publish/Subscribe 发布订阅(共享资源)
每个消费者监听自己的队列。
生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
Routing 路由模式
消息生产者将消息发送给交换机按照路由判断,路由是字符串 (info) 当前产生的消息携带路由字符 (对象的方法),交换机根据路由的 key,匹配上对应的消息队列,对应的消费者才能消费消息。
根据业务功能定义路由字符串。
从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
Topic 主题模式
星号井号代表通配符
星号代表多个单词,井号代表一个单词
路由功能添加模糊匹配
消息产生者产生消息,把消息交给交换机
交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
13. Kafka 如何实现高性能
磁盘顺序读写 Kafka 对磁盘的应用,得益于消息队列的存储特性。与普通的关系型数据库、各类 NoSQL 数据库等不同,消息队列对外提供的主要方法是生产和消费,不涉及数据的 CRUD。所以在写入磁盘时,可以使用顺序追加的方式来避免低效的磁盘寻址。
批量操作优化 Kafka 的批量包括批量写入、批量发布等。它在消息投递时会将消息缓存起来,然后批量发送;同样,消费端在消费消息时,也不是一条一条处理的,而是批量进行拉取,提高了消息的处理速度。
Sendfile 零拷贝 Kafka 把所有的消息都存放在单独的文件里,在消息投递时直接通过 Sendfile 方法发送文件,减少了上下文切换,因此大大提高了性能。
MMAP 技术 Kafka 使用 Memory Mapped Files 完成内存映射,Memory Mapped Files 对文件的操作不是 write/read,而是直接对内存地址的操作。如果是调用文件的 read 操作,则把数据先读取到内核空间中,然后再复制到用户空间。 但 MMAP 可以将文件直接映射到用户态的内存空间,省去了用户空间到内核空间复制的开销,所以说 MMAP 也是一种零拷贝技术。
14. MQTT 中的 QoS
QoS 0:最多分发一次(At most once) — 这是一种最低级别的服务,消息传递给接收者没有任何的确认机制,消息可能会丢失,也不可知。同时,这种模式的性能最好,因为没有重传或者确认的开销。
QoS 1:至少分发一次(At least once) — 针对这种级别的消息,确保消息至少会被传递到接收者一次,但可能会有重复。发送者将发送消息直到他收到接收者已收到消息的确认。这种模式可能会导致消息的重复传送。
QoS 2:仅分发一次(Exactly once) — 这是最高级别的消息分发保证,确保消息只被接收者接收和处理一次。这种模式适用于所有对消息重复敏感的场景,但它的开销也是最大的。
15. MQTT 的优点
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,特别适合在网络带宽低、网络不稳定、硬件性能差等情况下使用。所以,它经常被用于硬件设备的交互。
资源消耗小:MQTT 设计得非常精简且易于实现,因此占用的资源非常低。对于性能有限的硬件设备来说,这一点非常重要。
网络带宽需求低:因为 MQTT 使用了发布/订阅模型,所以消息可以有效地在所有订阅的客户端之间分发,而无需创建多个连接。MQTT 的报头也非常小(只有 2 字节),因此减少了网络带宽的需求。
遥测特性:MQTT 协议是为大规模远程设备构建的,非常适合物联网(IoT)应用。
质量服务:MQTT 支持 3 种不同级别的消息传送质量(QoS),可以根据需要选择最适合设备和网络条件的级别。
16. Kafka 是如何做数据持久化的
Kafka 通过日志文件的方式实现数据持久化。具体来说,Kafka 将消息追加写入到一个称为日志(Log)的文件中,这个日志文件是一个持久化的、有序的、不可修改的消息记录。一旦消息写入到日志文件中,就会被存储在磁盘上,即使 Kafka 服务发生故障或 Broker 重启,消息数据仍然可以从磁盘上加载并重新构建。
为了快速检索消息,Kafka 维护了一个消息索引,存储了每个分区中消息的偏移量和物理位置,使得 Kafka 能够快速定位和检索消息。此外,为了进一步提高可靠性,Kafka 支持消息的复制,每个分区的消息可以有多个副本,它们分布在不同的 Broker 上,通过 ISR(In-Sync Replica)机制保障了 Leader 和 Follower 之间的数据同步,保证了消息的持久性。
17. Kafka 如何保证消息有序
1 个 Topic 只对应一个 Partition。
发送消息的时候指定 key/Partition。
18. Kafka 如何保证消息不重复消费
Kafka 出现消息重复消费的原因:
服务端侧已经消费的数据没有成功提交 offset(根本原因)。
Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交 offset 合适?处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
19. Kafka 的分区分配策略
RangeAssignor(范围分配策略):这是默认的分配策略。它首先将分区按数字顺序排列,然后将消费者按字典序排列。分区数量除以消费者数量,以确定每个消费者应负责的分区数量。如果无法均匀分配,则前面的消费者会多一个分区。
RoundRobinAssignor(轮循分配策略):该策略将所有分区和消费者按字典序排序,然后通过轮询的方式将分区依次分配给每个消费者。这种方法适用于消费者数量和分区数量相对接近的情况。
StickyAssignor(粘性分配策略):这是较新的分配策略,旨在减少重新平衡时的分区变更,提供更好的负载均衡和更少的分区迁移。它在分配时优先考虑保持消费者之前的分配状态。
分区分配的实施过程
分区分配的过程主要由 Kafka 的组协调器(GroupCoordinator)管理。其实施步骤如下:
消费者提案:每个消费者向 GroupCoordinator 发送 JoinGroupRequest 请求,包含其分配策略和订阅信息。
选举 Leader 和分配策略:GroupCoordinator 收集所有消费者的提案,选举出一个消费者作为组的 Leader,并确定使用的分区分配策略。
执行分配:Leader 根据选定的分配策略执行分区的具体分配,并将结果通知所有消费者。
重新平衡:当消费者加入或离开、主题新增分区时,Kafka 会触发重新平衡,重新分配分区。
20. 哪些 MQ 能有序消费
RocketMQ
RocketMQ 支持两种有序消费模式:
全局顺序消息: 某个 Topic 下的所有消息都保证严格的 FIFO 顺序。适用于性能要求不高的场景。
分区顺序消息: 根据 sharding key 将消息分区。同一个分区内消息按 FIFO 顺序发布和消费。适用于性能要求高的场景。
实现原理是生产者有序存储消息到不同队列,消费者从队列中有序拉取消费。
Kafka
Kafka 支持分区有序,即同一个分区内的消息是有序的。不同分区之间是并行无序的。
RabbitMQ
RabbitMQ 支持通过设置消费者 prefetch_count 参数为 1 来实现单个消费者的有序消费。
Last updated