赞
踩
在写代码之前,先了解几个概念:
Topic:主题,是一类消息的集合,每条消息只能属于一个主题
生产者向Topic发送消息,消费者订阅指定的Topic,MQ会主动把消息推送给消费者
Tag:标签,为消息设置的标签,用于区分同一 Topic 下不同类型的消息
简单来说就是给 Topic 下的消息再次分组
GroupName:用于给生产者和消费者分组
建立一个普通的java项目
修改 pom.xml,增加
- <!-- 客户端依赖,需要跟 rocketmq 版本一致 -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.3</version>
- </dependency>
新建 Producer.java
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- // 指定 nameserver 地址,把自己注册到nameserver上,同时获取 broker 信息
- producer.setNamesrvAddr("192.168.56.106:9876");
-
- //启动生产者
- producer.start();
- for (int i = 0; i < 10; i++) {
- Message msg = new Message("wangzhe" ,// topic
- "yase" /* Tag */,
- ("亚瑟的大宝剑").getBytes("utf-8"));//把字符串转换为字节数组
- //发送消息
- SendResult sendResult = producer.send(msg);
- System.out.println(sendResult);
- }
- //关闭生产者,关闭后,在 rocketmq 的界面中就找不到了
- producer.shutdown();
- }
注意:发消息之前,设置虚拟机的系统时间,跟宿主机保持一致
结果:
界面上也能找到自己的 Topic 和 消息
然后新建一个Consumer类
- public static void main(String[] args) throws Exception {
- // 建立消费者,自定义一个分组名
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- // 指定 nameserver 地址,把自己注册到nameserver上,同时获取 broker 信息
- consumer.setNamesrvAddr("192.168.56.106:9876");
- // 订阅Topic,第二个参数为*,表示这个Topic的消息都要
- consumer.subscribe("wangzhe", "*");
- // 注册监听器,收到消息时执行
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- /**
- * 消费消息
- * @param msgs 这时一个列表,可能有多个消息
- * @param context
- * @return
- */
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- //取出消息体,这是我们最关注的东西
- System.out.println(new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
- }
- });
- System.out.println("消费者已经就绪。。。。。。");
- // 启动消费者
- consumer.start();
- }
Consumer 启动后会一直等待 Broker 推送消息
这时候启动 Producer 发送消息,消费者就能收到
ConsumeConcurrentlyStatus.CONSUME_SUCCESS:表示消费成功
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,之后消息会重发
修改 Consumer,增加 consumerLater 方法:
- private static void consumerLater() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- consumer.subscribe("wangzhe", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.println(LocalDateTime.now() +"-------"+ new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//表示消费失败,稍后消息会重发,默认重发16次
- }
- });
- System.out.println("消费者已经就绪。。。。。。");
- consumer.start();
- }
上图中:把之前的代码封装到 consumerSuccess 方法中,新增了 consumerLater 方法
先启动Consumer,再启动Producer发送消息
结果:消息会重发
可以通过 setMaxReconsumeTimes 方法设置重发次数,为了避免干扰,让上一次的消息成功消费掉
修改 Consumer,增加 consumerTimes 方法:
- private static void consumerTimes() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- consumer.subscribe("wangzhe", "*");
- //设置最多重发 1 次
- consumer.setMaxReconsumeTimes(1);
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.println(LocalDateTime.now() +"-------"+ new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//表示消费失败,稍后消息会重发,默认重发16次
- }
- });
- System.out.println("消费者已经就绪。。。。。。");
- consumer.start();
- }
上图中:一共消费两次,只重发了一次
Producer 发送消息后会等待 Broker 的响应,然后再往下执行,上面的代码就是这种方式
修改 Producer,增加 syncSend 方法:
- private static void syncSend() throws Exception{
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
- for (int i = 0; i < 5; i++) {
- String message = "亚瑟的大宝剑" + i;//加上序号
- Message msg = new Message("wangzhe" ,
- "yase",
- message.getBytes("utf-8"));//把字符串转换为字节数组
- SendResult send = producer.send(msg);
- System.out.println("发送成功:" + message);
- }
- producer.shutdown();
- }
结果,有序输出:
这时,如果发送失败,默认进行两次重试,可以通过 setRetryTimesWhenSendFailed() 指定重试次数
发送消息后,不等待 Broker的响应,而是提供一个回调方法,比如:
- public static void main(String[] args) throws Exception {
- //1. 同步发送
- //syncSend();
- //2. 异步发送
- asyncSend();
- }
- private static void asyncSend() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
- for(int i = 1; i<=5; i++){//数字搞的大一点,不然效果不明显
- String message = "亚瑟的大宝剑" + i;
- Message msg = new Message("wangzhe" ,
- "yase" ,
- message.getBytes("utf-8"));
- //异步发送消息,提供一个回调方法
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("发送成功:" + message);
- }
- @Override
- public void onException(Throwable e) {
- System.out.println("发送失败:" + e.getMessage());
- }
- });
-
- }
- Thread.sleep(10000);//休眠10秒再关闭客户端
- producer.shutdown();
- }
结果,乱序输出:
这时,如果发送失败,默认进行2次重试,可以通过 setRetryTimesWhenSendAsyncFailed() 指定重试次数。
这种方式不会等 Broker 的响应,也不管成败,可以应用于一些消息不是那么重要,可丢失的场景
通过 sendOneway() 方法实现,该方法没有返回值
- private static void oneWay() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
- for(int i = 1; i<=5; i++){
- String message = "亚瑟的大宝剑" + i;
- Message msg = new Message("wangzhe" ,
- "yase" ,
- message.getBytes("utf-8"));
- //发送消息 oneway,该方法没有返回值
- producer.sendOneway(msg);
- System.out.println("发送成功:" + message);
- }
- producer.shutdown();
- }
消费 message 时,按照发送的顺序来消费,先进先出
使用同步发送保证了发送时消息的顺序
Consumer 使用 consumerSuccess 方法,Producer 使用同步发送
先启动Producer,在启动Consumer,结果:
原因:消息是保存在多个Queue中的
队列是存储消息的物理实体,可以认为是数据库的表。一个Topic中有多个Queue,默认是4个,我们发的消息就是存储在这些个Queue中
在 RocketMQ Dashboard 中可以看到队列的数量
默认情况下,消息发送会采取 _轮询 _的方式把消息发送到不同的queue
消费时,从多个queue上获取消息,导致乱序
假设我们的 Producer 为订单先后生成了 4 条消息:
订单T0000001:未支付
订单T0000001:已支付
订单T0000001:发货中
订单T0000001:发货失败
以 轮询 的方式放到了 4 队列中,每个队列一条消息
这时候消费者 收到的消息可能有很多种,但是只有一种是正确的
解决:**发送数据,通过设置,把这些消息按照顺序放到一个Queue中,然后消费者再去消费
**有 2 种方式:
设置Topic下只有一个队列,也叫全局有序(只有一个队列)
设置相关的消息都发送到相同的一个队列,也叫分区有序
需要在 RocketMQ Dashborad 中先把对应主题删掉
在 Producer 中,增加:
- private static void allOrder() throws Exception{
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- //只设置一个队列
- producer.setDefaultTopicQueueNums(1);
- producer.start();
- for (int i = 0; i < 5; i++) {
- String message = "亚瑟的大宝剑" + i;
- Message msg = new Message("wangzhe" ,
- "yase" ,
- message.getBytes("utf-8"));
- SendResult send = producer.send(msg);
- System.out.println("发送成功:" + message);
- }
- producer.shutdown();
- }
启动 Producer 后, RocketMQ Dashborad 查看
Consumer 增加 consumerOrder 方法,使用 MessageListenerOrderly 监听器
- private static void consumerOrder() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- consumer.subscribe("wangzhe", "*");
- //使用 MessageListenerOrderly 监听器
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- System.out.println(LocalDateTime.now().toString() + "---" + new String(msgs.get(0).getBody()));
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- System.out.println("消费者已经就绪。。。。。。");
- consumer.start();
- }
结果:
先删除 RocketMQ Dashborad 中对应主题
另外,pom.xml中增加:
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.60</version>
- </dependency>
使用 Java客户端 实现,在 Producer 中增加一个新的 order 方法,内容:
- private static void order() throws Exception{
- DefaultMQProducer producer = new DefaultMQProducer("test");
- producer.setNamesrvAddr("192.168.228.106:9876");
- producer.start();
- for(int i = 1; i<=6; i++){
- Map<String,String> message = new HashMap<>();
- if(i % 2 ==0){
- message.put("type","法师");
- message.put("name","妲己"+i);
- }else{
- message.put("type","战士");
- message.put("name","亚瑟"+i);
- }
- Message msg = new Message("wangzhe" ,
- "yase",
- JSONObject.toJSONString(message).getBytes("utf-8"));
- SendResult send = producer.send(msg, new MessageQueueSelector() {
- /**
- * @param mqs topic下的4个队列
- * @param msg 上面的msg对象
- * @param arg 自定义的参数,用来选择消息发到哪个队列
- * @return
- */
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- //这样同一类型的消息会发达一个队列中
- String type = String.valueOf(arg);
- if(type.equals("法师")){
- return mqs.get(0);
- }else{
- return mqs.get(1);
- }
- }
- }, message.get("type"));
- System.out.println(send);
- System.out.println("发送成功:" + message);
- }
- producer.shutdown();
- }
Consumer 不做修改,先启动Producer发送消息,然后再启动Consumer
结果:
消息写入到 Broker 后,内部的调度线程会在等待指定的时间后,把消息投递到队列中
比如:12306上卖火车票,车票预订成功后就会发一条消息,在45分钟后发送给业务系统,如果这时订单没有付款,就取消订单,已经付款则忽略
目前 RocketMQ 不支持随意时长的延迟,而是通过特定的等级来指定。
默认支持18个等级,等级定义在RocketMQ Dashboard 项目中的 MessageStoreConfig类:
指定的延时等级为 3,则表示延迟时长为 10s,即延迟等级是从 1 开始计数的
如果想自定义,需要修改 broker 启动时指定的配置文件,我的是 /home/soft/rocketmq/conf/broker.conf
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
修改:Producer ,增加:
- private static void delay() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
- String message = "亚瑟的大宝剑---延时消息";
- Message msg = new Message("wangzhe" ,
- "yase" ,
- message.getBytes("utf-8"));
- msg.setDelayTimeLevel(3);//设置延时级别
- SendResult send = producer.send(msg);
- System.out.println(LocalDateTime.now().toString() + "发送成功:" + message);
- producer.shutdown();
- }
运行 Producer,结果:
如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,可以把消息打包批量发送,提高性能
批量发送消息有些限制:
应该有相同的topic
不能是延时消息
不能超过4M
可以通过 Producer 的 setMaxMessageSize 方法设置
spring boot 下可通过 rocketmq.producer.maxMessageSize 设置,单位是字节
Producer 中,增加:
- private static void batch() throws Exception{
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- //设置最大消息大小,默认4M
- //同时也要修改启动broker时,指定配置文件:broker.conf, 加上 maxMessageSize
- producer.setMaxMessageSize(1024 * 1024 * 4);
- producer.start();
- List<Message> list = new ArrayList<>();
- for(int i = 1; i<=100; i++){
- String message = "亚瑟的大宝剑" + i;
- Message msg = new Message("wangzhe" ,
- "yase" ,
- message.getBytes("utf-8"));
- list.add(msg);
- }
- SendResult send = producer.send(list);
- System.out.println(send);
- producer.shutdown();
- }
Consumser,批量消费:
- private static void consumerBatch() throws Exception{
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- //每次从 broker 批量拉取消息的数量,默认32
- consumer.setPullBatchSize(32);
- //从拉取的消息中,每次最多消费的数量,默认1,就是下面 consumeMessage 方法的 msgs 参数的长度
- consumer.setConsumeMessageBatchMaxSize(10);
- //设置最多只有 1 个线程处理消息,默认是多线程,只设置 1 个线程,更容易看到效果
- consumer.setConsumeThreadMax(1);
- //设置最少有 1 个线程处理消息
- consumer.setConsumeThreadMin(1);
- consumer.subscribe("wangzhe", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {//使用并发消费监听器
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.println("拿到的消息数量......" + msgs.size());
- for(MessageExt msg : msgs){
- System.out.println(Thread.currentThread().getName() + "---" + LocalDateTime.now().toString() + "---" + new String(msg.getBody()));
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("消费者已经就绪。。。。。。");
- }
注意:先启动 Producer ,在启动 Consumer,结果:
Producer 发送了很多消息,但是 Consumer 只想消费其中的某些,而不是全部,有两种方式
Tag过滤
SQL过滤(做了解)
修改Producer,增加
- private static void tagFilter() throws Exception{
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
- Message msg = new Message("wangzhe" ,"yase" , "我是亚瑟".getBytes("utf-8"));
- Message msg1 = new Message("wangzhe" ,"diaochan" , "我是貂蝉".getBytes("utf-8"));
- Message msg2 = new Message("wangzhe" ,"lvbu" , "我是吕布".getBytes("utf-8"));
- SendResult send = producer.send(msg);
- System.out.println(send);
- SendResult send1 = producer.send(msg1);
- System.out.println(send1);
- SendResult send2 = producer.send(msg2);
- System.out.println(send2);
- producer.shutdown();
- }
修改Consumer
- private static void consumerTag() throws Exception{
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- // 只消费 lvbu 和 ciaochan 这两个tag
- consumer.subscribe("wangzhe", "lvbu || diaochan");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- //取出消息体,这是我们最关注的东西
- System.out.println(new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
- }
- });
- consumer.start();
- System.out.println("消费者已经就绪。。。。。。");
- }
结果,只收到 lvbu和 diaochan的消息
注意:实际工作中,一个服务一般只有一个消费者,通过 tag,调用不同的service处理消息
默认不支持 sql,需要修改 broker 的配置文件,我们的是 /home/soft/rocketmq/conf/broker.conf
enablePropertyFilter=true
之后重启rocketMq
表达式介绍:
- 关键字:
- AND, OR, NOT, BETWEEN...AND, IN, TRUE, FALSE, IS, NULL
-
- 数据类型:
- Boolean, 比如: TRUE, FALSE
- String, 比如: 'abc',必须用单引号包起来
- 数字, 比如: 123、3.1415
-
- 语法:
- AND, OR
- >, >=, <, <=, =
- BETWEEN A AND B, equals to >=A AND <=B
- NOT BETWEEN A AND B, equals to >B OR <A
- IN ('a', 'b'), equals to ='a' OR ='b',
- IS NULL, IS NOT NULL,
- =TRUE, =FALSE,
-
- 样例:
- (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
修改 Producer,增加
- private static void sqlFilter() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("test");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
-
- Message msg1 = new Message("wangzhe", "貂蝉芳龄18".getBytes("utf-8"));
- msg1.putUserProperty("age","18");
- SendResult send1 = producer.send(msg1);
- System.out.println(send1);
-
- Message msg2 = new Message("wangzhe", "西施芳龄19".getBytes("utf-8"));
- msg2.putUserProperty("age","19");
- SendResult send2 = producer.send(msg2);
- System.out.println(send2);
-
- Message msg3 = new Message("wangzhe", "甄姬芳龄20".getBytes("utf-8"));
- msg3.putUserProperty("age","20");
- SendResult send3 = producer.send(msg3);
- System.out.println(send3);
-
- producer.shutdown();
- }
修改Consumer,增加
- private static void consumerSql() throws Exception{
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- //Sql 过滤
- consumer.subscribe("wangzhe", MessageSelector.bySql("age = 18"));
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- //取出消息体,这是我们最关注的东西
- System.out.println(new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
- }
- });
- consumer.start();
- System.out.println("消费者已经就绪。。。。。。");
- }
结果:
Producer 会对发送失败的消息,进行重发,目的:保证消息不丢失
需要注意:
采用 同步或异步 发送时才会重试,oneway方式不会
普通消息有重试,顺序消息没有
可能造成消息重复,需要在消费端处理
发送失败重投次数,默认为2
可以通过 provider.setRetryTimesWhenSendFailed** **方法设置重投次数
重投时,尝试向其他broker发送(就是Rocketmq集群下的其他节点),目的是为了保证消息不丢
需要设置 provider.setRetryAnotherBrokerWhenNotStoreOK(true);
如果消息刷盘超时,不会将消息尝试发送到其他 Broker
刷盘:Producer 发送消息到 Broker,Broker会先把消息放到内存中,然后定时刷到磁盘
超过重投次数,抛出异常
如果消息发送超时,不重试
可以通过 setSendMsgTimeout 设置超时时间,默认 3 秒
发送失败重试次数,默认为2
可以通过 provider.**retryTimesWhenSendAsyncFailed **方法设置重投次数
仅在同一个broker上做重试
如果消息发送超时,不重试
可以通过 setSendMsgTimeout 设置超时时间,默认 3 秒
其实就是保证消息不丢失
RocketMQ消息丢失可能发生在以下三个阶段:
生产者发送消息到Broker时;
Broker内部存储消息到磁盘以及主从复制同步时;
Broker把消息推送给消费者或者消费者主动拉取消息时;
推荐使用同步发送,如果是异步发送,可以根据发送的结果信息来判断是否需要重试来保证消息的可靠性
如果要求比较高,在发送失败时,尝试将消息存储到 db,由后台线程定时重试,确保消息到达 Broker
响应状态:
SEND_OK:消息发送成功,但并不可靠,要保证不丢失,需要启用同步Master服务器或同步刷盘
FLUSH_DISK_TIMEOUT:消息发送成功,但刷盘超时,此时消息在队列中(内存),服务器宕机,消息丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,同步到slave节点超时,此时消息在队列中,服务器宕机,消息丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但slave不可用。
可能丢失的原因:
刷盘阶段可能会出现消息丢失
主从复制时,要将Master的消息同步到Slave,这个过程也会出现消息丢失
刷盘有2种方式:同步刷盘、异步刷盘。
同步刷盘:消息写入内存的 PageCache后,立刻刷盘,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大
异步刷盘:默认方式,消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功
当 PageCache中的消息积累到一定的量时,触发一次刷盘或者定时将消息写入磁盘中
这种方式吞吐量大,性能高,但是 PageCache中的数据可能丢失,不能保证数据绝对的安全
主从同步也有2种方式:同步复制、异步复制。
同步复制:等Master和Slave都写成功后才反馈客户端
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复
但是同步复制会增大数据写入延迟,降低系统吞吐量
异步复制:只要Master写成功,即可反馈给客户端写成功状态
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,
但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失
消费重试
死信队列:未成功消费的消息,不会立刻将消息丢弃,而是发送到死信队列,其名称是在原队列名称前加%DLQ%,如果消息最终进入了死信队列,则可以通过RocketMQ提供的相关接口从死信队列获取到相应的消息,保证了消息消费的可靠性
消息的重复消费分为两种
同一条消息消费两次
Consumer 已经处理消息,但是由于网络原因,Broker 没有收到通知,重新消费
两条消息内容一样
Producer 发送消息时可能触发重试机制,导致多条消息内容一致
解决:
保证消费端业务逻辑的幂等性,即:无论执行多少个相同消息,结果都一致
比如:处理消息的目的是,更新数据库值为100,这样的消息就算重复消费也无所谓
利用日志表去记录每次消费成功消息,消费前先判断下日志表中是否已经消费过
在发送消息时,可以对消息内容进行hash,然后把得hahs值放到Message的key中,推荐使用
把消息的业务id,放到key中,比如:订单id
java 客户端方式
consusmer获取key,然后存起来,每次消费时先判断 key 存不存在
在界面上通过key查询也很方便
每个消息只会被消费一次,同一个Group下的Consumer,合作消费
Producer
- private static void send() throws Exception{
- DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
- producer.setNamesrvAddr("192.168.56.106:9876");
- producer.start();
- //发送 6 条消息
- for(int i = 1; i<=6; i++){
- String message = "亚瑟的大宝剑" + i;
- Message msg = new Message("wangzhe" ,// topic
- "yase" /* Tag */,
- message.getBytes("utf-8"));//把字符串转换为字节数组
- msg.setKeys("1111111111");//存放业务id
- SendResult send = producer.send(msg);
- System.out.println("发送成功:" + message);
- }
- producer.shutdown();
- }
Consumer
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- // 设置消费模型,集群还是广播,默认为集群
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("wangzhe", "*");
- consumer.setMaxReconsumeTimes(2);
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.println(LocalDateTime.now().toString() + "---" + new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("消费者已经就绪。。。。。。");
- }
Consumer2
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- // 设置消费模型,集群还是广播,默认为集群
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("wangzhe", "*");
- consumer.setMaxReconsumeTimes(2);
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.println(LocalDateTime.now().toString() + "---" + new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("消费者已经就绪2。。。。。。");
- }
结果:
每个Consumer都消费所有消息
修改Consumer
- // 设置消费模型:广播
- consumer.setMessageModel(MessageModel.BROADCASTING);
Rocketmq 集成在 Spring boot 中非常简单,只需要准备一个配置类,比如:
- @Configuration
- public class RocketmqConfig {
-
-
- /**
- * rocketmq 生产者
- * @return
- */
- @Bean
- public DefaultMQProducer defaultMQProducer(){
- DefaultMQProducer producer = new DefaultMQProducer("consumer");
- // 指定 nameserver 地址,把自己注册到nameserver上,同时获取 broker 信息
- producer.setNamesrvAddr("192.168.56.106:9876");
- try {
- producer.start();
- } catch (MQClientException e) {
- }
- return producer;
- }
-
- /**
- * rocketmq 消费者
- * @return
- */
- @Bean
- public DefaultMQPushConsumer defaultMQPushConsumer(){
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producer");
- consumer.setNamesrvAddr("192.168.56.106:9876");
- try {
- consumer.subscribe("topic", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- String url = new String(msgs.get(0).getBody());
- handelMovieService.handelUrl(url);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
- }
- });
- //启动消费者
- consumer.start();
- } catch (MQClientException e) {
- }
- return consumer;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。