RocketMQ入门
消息队列
- 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
- 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在,从而实现模块间的解耦合。
- 消息队列是一种应用间的异步协作机制,同时消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。
消息队列的作用
- 解耦: 消息队列可以将系统内不同模块之间的通信解耦,使得模块之间不直接依赖于彼此的实现。这降低了系统的复杂性,使得各个组件可以独立演化。
- 异步通信: 消息队列支持异步通信,发送方将消息放入队列后即可继续执行其他操作,而不需要等待接收方的处理。这提高了系统的响应性,特别是在处理大量请求或在网络延迟较高的情况下。
- 流量削峰: 消息队列可以用于缓冲和平滑处理系统的流量,防止突发的高峰值请求对系统造成压力。通过控制消息的消费速率,可以有效地削平流量峰值。
常见消息队列对比
常见的消息队列系统包括RocketMQ、RabbitMQ和Kafka,它们在一些方面有一些共同点,但也有一些明显的区别。以下是它们的一些对比:
- 消息传递模型:
- RocketMQ: 提供点对点和发布/订阅两种消息传递模型。
- RabbitMQ: 同样支持点对点和发布/订阅模型,非常灵活。
- Kafka: 主要支持发布/订阅模型。
- 性能:
- RocketMQ: 注重高吞吐量和低延迟,适合实时消息处理。
- RabbitMQ: 相对较低的延迟,适用于一些对延迟敏感的场景。
- Kafka: 专注于高吞吐量和持久性,适用于大规模数据处理。
- 可靠性:
- RocketMQ: 提供了较好的消息可靠性,支持同步和异步复制。
- RabbitMQ: 可以配置为提供高可靠性,但需要在配置上更多的努力。
- Kafka: 通过分区和副本机制来确保消息的可靠性。
- 水平扩展性:
- RocketMQ: 支持较好的水平扩展性,可以通过添加节点来增加容量。
- RabbitMQ: 可以通过集群来实现水平扩展,但可能相对复杂。
- Kafka: 具有出色的水平扩展性,适用于大规模数据流处理。
- 使用场景:
- RocketMQ: 适用于大规模实时消息处理,例如电商交易系统。
- RabbitMQ: 适用于传统企业应用,也可以用于实时数据处理。
- Kafka: 适用于大规模数据流处理,如日志收集、事件流处理等。
RocketMQ
架构设计
- 生产者(Producer):消息的发送者,负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
- 消费者(Consumer):消息接收者,负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
- 消息服务器(Broker):暂存和传输消息;是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
- 名称服务器(NameServer):管理Broker,用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。
- 主题(Topic):区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者 可以订阅一个或者多个Topic消息
- 消息队列(Message Queue):相当于是Topic的分区;用于并行发送和接收消息
- 标签(Tag): 用于对消息进行二级分类。一个 Topic 可以有多个 Tag,用于更细粒度地过滤消息
启动流程

- 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
- 启动 Broker,Broker启动后与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
- 创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
- 生产者发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker发消息。
- 消费者接受消息,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,然后开始消费消息。
特点
顺序消费
顺序消息是 RocketMQ 提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
分区顺序消息
对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
- 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
- 示例:
- 用户注册需要发送验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
- 示例:
- 在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
- 全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
消息可靠性
保证消息可靠性也就是保证消息一定会被消费,不会丢失,而消息丢失可能会在这三个阶段发生:生产阶段、存储阶段、消费阶段
- 生产阶段:请求确认机制
- 同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。
- 异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
- 如果发生超时的情况,也可以通过查询日志的API,来检查是否在Broker存储成功。
- 存储阶段:通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,也就是进行数据持久化
- 消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
- Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
- Broker通过主从模式来保证高可用,Broker支持Master和Slave同步复制、Master和Slave异步复制模式,生产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。
- 消费阶段
- Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
死信队列
- 死信队列用于处理无法被正常消费的消息,即死信消息。
- 当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
- 死信消息的特点:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。
- 死信队列的特点:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
- RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。
延迟队列
RocketMQ是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:
1 | // 实例化一个生产者来产生延时消息 |
但是目前RocketMQ支持的延时级别是有限的:
1 | private String messageDelayLevel = |
RocketMQ通过临时存储+定时任务来实现延时消息
Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。
常见问题解决
消息幂等性
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要由业务端保证,主要的方式有两种:业务幂等和消息去重
- 业务幂等:保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。
- 消息去重:业务端对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个唯一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
基于以下两个方向思考: - 接口只允许调用一次,比如mysql唯一索引,基于redis分布式锁机制
- 对数据的影响只会触发一次,比如乐观锁或状态机(数据的状态信息是向前变化的,因此如果数据的当前状态小于记录的状态,就可以忽略这次修改的信息)
消息积压
事前处理:上线之前对流量有个预估,压测得出消费者的消费能力上限,再根据实际情况进行部署以支撑整个服务
事中处理:先保证服务,进行消费者的临时扩容操作
- 消费者扩容:
- 如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
- 消息迁移Queue扩容:
- 如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。
事后处理:
- 如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。
- 提高并行消费度,如:合理设置消费者组;增加队列分区,考虑把消息分散到多个队列中,避免单个队列出现积压
- 批量方式消费,提高消费吞吐量
- 调整消息处理的优先级:根据消息的重要性和紧急程度,调整消息处理的优先级。优先处理重要的消息,确保关键业务的及时性,而对于非关键的消息可以进行降级处理、跳过、或延后处理。
- 优化每条消息的消费过程(业务)
5. 合理设置超时机制 - 数据清理和重试:定期清理过期或无效信息
- 扩容:若MQ出现性能瓶颈,对MQ扩容
- 增加消费者数量:增加消费能力
消息过滤
- 在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
- 在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。
- 一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。
- 对消息的过滤有三种方式:
- 根据Tag过滤:高效简单
- SQL 表达式过滤:更加灵活
- Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤
高可用(消息持久化)
- RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过主从集群实现的
- Broker可以配置两种角色:Master和Slave
- Master角色的Broker支持读和写
- Slave角色的Broker只支持读
- Master会向Slave同步消息。
- Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。
- 读的高可用:自动切换
- Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
- 写的高可用:多Master
- 在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息
- RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。