消息队列

  1. 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
  2. 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在,从而实现模块间的解耦合。
  3. 消息队列是一种应用间的异步协作机制,同时消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。

消息队列的作用

  1. 解耦: 消息队列可以将系统内不同模块之间的通信解耦,使得模块之间不直接依赖于彼此的实现。这降低了系统的复杂性,使得各个组件可以独立演化。
  2. 异步通信: 消息队列支持异步通信,发送方将消息放入队列后即可继续执行其他操作,而不需要等待接收方的处理。这提高了系统的响应性,特别是在处理大量请求或在网络延迟较高的情况下。
  3. 流量削峰: 消息队列可以用于缓冲和平滑处理系统的流量,防止突发的高峰值请求对系统造成压力。通过控制消息的消费速率,可以有效地削平流量峰值。

常见消息队列对比

常见的消息队列系统包括RocketMQ、RabbitMQ和Kafka,它们在一些方面有一些共同点,但也有一些明显的区别。以下是它们的一些对比:

  1. 消息传递模型:
    • RocketMQ: 提供点对点和发布/订阅两种消息传递模型。
    • RabbitMQ: 同样支持点对点和发布/订阅模型,非常灵活。
    • Kafka: 主要支持发布/订阅模型。
  2. 性能:
    • RocketMQ: 注重高吞吐量和低延迟,适合实时消息处理。
    • RabbitMQ: 相对较低的延迟,适用于一些对延迟敏感的场景。
    • Kafka: 专注于高吞吐量和持久性,适用于大规模数据处理。
  3. 可靠性:
    • RocketMQ: 提供了较好的消息可靠性,支持同步和异步复制。
    • RabbitMQ: 可以配置为提供高可靠性,但需要在配置上更多的努力。
    • Kafka: 通过分区和副本机制来确保消息的可靠性。
  4. 水平扩展性:
    • RocketMQ: 支持较好的水平扩展性,可以通过添加节点来增加容量。
    • RabbitMQ: 可以通过集群来实现水平扩展,但可能相对复杂。
    • Kafka: 具有出色的水平扩展性,适用于大规模数据流处理。
  5. 使用场景:
    • RocketMQ: 适用于大规模实时消息处理,例如电商交易系统。
    • RabbitMQ: 适用于传统企业应用,也可以用于实时数据处理。
    • Kafka: 适用于大规模数据流处理,如日志收集、事件流处理等。

RocketMQ

架构设计

  1. 生产者(Producer):消息的发送者,负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  2. 消费者(Consumer):消息接收者,负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  3. 消息服务器(Broker):暂存和传输消息;是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  4. 名称服务器(NameServer):管理Broker,用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。
  5. 主题(Topic):区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者 可以订阅一个或者多个Topic消息
  6. 消息队列(Message Queue):相当于是Topic的分区;用于并行发送和接收消息
  7. 标签(Tag): 用于对消息进行二级分类。一个 Topic 可以有多个 Tag,用于更细粒度地过滤消息

启动流程

图片损坏
  1. 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
  2. 启动 Broker,Broker启动后与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  3. 创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  4. 生产者发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker发消息。
  5. 消费者接受消息,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,然后开始消费消息。

特点

顺序消费

顺序消息是 RocketMQ 提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

分区顺序消息

对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

  1. 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
  2. 示例:
    • 用户注册需要发送验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
    • 电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

全局顺序消息

对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。

  1. 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
  2. 示例:
  • 在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
  1. 全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。

消息可靠性

保证消息可靠性也就是保证消息一定会被消费,不会丢失,而消息丢失可能会在这三个阶段发生:生产阶段、存储阶段、消费阶段

  1. 生产阶段:请求确认机制
    • 同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。
    • 异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
    • 如果发生超时的情况,也可以通过查询日志的API,来检查是否在Broker存储成功。
  2. 存储阶段:通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,也就是进行数据持久化
    • 消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
    • Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
    • Broker通过主从模式来保证高可用,Broker支持Master和Slave同步复制、Master和Slave异步复制模式,生产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。
  3. 消费阶段
    • Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

死信队列

  1. 死信队列用于处理无法被正常消费的消息,即死信消息。
  2. 当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
  3. 死信消息的特点:
    • 不会再被消费者正常消费。
    • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。
  4. 死信队列的特点:
    • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
    • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
    • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
    • RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。

延迟队列

RocketMQ是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

1
2
3
4
5
6
7
8
9
10
11
12
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}

但是目前RocketMQ支持的延时级别是有限的:

1
2
private String messageDelayLevel = 
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ通过临时存储+定时任务来实现延时消息
Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。

常见问题解决

消息幂等性

对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要由业务端保证,主要的方式有两种:业务幂等和消息去重

  1. 业务幂等:保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。
  2. 消息去重:业务端对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个唯一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
    基于以下两个方向思考:
  3. 接口只允许调用一次,比如mysql唯一索引,基于redis分布式锁机制
  4. 对数据的影响只会触发一次,比如乐观锁或状态机(数据的状态信息是向前变化的,因此如果数据的当前状态小于记录的状态,就可以忽略这次修改的信息)

消息积压

事前处理:上线之前对流量有个预估,压测得出消费者的消费能力上限,再根据实际情况进行部署以支撑整个服务
事中处理:先保证服务,进行消费者的临时扩容操作

  1. 消费者扩容:
    • 如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
  2. 消息迁移Queue扩容:
    • 如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。
      事后处理:
  3. 提高并行消费度,如:合理设置消费者组;增加队列分区,考虑把消息分散到多个队列中,避免单个队列出现积压
  4. 批量方式消费,提高消费吞吐量
  5. 调整消息处理的优先级:根据消息的重要性和紧急程度,调整消息处理的优先级。优先处理重要的消息,确保关键业务的及时性,而对于非关键的消息可以进行降级处理、跳过、或延后处理。
  6. 优化每条消息的消费过程(业务)
    5. 合理设置超时机制
  7. 数据清理和重试:定期清理过期或无效信息
  8. 扩容:若MQ出现性能瓶颈,对MQ扩容
  9. 增加消费者数量:增加消费能力

消息过滤

  1. 在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  2. 在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。
  3. 一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。
  4. 对消息的过滤有三种方式:
    • 根据Tag过滤:高效简单
    • SQL 表达式过滤:更加灵活
    • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

高可用(消息持久化)

  1. RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过主从集群实现的
  2. Broker可以配置两种角色:Master和Slave
    • Master角色的Broker支持读和写
    • Slave角色的Broker只支持读
    • Master会向Slave同步消息。
    • Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。
  3. 读的高可用:自动切换
    • Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
  4. 写的高可用:多Master
    • 在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息
    • RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。