消息队列有什么用?

通常来说,消息队列有三个作用:

  1. 异步
  2. 削峰
  3. 解耦

除此之外,消息队列还有一个频繁使用的场景是延时队列。

异步

通过将次要操作异步处理,可以减少接口的响应时间,提高系统性能。

比如在购买火车票的场景中,用户在下订单时不需要立马发送短信通知,可以将发送短信通知放入消息队列中,由消费者进行消费该消息进行真正的短信发送。

削峰

在一些电商的秒杀场景中,可能会短时间发生非常高的并发,但是系统没有能力一下子处理全部的请求。

这个时候就可以将这些消息放入消息队列中,然后消费者再根据自身的能力慢慢去处理这些消息,这样就避免了服务被直接压垮。

解耦

使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。

比如下订单后,订单模块发送一条创建订单成功的消息到消息队列,而如果财务(扣款)、仓储(库存管理)、物流(发货)、消息通知(通知用户发货)、风控(风险评估)等服务需要监听创建订单后并执行相应的操作,只需要订阅这个消息即可,这样就把生产者和消费者解耦了。

延时队列

在一些电商场景中,生成订单后15分钟之内如果没有支付,则需要取消订单,这个场景就可以使用消息队列的延时队列功能。

消息队列如何选型

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了。

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高。

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

参考:https://github.com/doocs/advanced-java/blob/main/docs/high-concurrency/why-mq.md

如何保证 RabbitMQ 的消息可靠性

所谓保证消息可靠性,其实就是保证生产者 -> MQ -> 消费者这整个链路成功执行。

而在这条链路中可能发生消息丢失的场景有:

  1. 生产者 -> MQ
  2. MQ宕机
  3. MQ -> 消费者

1、生产者 -> MQ

为了避免因为网络故障或闪断问题导致消息无法正常发送到 RabbitMQ Server 的情况,RabbitMQ 提供了两种方案让生产者可以感知到消息是否正确无误的发送到 RabbitMQ Server 中,这两种方案分别是 事务机制发送方确认机制

事务机制其实是同步操作,存在阻塞生产者的情况直到 RabbitMQ Server 应答,这样其实会很大程度上降低发送消息的性能,所以一般不会使用事务机制来保证生产者的消息可靠性,而是使用发送方确认机制。

在发送MQ消息成功后,通过调用 setConfirmCallback() 实现异步 confirm 模式感知消息发送结果,如果消息发送失败,可以重新发送,或者业务告警由开发人员手动介入。

2、MQ宕机

对于消息的持久化,只需要在发送消息时将消息持久化,并且在创建交换机和队列时也保证持久化即可。

3、MQ -> 消费者

RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息,默认情况下,消费者应答机制是自动应答的,也就是 RabbitMQ 将消息推送给消费者,便会从队列删除该消息,如果消费者在消费过程失败时,消息就存在丢失的情况。所以需要将消费者应答机制设置为手动应答,只有消费者确认消费成功后才会删除消息,从而避免消息的丢失。

可以通过调用 channel.basicAck() 与 channel.basicNack()来根据业务的执行成功选择是手动确认消费还是手动丢弃消息。

除了消费者应答机制外,Spring Boot 也提供了一种重试机制,只需要通过配置即可实现消息重试从而确保消息的可靠性。

参考:https://xie.infoq.cn/article/12c4bd997a7bd20985f2ddc00

怎么防止消息重复消费

在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性,一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同

下面我给你介绍几种常用的设计幂等操作的方法。

1、利用数据库的唯一约束实现幂等

详细略。

2、为更新的数据设置前置条件

对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?

更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

3、记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

参考:https://www.cnblogs.com/shengyang17/p/14254515.html#_label1

消息积压问题

消息积压处理:
1、发送端优化,增加批量和线程并发两种方式处理
2、消费端优化,优化业务逻辑代码、水平扩容增加并发并同步扩容分区数量
查看消息积压的方法:
1、消息队列内置监控,查看发送端发送消息与消费端消费消息的速度变化
2、查看日志是否有大量的消费错误
3、打印堆栈信息,查看消费线程卡点信息

面试解决消息积压的方法:
(1)临时扩容,增加消费端,用硬件提升消费速度。
(2)服务降级,关闭一些非核心业务,减少消息生产。
(3)通过日志分析,监控等找到挤压原因,消息队列三部分,上游生产者是否异常生产大量数据,中游消息队列存储层是否出现问题,下游消费速度是否变慢,就能确定哪个环节出了问题
(4)根据排查解决异常部分。
(5)等待积压的消息被消费,恢复到正常状态,撤掉扩容服务器。

参考:https://www.cnblogs.com/shengyang17/p/14254515.html#_label2

如何保证消息的顺序性?

1、RabbitMQ 保证消息的顺序性

RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。

2、Kafka 保证消息的顺序性

Kafka 从生产者到消费者消费消息这一整个过程其实都是可以保证有序的,导致最终乱序是由于消费者端需要使用多线程并发处理消息来提高吞吐量,比如消费者消费到了消息以后,开启 32 个线程处理消息,每个线程线程处理消息的快慢是不一致的,所以才会导致最终消息有可能不一致。

所以对于 Kafka 的消息顺序性保证,其实我们只需要保证同一个订单号的消息只被同一个线程处理的就可以了。由此我们可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个订单号的消息发送到同一个内存队列中即可。

3、RocketMQ 保证消息的顺序性

RocketMQ 的消息乱序是由于同一个订单号的 binlog 进入了不同的 MessageQueue,进而导致一个订单的 binlog 被不同机器上的 Consumer 处理。

要解决 RocketMQ 的乱序问题,我们只需要想办法让同一个订单的 binlog 进入到同一个 MessageQueue 中就可以了。因为同一个 MessageQueue 内的消息是一定有序的,一个 MessageQueue 中的消息只能交给一个 Consumer 来进行处理,所以 Consumer 消费的时候就一定会是有序的。

参考:https://xie.infoq.cn/article/c84491a814f99c7b9965732b1