消息队列

1. 消息队列的作用?

  • 异步处理

    • 可以更快地返回结果;

    • 减少等待,自然实现了步骤之间的并发,提升系统总体的性能。

  • 流量控制

  • 服务解耦

2. 消息队列选型

如果说,消息队列并不是你将要构建系统的主角之一,你对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,我建议你使用 RabbitMQ。

如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是你需要的。

如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列。

3. 主题和队列有什么区别?

这两个概念的背后实际上对应着两种不同的消息模型:队列模型和发布 - 订阅模型。然后你需要理解,这两种消息模型其实并没有本质上的区别,都可以通过一些扩展或者变化来互相替代。

常用的消息队列中,RabbitMQ 采用的是队列模型,但是它一样可以实现发布 - 订阅的功能。RocketMQ 和 Kafka 采用的是发布 - 订阅模型,并且二者的消息模型是基本一致的。

4. RabbitMQ 的消息模型

在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。

同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能

5. 如何利用事务消息实现分布式事务?

首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

RocketMQ 的事务反查机制,这种机制通过定期反查事务状态,来补偿提交事务消息可能出现的通信失败。在 Kafka 的事务功能中,并没有类似的反查机制,需要用户自行去解决这个问题。

6. 如何确保消息不会丢失

  • 持久化:RabbitMQ 支持消息的持久化,即使在 RabbitMQ 服务器重启后,消息也能够得到保留。要实现消息的持久化,需要设置队列和消息的持久化属性。

  • 确认机制:RabbitMQ 提供了确认机制,即生产者在将消息发送到队列后,会等待 RabbitMQ 的确认。只有当 RabbitMQ 成功接收到消息并将其写入磁盘后,才会发送确认给生产者。如果 RabbitMQ 在接收消息之前发生故障,生产者将会收到连接异常的通知,从而可以采取相应的处理措施。

  • 备份交换器:RabbitMQ 支持备份交换器机制,通过将备份交换器绑定到主交换器上,可以将未能被正确路由的消息发送到备份交换器指定的队列中,从而避免消息丢失。

  • 高可用集群:通过搭建 RabbitMQ 的高可用集群,可以实现消息的冗余备份。当某个节点发生故障时,其他节点可以接管该节点的工作,保证消息的可靠传递。

7. 检测消息丢失的方法

  • 使用分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。

  • 我们可以利用消息队列的有序性来验证是否有消息丢失。在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

8. 如何处理消费过程中的重复消息?

可以通过幂等消费来解决消息重复的问题

  • 可以利用数据库的约束来防止重复更新数据

  • 可以为数据更新设置一次性的前置条件,来防止重复消息。

  • 还可以用“记录并检查操作”的方式来保证幂等,这种方法适用范围最广,但是实现难度和复杂度也比较高,一般不推荐使用。

9. 消息积压了该如何处理?

优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。

10. 如何保证消息的严格顺序?

保证局部严格顺序,可以这样来实现。在发送端,我们使用账户 ID 作为 Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。一致性哈希算法可以保证,相同 Key 的消息总是发送到同一个队列上,这样可以保证相同 Key 的消息是严格有序的。如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。

11. RabbitMQ 的缺点

  • 系统可用性降低

  • 系统复杂度提高

  • 一致性问题

12. RabbitMQ 的工作模式

  1. Simple 模式(即最简单的收发模式)

  • 消息产生消息,将消息放入队列。

  • 消息的消费者 (consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的 ack,但如果设置成手动 ack,处理完后要及时发送 ack 消息给队列,否则会造成内存溢出)。

  1. Work 工作模式(资源的竞争)

消息产生者将消息放入队列,消费者可以有多个,消费者 1、消费者 2 同时监听同一个队列,共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关 (syncronize) 保证一条消息只能被一个消费者使用)。

  1. Publish/Subscribe 发布订阅(共享资源)

  • 每个消费者监听自己的队列。

  • 生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

  1. Routing 路由模式

  • 消息生产者将消息发送给交换机按照路由判断,路由是字符串 (info) 当前产生的消息携带路由字符 (对象的方法),交换机根据路由的 key,匹配上对应的消息队列,对应的消费者才能消费消息。

  • 根据业务功能定义路由字符串。

  • 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

  1. Topic 主题模式

  • 星号井号代表通配符

  • 星号代表多个单词,井号代表一个单词

  • 路由功能添加模糊匹配

  • 消息产生者产生消息,把消息交给交换机

  • 交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

13. Kafka 如何实现高性能

  • 磁盘顺序读写 Kafka 对磁盘的应用,得益于消息队列的存储特性。与普通的关系型数据库、各类 NoSQL 数据库等不同,消息队列对外提供的主要方法是生产和消费,不涉及数据的 CRUD。所以在写入磁盘时,可以使用顺序追加的方式来避免低效的磁盘寻址。

  • 批量操作优化 Kafka 的批量包括批量写入、批量发布等。它在消息投递时会将消息缓存起来,然后批量发送;同样,消费端在消费消息时,也不是一条一条处理的,而是批量进行拉取,提高了消息的处理速度。

  • Sendfile 零拷贝 Kafka 把所有的消息都存放在单独的文件里,在消息投递时直接通过 Sendfile 方法发送文件,减少了上下文切换,因此大大提高了性能。

  • MMAP 技术 Kafka 使用 Memory Mapped Files 完成内存映射,Memory Mapped Files 对文件的操作不是 write/read,而是直接对内存地址的操作。如果是调用文件的 read 操作,则把数据先读取到内核空间中,然后再复制到用户空间。 但 MMAP 可以将文件直接映射到用户态的内存空间,省去了用户空间到内核空间复制的开销,所以说 MMAP 也是一种零拷贝技术。

14. MQTT 中的 QoS

  1. QoS 0:最多分发一次(At most once) — 这是一种最低级别的服务,消息传递给接收者没有任何的确认机制,消息可能会丢失,也不可知。同时,这种模式的性能最好,因为没有重传或者确认的开销。

  2. QoS 1:至少分发一次(At least once) — 针对这种级别的消息,确保消息至少会被传递到接收者一次,但可能会有重复。发送者将发送消息直到他收到接收者已收到消息的确认。这种模式可能会导致消息的重复传送。

  3. QoS 2:仅分发一次(Exactly once) — 这是最高级别的消息分发保证,确保消息只被接收者接收和处理一次。这种模式适用于所有对消息重复敏感的场景,但它的开销也是最大的。

15. MQTT 的优点

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,特别适合在网络带宽低、网络不稳定、硬件性能差等情况下使用。所以,它经常被用于硬件设备的交互。以下是几个重要的原因:

  1. 资源消耗小:MQTT 设计得非常精简且易于实现,因此占用的资源非常低。对于性能有限的硬件设备来说,这一点非常重要。

  2. 网络带宽需求低:因为 MQTT 使用了发布/订阅模型,所以消息可以有效地在所有订阅的客户端之间分发,而无需创建多个连接。此外,MQTT 的报头也非常小(只有 2 字节),因此减少了网络带宽的需求。

  3. 遥测特性:MQTT 协议是为大规模远程设备构建的,非常适合物联网(IoT)应用。

  4. 质量服务:MQTT 支持 3 种不同级别的消息传送质量(QoS),可以根据需要选择最适合设备和网络条件的级别。

16. Kafka 是如何做数据持久化的

Kafka 通过日志文件的方式实现数据持久化。具体来说,Kafka 将消息追加写入到一个称为日志(Log)的文件中,这个日志文件是一个持久化的、有序的、不可修改的消息记录。一旦消息写入到日志文件中,就会被存储在磁盘上,即使 Kafka 服务发生故障或 Broker 重启,消息数据仍然可以从磁盘上加载并重新构建。

为了快速检索消息,Kafka 维护了一个消息索引,存储了每个分区中消息的偏移量和物理位置,使得 Kafka 能够快速定位和检索消息。此外,为了进一步提高可靠性,Kafka 支持消息的复制,每个分区的消息可以有多个副本,它们分布在不同的 Broker 上,通过 ISR(In-Sync Replica)机制保障了 Leader 和 Follower 之间的数据同步,保证了消息的持久性。

Last updated