赞
踩
MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。
为什么要使用RocketMQ,我们先来看一个天府通刷地铁出站的业务场景
【注意】假如天府通出站API 和 支付系统 是不同的子系统 (两个Tomcat远程通信)
上下班高峰期使用天府通刷码的人非常多,以为做并发量很高,一个出站请求到后台需要做费用结算,或者积分赠送等业务。由于并发很高,并且费用结算和积分等业务本来就耗时,况且支付服务也不一定能承担那么大的请求量。
当服务器线程耗尽,后续请求会等待变慢,再加上高并发请求就会导致后续请求越来越慢,请求长时间等待,导致大量请求超时。并发太高,可能会导致服务器的内存上升,CPU使用率急速上升,甚至导致服务器宕掉。
解决方案:使用MQ消峰,效果如下
加入MQ后的效果
高并发请求在MQ中排队,达到了消除峰值的目的,不会有大量的请求同时怼到支付系统
服务异步调用,“天府通出站API” 把结算消息放入MQ就可以返回“出站成功,费用稍后结算”给用户,响应时间很快
服务彻底解耦,即使支付服务挂掉,也不影响“天府通出站API”正常工作,当支付系统再启动仍然可以继续消费MQ中的消息。
消除峰值
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统 被压垮。
异步&解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。 而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。 即使消费者挂掉也不影响生产者工作,只要把消息放入队列即可,消费者重启后自己消费即可。
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或 批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此 类数据收集是最好的选择。
大数据处理
比如我们的平台向“三方平台”获取数据,一次请求了大量数据回来要进行处理,由于数据较多处理不过来,那么就可以放入MQ,再创建一些消费者进行数据处理即可。
【注意
】如下情况不太适合MQ
提高系统响应速度-异步执行
任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
提高系统稳定性-消峰,解耦
一是并发被消峰后,系统不容易被高并发打垮,二是系统挂了也没关系,操作内容放到消息队列不丢失,后续重新消费者一样能消费做业务处理。
排序保证 FIFO
遵循队列先进先出的特点,能够保证消息按照添加的数据被消费。
ActiveMQ
ActiveMQ是使用Java语言开发一款MQ产品。早期很多公司与项目中都在使用。但现在的社区活跃度已 经很低。现在的项目中已经很少使用了。
RabbitMQ
RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是 Java语言开发,所以公司内部对其实现定制化开发难度较大。
Kafka
Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐率,常用于大数据领域的实 时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。对于Spring Cloud Netç ix,其仅支持RabbitMQ与Kafka。
RocketMQ
RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其 没有遵循任何常见的MQ协议,而是使用自研协议。对于Spring Cloud Alibaba,其支持RabbitMQ、 Kafka,但提倡使用RocketMQ
下面是MQ的对比图:
技术选型建议:
RocketMQ是一个统一消息引擎、轻量级数据处理平台。
RocketMQ是⼀款阿⾥巴巴开源的消息中间件,双十一承载了万亿级消息的流转,2016年11⽉,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬,2017 年 9 ⽉ ,Apache 宣布 RocketMQ孵化成为 Apache 顶级项⽬(TLP )成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。
支持集群模型、负载均衡、水平扩展能力
亿级别消息堆积能力
采用零拷贝的原理,顺序写盘,随机读 -nio,零拷贝
底层通信框架采用Netty NIO - nio通讯模型—Netty -redis-kafaka/rocketmq
NameServer代替Zookeeper,实现服务寻址和服务协调
消息失败重试机制、消息可查询
强调集群无单点,可扩展,任意一点高可用,水平可扩展
经过多次双十一的考验
下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.2.0/
下载后解压
Bin : 可执行文件目录
config:配置文件目录
Lib : 依赖库,一堆Jar包
解压压缩包,配置 ROCKETMQ_HOME
Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行 start mqnamesrv.cmd
,启动NameServer。
成功后会弹出提示框,此框勿关闭。
进入至‘MQ文件夹\bin’下,修改Bean目录下的 runbroker.cmd
中JVM占用内存大小
CMD执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
,启动Broker。
成功后会弹出提示框,此框勿关闭
RabbitMQ安装好之后会在用户目录下产生一个store目录用来存储相关数据:
[问
]RocketMQ数据存储在磁盘会影响性能吗?
不会,RocketMQ的性能在所有的MQ中是比较高的,主要是因为RocketMQ使用了mmap零拷贝技术,consumequeue中的数据是顺序存放的,还引入了PageCache的预读取机制,使得对 consumequeue文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。
为了方便管理,我们需要安装一个可视化插件
RocketMQ可视化管理插件下载地址:https://github.com/apache/rocketmq-externals/releases
解压后,修改配置:src/main/resource/application.properties ,这里需要指向Name Server 的地址和端口 如下:
回到安装目录,执行: mvn clean package -Dmaven.test.skip=true
,然后会在target目录生成打包后的jar文件
进入 target 目录,执行 java -jar rocketmq-console-ng-1.0.0.jar , 访问 http://localhost:8080
RocketMQ开发官方文档:
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
RocketMQ的集群架构如下
RocketMQ架构上主要分为四部分,如上图所示
消息发布的角色,支持分布式集群方式部署。Producer通过nameserver的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时 也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
NameServer是一个Broker与Topic路由的注册中心支持Broker的动态注册与发现主要包括两个功能
Broker管理
NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。
路由信息管理
每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费
官方案例:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
注意和安装的MQ版本一致
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
步骤分析
创建producer组
设置NameServer地址
startr生产者
发送消息获取结果
结束producer
//消息发送者
public class ProducerTest {
public static void main(String[] args) {
try {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("producergroup");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
//构建消息
Message message = new Message("topic_log","tags_error",("我是消息"+i).getBytes());
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}catch (Exception e){
e.printStackTrace();
}
}
}
代码解释:
DefaultMQProducer : MQ生产者 , 可以指定组名 producerGroupName
producer.setNamesrvAddr : 指定Name Server地址,用作Brocker发现。注意IP和启动name server服务时指定的IP保持一致。
producer.start() : 启动 生产者
new Message(“topic_log”,“tags_error”,(“我是消息”+i).getBytes()) :消息,参数为:topic,tags,内容
producer.send(message) : 发送消息
SendResult :发送结果,其中包含
创建consumer组
设置Name Server地址
设置消费位置,从最开始销毁
设置消息回调处理监听 -> 处理消息
Start consumer
//消息发送者
public class ConsumerTest {
public static void main(String[] args) {
try {
// 实例化消息生产者Producer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("consumergroup");
// 设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//从最开始的位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
//和发送者保持一致才能搜到消息
consumer.subscribe("topic_log", "tags_error");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s 成功搜到消息: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Producer实例
consumer.start();
}catch (Exception e){
e.printStackTrace();
}
}
}
为了增强Broker性能与吞吐量,Broker一般都是以集群形式出现的。各集群节点中可能存放着相同Topic的不同Queue。
不过,这里有个问题,如果某Broker节点宕机,如何保证数据不丢失呢?其解决方案是,将每个Broker集群节点进行横向扩展,即将Broker节点再建为一个HA集群,解决单点问题。
Broker节点集群是一个主从集群,即集群中具有Master与Slave两种角色。Master负责处理读写操作请求,Slave负责对Master中的数据进行备份。当Master挂掉了,Slave则会自动切换为Master去工作。所以这个Broker集群是主备集群。Consumer既可以从Master订阅消息,也可以从Slave订阅消息
一个Master可以包含多个Slave,但一个Slave只能隶属于一个Master。 Maste与Slave 的对应关系是通过指定相同的BrokerName、不同的BrokerId 来确定的。BrokerId为0表示Master非0表示Slave。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
topicid | broker | 队列 |
---|---|---|
topic_log | 1 | 队列1,队列2 |
topic_log | 2 | 队列3,队列4 |
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息
RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送
。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。
Producer会使用一定的算法(随机轮询+规避故障)选择把消息发送到哪个master的某个queue中。
Consumer 支持两种消费形式:拉取式消费、推动式消费。(主动,被动),RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,不同的 Consumer Group可以消费同一个Topic。
一个Consumer Group内的Consumer可以消费多个Topic的消息。
[注意] 集群模式:一个Queue是不能被同一个ConsumerGroup中的多个Consumer消费的,目的是减少资源竞争提升整体性能。
Topic表示一类消息的集合,每个topic主题包含若干条message消息,每条message消息只能属于一个topic主题,Topic是RocketMQ进行消息订阅的基本单位。
消息是指消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主 题。
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。Topic是消息的一级分类,Tag是消息的二级分类
一个Topic中可以包含多个Queue,一 个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
在集群模式下, 在一个Consumer Group内,一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue。这样的分配规则,每个Queue只有一个消费者,可以避免消费过程中的多线程处理和资源锁定,有效提高各Consumer消费的并行度和处理效率。
消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的 Consumer将不能消费消息。如果一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue。
【注意】 一个Topic可以对应多个消费者 ,一个Queue只能对应一个组中的一个消费者。
【注意】为了防止消息紊乱,一个Consumer Group 中的Consumer都是订阅相同Topic下的Queue。
在广播模式下一个队列要可以被多个消费者对应.在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 监听全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在**通知推送、配置同步类小流量场景使用。
读写队列
Queue分为 写队列(对于生产者来说是写) 和 读队列(对于消费者来说) ,默认创建数量是都是4 ,这个读写队列是从逻辑上进行划分在物理上读/写是一个队列,Producer发送的消息进入写队列 ,Consumer从读队列获取数据,一半情况下读写队列数量是一样的。
可以通过可视化界面修改Topic中的队列数量
perm用于设置对当前创建Topic的操作权限:2表示只写,4表示只读,6表示读写。
RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。 不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),
当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。
当消费者数量或者Queue的数量修改,Rebalance是把⼀个Topic下的多个Queue重新分配给Consumer Group下的Consumer。目的是增加消费能力。
由于一个队列值分配给一个Consumer,那么当Consumer Group中的消费者数量大于队列数量,那么多出来的Consumer分配不到队列。
消息的消费分为:拉取式 pull ,和推送是 push
Pull:拉取式,需要消费者间隔一定时间就去遍历关联的Queue,实时性差但是便于应用控制消息的拉取
Push:推送式,封装了Queue的遍历,实时性强,但是对系统资源占用比较多。
广播模式(一对多):同一个Consumer Group 下的所有Consumer都会受到同一个Topic的所有消息。同一个消息可能会被消费多次。
集群模式(一对一):同一个Gonsumer Group 下的Consumer平分同一个Topic下的消息。同一个消息只是被消费一次。
Queue是如何分配给Consumer的,这对应了四种算法:平均分配策略,环形平均策略,一致性Hash策略,同机房策略。
平均分配【默认】:根据 qeueuCount (4)/ consumerCount (3)作为每个消费者平均分配数量,如果多出来的queue就再依次逐个分配给Consumer。
环形平均策略:根据消费者的顺序,一个一个的分配Queue即可类似于发扑克牌。
一致性Hash策略 : 该算法将Consumer的Hash值作为节点放到Hash环上,然后将Queue的hash值也放入Hash环上,通过顺时针进行就近分配。
同机房策略:该算法会根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。
平均分配性能比较高,一致性Hash性能不高,但是能减少Rebalance,如果Consumer数量变动频繁可以使用一致性Hash。
RockertMQ通过Offset来维护Consumer的消费进度,比如:消费者从哪个位置开始持续消费消息的?这里有三个枚举来指定从什么位置消费
CONSUME_FROM_LAST_OFFSET:从queue的最后一条消息开始消费
CONSUME_FROM_FIRST_OFFSET:从queue的第一条消息开始消费
CONSUME_FROM_TIMESTAMP:从某个时间戳位置的消息开始消费
。消费者消费结束之后,会向Consumer会提交其消费进度offset给Broker。Offset信息的存储分为本地 Offset管理 和远程Offset管理
Offset的同步提交与异步提交: 集群消费模式下,Consumer消费完消息后会向Broker提交消费进度offset,其提交方式分为两种:
消息不会被单独清理,消息是顺序存储到commitlog的,消息是以commitlog为单位进行清理,RocketMQ有自己的清理规则,默认是72小时候后进行清理
到达时间清理点,自动清理过期的文件(凌晨4点)
磁盘空间使用率达到了过期清理阈值(75%),自动清理过期的文件。
磁盘占用率达到清理阈值(85%),开始按照设定的规则清理文件,从老的文件开始。
磁盘占用率达到系统危险阈值(90%),拒绝写入数据。
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>
生产者
package cn.ronghuanet._01hello;
import io.netty.util.CharsetUtil;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* 生产者
*/
public class SenderTest {
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("hello-producerGroup");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//设置队列数量为2,有4个,根据情况设置
producer.setDefaultTopicQueueNums(2);
//启动
producer.start();
// for (int i = 0 ; i < 16 ; i++){
Message message = new Message();
//消息主题
message.setTopic("hello-topic");
//消息标签
message.setTags("sms");
//添加内容
message.setBody(("我是消息").getBytes(CharsetUtil.UTF_8));
//执行发送
SendResult result = producer.send(message);
//打印结果
System.out.println(result);
// }
producer.shutdown();
}
}
package cn.ronghuanet._01hello;
import io.netty.util.CharsetUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
//消息发送者
public class ConsumerTest {
public static void main(String[] args) {
try {
// 实例化消息生产者Producer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer
("hello-consumergroup");
// 设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置消费模式
//consumer.setMessageModel(MessageModel.CLUSTERING); //默认是集群
consumer.setMessageModel(MessageModel.BROADCASTING); //默认是集群
//从最开始的位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
//和发送者保持一致才能搜到消息
consumer.subscribe("hello-topic", "sms");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
//System.out.printf("%s 成功搜到消息: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费 ack机制
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Producer实例
consumer.start();
}catch (Exception e){
e.printStackTrace();
}
}
}
运行两个consumer,分别测试集群和广播消费模式
同步消息是发送者发送消息,需要等待结果的返回,才能继续发送第二条消息,这是一种阻塞式模型,虽然消息可靠性高,但是阻塞导致性能低。API : SendResult result = producer.send(message); 代码示例:
发送者代码
public class Producer {
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//设置队列数量为2,默认为4,根据情况设置
producer.setDefaultTopicQueueNums(2);
//启动
producer.start();
for (int i = 0 ; i < 16 ; i++){
Message message = new Message();
//消息主题
message.setTopic("syn-topic");
//消息标签
message.setTags("sms");
//添加内容
message.setBody((i+"我是消息").getBytes(CharsetUtil.UTF_8));
//执行发送
SendResult result = producer.send(message);
//打印结果
System.out.println(result);
}
producer.shutdown();
} }
同步发送使用 SendResult result = producer.send(message); 方法即可,马上可以拿到返回值。SendResult 结果如下
SendResult [
sendStatus=SEND_OK, msgId=C0A8006516B018B4AAC270EF9D940000,offsetMsgId=C0A8006500002A9F0000000000008E1C,
messageQueue=MessageQueue [topic=syn-topic, brokerName=LAPTOP-20VLGCRC, queueId=3], queueOffset=0]
SendStatus : 状态OK
msgId: 发送者生成的ID
OffsetMsgId : 由Broker生成的消息ID
MessageQueue :队列信息
异步消息是发送者发送消息,无需等待发送结果就可以再发送第二条消息,它是通过回调的方式来获取到消息的发送结果,消息可靠性高,性能也高。API : producer.send(message,SendCallback) 示例代码:
. . . 省略. . .
producer.send(
//创建消息对象
new Message("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8)),
//添加发送回调
new SendCallback() {
//发送成功结果处理
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//发送异常结果处理
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常:"+throwable.getMessage());
}
}
);
SendCallback 是消息发送结果回调。如果:sendResult.getSendStatus() == SendStatus.SEND_OK 表示成功
这种方式指的是发送者发送消息后无需等待Broker的结果返回,Broker也不会返回结果,该方式性能最高,但是消息可靠性低。API : producer.sendOneway(message) 示例代码:
... 省略...
Message message = new Message("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8));
producer.sendOneway(message);
sendOneway 单向发送是没有返回结果值的。
下面对三种发送方式做一个对比
可靠性最高: 同步发送 > 异步发送 > 单向发送
性能最高:单向发送 > 异步发送 > 同步发送
使用场景建议如下
如果是比较重要的不可丢失的消息,且对时效性要去不高建议使用同步发送,如转账消息
如果是不重要的可失败的消息,比如日志消息,建议使用单向发送
如果对时效性要求比较高,且消息不能丢失,可以尝试使用异步发送
在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。
按照发送的顺序进行消费就是顺序消息,遵循(FIFO), 默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。
全局有序是一个topic下的所有消息都要保证顺序,如果要保证消息全局顺序消费,就需要保证使用一个队列存放消息,一个消费者从这一个队列消费消息就能保证顺序,即:单线程执行,可以通过 producer.setDefaultTopicQueueNums(1);来指定队列数量。
可以通过代码指定创建1个队列即可
producer.setDefaultTopicQueueNums(1);
使用一个线程,一次只拉取一个消息 , 使用 MessageListenerOrderly 有序的消费消息。
//最大线程1个
defaultMQPushConsumer.setConsumeThreadMax(1);
defaultMQPushConsumer.setConsumeThreadMin(1);
//同时只拉取一个消息
defaultMQPushConsumer.setPullBatchSize(1);
...省略...
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeOrderlyStatus.SUCCESS;
}
});
...省略...
还有一种就是分区有序或者部分有序,部分顺序消息只要保证某一组消息被顺序消费,即:只需要保证一个队列中的消息有序消费即可。
比如:保证同一个订单ID的生成、付款、发货消息按照顺序消费即可实现原理:
把同一个订单ID的消息放入同一个MessageQueue
保证这个MessageQueue只有一个消费者不被并发处理 ,这个需要使用到 MessageQueueSelector 来保证同一个订单的消息在同一个队列
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//发送消息超时时间
producer.setSendMsgTimeout(1000);
//启动
producer.start();
for (long i = 0 ; i < 4 ; i++){
Order order = new Order(i,"订单"+i,"创建");
//添加内容
byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
Message message = new Message("topic-order","product-order",bytes);
message.setKeys("key-"+i);
//执行发送
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
//使用取模算法确定id存放到哪个队列
int index =(int) (id % mqs.size());
//index就是要存放的队列的索引
return mqs.get(index);
}
//把订单ID作为参数,作为选择器的基础数据
},order.getId());
System.out.println(result);
//====================================================================
order.setStatus("支付");
//添加内容
bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
message = new Message("topic-order","product-order",bytes);
message.setKeys("key-"+i);
//执行发送
result = producer.send(message,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
//使用取模算法确定id存放到哪个队列
int index =(int) (id % mqs.size());
//index就是要存放的队列的索引
return mqs.get(index);
}
},order.getId());
System.out.println(result);
//====================================================================
order.setStatus("发货");
//添加内容
bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
message = new Message("topic-order","product-order",bytes);
message.setKeys("key-"+i);
//执行发送
result = producer.send(message,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
//使用取模算法确定id存放到哪个队列
int index =(int) (id % mqs.size());
//index就是要存放的队列的索引
return mqs.get(index);
}
},order.getId());
System.out.println(result);
//打印结果
}
producer.shutdown();
}
public class Consumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syn-consumerGroup");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅
defaultMQPushConsumer.subscribe("topic-order","product-order");
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeOrderlyStatus.SUCCESS;
}
});
defaultMQPushConsumer.start();
}
}
我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。
延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d 这个时候总共就有19个level。
下面是延迟消息内部工作流程图
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走下面的流程
修改消息Topic的名字为SCHEDULE_TOPIC_XXXX
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId
目录与consumequeue文件
Commit Log Offset:记录在CommitLog中的位置。
Size:记录消息的大小
Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。
将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。
只需要一处改动,发送者通过 message.setDelayTimeLevel(3); 设置延迟级别即可
public class Producer {
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup-delay");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动
producer.start();
for (long i = 0 ; i < 4 ; i++){
Order order = new Order(i,"订单"+i,"创建");
//添加内容
byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
Message message = new Message("topic-order-delay","product-order-delay",bytes);
//延迟级别 3,代表 10s延迟
message.setDelayTimeLevel(3);
message.setKeys("key-"+i);
//执行发送
SendResult result = producer.send(message);
System.out.println("发送时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println(result);
}
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syn-consumerGroup-delay");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅
defaultMQPushConsumer.subscribe("topic-order-delay","product-order-delay");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(message->{
System.out.println("消费时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
}
}
如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。
分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案,下面举个例子。
用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:
我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择2PC强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
事务流程
1、用户服务(事务发起方)往MQ中发送一个事务消息,
MQ返回结果是否发送成功
用户服务受到消息发送成功结果,保存用户数据,提交本地事务
积分服务拿到MQ中的事务消息
积分服务保存积分到数据库
事务流程中的最大的难点就是如何保证事务消息发送和本地事务的原子性,即:第一步和第二步要么都成功,要么都失败,不能说消息发送成功了,结果用户保存失败了,那么积分服务可能会增加成功,就导致数据不一致。RocketMQ已经帮我们处理好这个问题。它的工作原理如下:
事务发起方,即用户服务会先向broker发送一个prepare“半事务消息”(一个并不完整的消息)到RMQ_SYS_TRANS_HALF_TOPIC的queue中, 该消息对消费者不可见。
MQ会返回一个ACK确认消息发送成功或者失败
消息发送成功,用户服务执行保存用户操作,提交本地事务,并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚
本地事务提交成功,事务发起方即用户服务会向broker再次发起“结束半事务”消息请求,commit或者rollback指令
broker端收到请求后,首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息,设置为完成状态。如果消息状态为提交,则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了);如果消息状态为回滚,则什么也不做。
Producer发送的半消息结束请求是oneway的,也就是发送后就不管了,只靠这个是无法保证半消息一定被提交的(比如未执行第4步),rocketMq提供了一个兜底方案,这个方案叫消息反查机制,Broker启动时,会启动一个TransactionalMessageCheckService任务,该任务会定时从半消息队列中读出所有超时未完成的半消息,针对每条未完成的消息,Broker会给对应的Producer发送一个消息反查请求,根据反查结果来决定这个半消息是需要提交还是回滚,或者后面继续来反查
consumer(本例中指积分系统)消费消息,执行本地数据变更,提交本地事务
我们需要做什么
编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态
发消息时生产者需要设置producer.setTransactionListener 事务监听
public class MyTransactionCheckListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行业务,保存本地事务
//保存成功
return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//这里查询本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
}
public class TransationSender {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer("tran-product-group");
producer.setNamesrvAddr("127.0.0.1:9876");
//线程池
ExecutorService excutorService = Executors.newFixedThreadPool(20);
producer.setExecutorService(excutorService);
producer.setTransactionListener(new MyTransactionCheckListener());
//设置事务消息监听
producer.start();
for(int i = 0 ; i < 10 ; i++){
String orderId = UUID.randomUUID().toString();
String tags = "Tag";
Message message = new Message("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.println(transactionSendResult);
}
producer.shutdown();
}
}
正常消费
public class TransationConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("trans-consumer-group");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//订阅
defaultMQPushConsumer.subscribe("topic-tran", "tag");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
}
}
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB,如果超过可以有2种处理方案:
将消息进行切割成多个小于4M的内容进行发送
修改4M的限制改成更大
可以设置Producer的maxMessageSize属性
修改配置文件中的maxMessageSize属性
对于消费者而言Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列 表,但默认情况下每次只能消费一条消息,可以通过:Consumer的pullBatchSize属性设置消息拉取数量(默认32),可以通过设置consumeMessageBatchMaxSize属性设置消息一次消费数量(默认1)。
[注意]:pullBatchSize 和 consumeMessageBatchMaxSize并不是设置越大越好,一次拉取数据量太大会导致长时间等待,性能降低。而且消息处理失败同一批消息都会失败,然后进行重试,导致消费时长增加。增加没必要的重试次数。
我们需要做什么
定义消息切割器切割消息
发送消息把消息切割之后,进行多次批量发送
把消息按照4M切成多份,支持可迭代
//消息切割器,按照4M大小写个
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}
public class BatchProducer {
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("batch-producerGroup");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//设置最大消息大小,默认4M
producer.setMaxMessageSize(1024 * 1024 * 4);
//启动
producer.start();
//===========准备消息==========================================================
List<Message> messages = new ArrayList<>();
for (long i = 0 ; i < 10000 ; i++){
//添加内容
byte[] bytes = ("批量消息".getBytes(CharsetUtil.UTF_8));
Message message = new Message("topic-order-batch","product-order-batch",bytes);
message.setKeys("key-"+i);
messages.add(message);
}
//===========切割消息==========================================================
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
//发送消息
SendResult sendResult = producer.send(listItem);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
producer.shutdown();
}
}
我们要做什么
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("batch-consumerGroup");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//批量拉取消息数量,默认32
defaultMQPushConsumer.setPullBatchSize(32);
//每次消费条数,默认1
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
//订阅
defaultMQPushConsumer.subscribe("topic-order-batch","product-order-batch");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
}
}
消息过滤包括 tags过滤法sql过滤,消费者在消费消息的时候可以通过:Consumer.subscribe(topic,tags) 来指定要消费的消息,如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。或者使用“*”来消费某Topic主题下的所有tags消息。如:
Consumer.subscribe(“topic”,”taga || tagb || tagc”)
除此之外RocketMQ还支持使用SQL进行消息过滤,这种方式可以实现对消息的复杂过滤。SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:
数值:比如:123,3.1415
字符:必须用单引号包裹起来,比如:‘abc’
布尔:TRUE 或 FALSE
NULL:特殊的常量,表示空
支持的运算符有:
数值比较:>,>=,<,<=,BETWEEN,=
字符比较:=,<>,IN
逻辑运算 :AND,OR,NOT
NULL判断:IS NULL 或者 IS NOT NULL
不过,只有使用PUSH模式的消费者才能使用SQL过滤。API如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件conf/broker.conf中添加如下属性,以开启该功能:enablePropertyFilter = true
发送消息时,你能通过putUserProperty来设置消息的属性
...省略...
//添加内容
byte[] bytes = (JSON.toJSONString("消息")).getBytes(CharsetUtil.UTF_8);
Message message = new Message("topic-order-filter","product-order-filter",bytes);
//添加一个用户属性,用来作为过滤条件
message.putUserProperty("sex",(i % 2)+"");
message.setKeys("key-"+i);
//执行发送
SendResult result = producer.send(message);
...省略...
通过:consumer.subscribe(“topic”, MessageSelector.bySql(" sql 条件")); 来过滤SQL
...省略...
//订阅
//defaultMQPushConsumer.subscribe("topic-order-filter","product-order-filter || tagb");
//只是消费 sex为 0的消息
defaultMQPushConsumer.subscribe("topic-order-filter", MessageSelector.bySql("sex = 0"));
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
...省略...
对发送失败的消息进行重新发送叫消息重试,producer和consumer都有消息重试机制。
RocketMQ默认支持消息重试机制,消息重试具有如下特点
对于同步和异步消息支持消息重试,对于oneway单向消息不支持重试
普通消息具有消息重试,顺序消息不支持消息重试
消息重试可能会造成消息重复,所以消费者一定要做好幂等处理
消息发送失败有三种情况:同步发送失败、异步发送失败、消息刷盘失败
同步发送失败策略:
对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败,默认重试2 次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其 也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。相关设置如下:
// 设置重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认3s
producer.setSendMsgTimeout(5000);
如果超过重试次数,则抛出异常,由Producer去保证消息不丢。当然当生产者出现 RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息。
同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标 Broker。其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。
异步发送失败策略:
异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保 证消息不丢。 相关设置如下:
//指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0)
消息刷盘失败策略:
消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broker 的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。
发送端重试实例:
//同步发送消息,如果5秒内没有发送成功,则重试5次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);
顺序消息重试
对于顺序消息消费失败默认会进行每隔1000毫秒进行重试,由于要保证消息是顺序消费,所以重试会导致后面的消息阻塞。可以通过下面的设置来修改重试间隔时间:
//每隔100毫秒重试
consumer.setSuspendCurrentQueueTimeMillis(100);
[注意]顺序消息没有发送失败重试机制,但具有消费失败重试机制 ,顺序消息重试是无止境的,为了防止消息一直重试阻塞,务必要做好监控工作。
无顺消息重试
对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回 状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息
重试时间间隔
对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如: 1s 5s 10s …2h ,如果16次都重试失败,消息进入死信队列
可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:
// 修改消费重试次数
consumer.setMaxReconsumeTimes(10);
重试队列
对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而 是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队
列就是重试队列。 当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称为%RETRY%consumerGroup@consumerGroup 的重试队列。
消费端重试实例:
//最大重试次数,默认16
defaultMQPushConsumer.setMaxReconsumeTimes(10);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt message : list){
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
if(message.getReconsumeTimes() > 3){
//如果重试次数大于3,可以把消息持久化到数据库,然后另外使用一个定时任务去定时重试。
//甚至进行人工干预
//消息持久化到数据库,这里就没必要重试了,返回success
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//出现异常,进行重试
try {
throw new Exception("出现异常了...");
} catch (Exception e) {
e.printStackTrace();
//稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
};
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
消息多次消费失败,达到最大重试次数,消息不会被丢弃而是进入死信队列(Dead-Letter Queue,DLQ),死信队列中的消息被称为死信消息(Dead-Letter Message,DLM)。
死信队列具有如下特征
死信队列中的消息无法再消费,死信队列对应Topic的权限为2,只有写权限,所以死信队列没有办法读取。
3天之后死信队列分钟的消息被删除,和普通消息一样
死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,其中每个队列都是死信队列
如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列
如果出现死信队列,说明程序除了问题,程序员应该及时的排除,进行BUG的处理。我们应该在消费者重试次数达到一定程度就对消息进行持久化,方便后续的处理。或额外定时重试。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
@SpringBootApplication
public class ApplicationStart {
public static void main(String[] args) {
SpringApplication.run(ApplicationStart.class);
}
}
rocketmq:
name-server: 127.0.0.1:9876
# 是否开启自动配置
producer:
enable-msg-trace: true
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
group: "service-pay-producer"
# 消息最大长度 默认 1024 * 4 (4M)
max-message-size: 4096
# 发送消息超时时间,默认 3000
send-message-timeout: 3000
# 发送消息失败重试次数,默认2
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
@Service
public class RocketMQProducer{
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
/**
* 发送普通消息
* @return
*/
public SendResult sendMsg(String msgBody){
SendResult result = rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());
return result;
}
/**
* 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑
*/
public void sendAsyncMsg(String msgBody){
rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}
@Override
public void onException(Throwable e) {
// 处理消息发送异常逻辑
}
});
}
/**
* 发送延时消息<br/>
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>
*/
public void sendDelayMsg(String msgBody, Integer delayLevel){
rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
public void sendTagMsg(String msgBody){
rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());
}
}
实现:RocketMQListener 接口消费消息,通过@RocketMQMessageListener指定consumerGroup,topic,和tags
/**
* rocketmq 消息监听,@RocketMQMessageListener中的selectorExpression为tag,默认为*
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="*",consumerGroup = "queue_group_test")
public class RocketMQMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String msg = new String(body, CharsetUtil.UTF_8);
log.info("接收到消息:{}", msg);
}
}
@Controller
public class ProducerController {
@Autowired
private RocketMQProducer rocketMQProducer;
@RequestMapping("/send")
@ResponseBody
public SendResult send(String msg) {
//formats: `topicName:tags`
return rocketMQProducer.sendMsg(msg);
}
}
生产者增加事务消息发送方法
/**
* 发送事务消息
*/
public SendResult sendTransMsg(String msgBody){
//封装消息
Message<String> message = MessageBuilder.withPayload(msgBody).build();
//发送事务消息
return rocketMQTemplate.sendMessageInTransaction(
//这里和事务监听器里面的事务组保持一致
"tx-producer-group",
//topic:tag
"queue_test_topic:trans-tags",message ,null);
}
通过 @RocketMQTransactionListener(txProducerGroup = “tx-producer-group”) 注解标记,监听器需要实现RocketMQLocalTransactionListener 接口 , txProducerGroup 是事务组的名字。
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class MyTransactionCheckListener implements RocketMQLocalTransactionListener {
public static final Logger LOGGER = LoggerFactory.getLogger(MyTransactionCheckListener.class);
Random random = new Random();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
//执行业务,保存本地事务
//保存成功
if(random.nextInt() % 2 == 0){
LOGGER.info("本地事务提交成功...");
return RocketMQLocalTransactionState.COMMIT;
}
LOGGER.info("本地事务提交未知...");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {
//这里查询本地事务状态
if(random.nextInt() % 2 == 0){
LOGGER.info("本地事务回查...COMMIT");
return RocketMQLocalTransactionState.COMMIT;
}
LOGGER.info("本地事务回查...ROLLBACK");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
正常编写即可
@RequestMapping("/sendTrans")
@ResponseBody
public SendResult sendTrans(String msg) {
//formats: `topicName:tags`
return rocketMQProducer.sendTransMsg(msg);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。