当前位置:   article > 正文

Java教程:RocketMq集群消息核心知识与SpringBoot整合并实现生产者与消费者_springboot rocketmq 生产者与消费者

springboot rocketmq 生产者与消费者

前言:

上一章节我们 讲了RocketMq的一些基础概念和集群搭建,本章我们讲解下如何加入到项目中开发

RocketMq消息类型分为三种:普通消息、顺序消息、事务消息

  • 普通消息:有三种发送方式

     单向发送:单向发送是指发送方只负责发送消息,不等待服务器回应,且没有回调函数触发。即只发送请求而不管响应。
     
     同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。
     
     异步发送:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 顺序消息

     一般情况下,每个主题(topic)都会有多个消息队列(message queue),假设投递了同一个主题的十条消息,那么这十条消息会分散在不同的队列中。对于消费者而言,每个消息队列是等价的,就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。顺序消息与普通消息同样有三种发送方式。
    
    • 1
  • 事务消息

     RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。
    
    • 1

    事务消息发送步骤:

    1、发送方将半事务消息发送至RocketMQ服务端。

    2、RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。

    3、发送方开始执行本地事务逻辑。

    4、发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

    事务消息回查机制:

    1、在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

    2、发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

    3、发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

    在这里插入图片描述

第一步:导入pom

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

第二步:添加yml配置

rocketmq:
  name-server: 192.168.116.128:9876;192.168.116.132:9876
  producer:
    group: rocketmq-producer
  • 1
  • 2
  • 3
  • 4

第三步:实现普通消息发送和消费

生产者源码:

/**
 * 发送消息:同步消息、异步消息和单向消息。
 *
 * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@Service
public class MqOrdinaryProducer {

    /**
     * rocketmq
     */
    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * 单向消息
     */
    public void sendMq() {
        rocketMqTemplate.sendOneWay("topic-test", "测试发送单向消息》》》》》》》》》");
    }

    /**
     * 同步发送
     */
    public void sync() {
        SendResult sendResult = rocketMqTemplate.syncSend("topic-test", "sync发送消息。。。。。。。。。。");
        log.info("发送结果{}", sendResult);
    }

    /**
     * 异步发送
     */
    public void async() {
        String msg = "异步发送消息。。。。。。。。。。";
        log.info(">msg:<<" + msg);
        rocketMqTemplate.asyncSend("topic-test", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                log.info("异步发送成功{}", var1);
            }

            @Override
            public void onException(Throwable var1) {
                //发送失败可以执行重试
                log.info("异步发送失败{}", var1);
            }
        });
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

消费者源码:

/**
 * 消费消息
 *
 * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-producer", topic = "topic-test")
public class MqOrdinaryConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("接收到的数据是:{}", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

第四步:实现顺序消息发送和消费

生产者源码:

/**
 * 发送消息:同步消息、异步消息和单向消息。
 * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@Service
public class MqOrderProducer {

    /**
     * rocketmq
     */
    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * 单向消息
     */
    public void sendMq() {
        rocketMqTemplate.sendOneWayOrderly("topic-test", "测试发送单向消息》》》》》》》》》", "orderId");
    }

    /**
     * 同步发送
     */
    public void sync() {
        SendResult sendResult = rocketMqTemplate.syncSendOrderly("topic-test", "sync发送消息。。。。。。。。。。", "orderId");
        log.info("发送结果{}", sendResult);
    }

    /**
     * 异步发送
     */
    public void async() {
        String msg = "异步发送消息。。。。。。。。。。";
        log.info(">msg:<<" + msg);
        rocketMqTemplate.asyncSendOrderly("topic-test", msg, "orderId", new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                log.info("异步发送成功{}", var1);
            }
            @Override
            public void onException(Throwable var1) {
                //发送失败可以执行重试
                log.info("异步发送失败{}", var1);
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

消费者源码:

/**
 * 消费消息
 *
 * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-producer", topic = "topic-test", consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
public class MqOrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("接收到的数据是:{}", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

第五步:实现事务消息发送和消费

生产者源码:

/**
 * 发送消息:事务消息
 *
 * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@Service
public class MqTransactionProducer {

    /**
     * rocketmq
     */
    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * 发送事务消息
     */
    public void sendTransaction() {
        rocketMqTemplate.sendMessageInTransaction("topic-test:testTag", MessageBuilder.withPayload("测试事务消息").build(), "msg");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

事务监听源码:

/**
 * 本地事务监听
 *
 * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@RocketMQTransactionListener
public class SendMessageToIpcExecuteLocal implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务
     *
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info(message.toString());
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 检查本地事务
     *
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info(message.toString());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

消费者源码与普通消费者类似:

/**
 * 消费消息
 *  * @author wfeil211@foxmail.com
 * @date 2022-9-26 14:16:42
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-producer", topic = "topic-test")
public class MqOrdinaryConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("接收到的数据是:{}", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

重点了解:

如何发送指定tag消息

  1. 这时候有的同学就有疑问了,不是说tag是topic下的二级分类,那如何发送指定tag消息,其实不难
    **在发送消息时将destination参数这样拼接,testTag即为tag标识:

    rocketMqTemplate.sendOneWay("topic-test:testTag", "测试发送单向消息》》》》》》》》》");
    
    • 1
  2. 消费时如何消费,重点在@RocketMQMessageListener注解,里面定义了一些非常多的属性,其中selectorExpression就是指定对应tag,将此参数配置自己的tag即可实现消费

    @RocketMQMessageListener(consumerGroup = "rocketmq-producer", topic = "topic-test", selectorExpression = "testTag")
    
    • 1

集群模式

消息负载均衡(Rebalance机制)

什么是消息的负载均衡,上节说一个topic可以有多个队列,生产者在发送消息时无需指定队列,只需指定topic即可,topic会自动分配到指定队列,那么队列又是如何分配的呢?

负载均衡:将⼀个Topic下的多个Queue分配到不同Consumer实例的过程,例如,⼀个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力,而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
  • 1

在这里插入图片描述
分配策略

1、平均分配策略

 比如:有10个Queue,4个Consumer,那么每个Consumer可以分配到2个Queue,但是还有两个Queue是多余的,那么这两个Queue将依次按顺序分给Consumer-A,Consumer-B
  • 1

在这里插入图片描述
2、环形平均策略

环形平均算法是指,根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配。
  • 1

在这里插入图片描述
3、一致性hash策略

该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。
  • 1

在这里插入图片描述
4、同机房策略

该算法会根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。
  • 1

在这里插入图片描述

广播模式

广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

生产者组和消费者组

  • 生产者组

     一个生产者组,代表着一群topic相同的Producer。即一个生产者组是同一类Producer的组合、如果发送的是事务消息,如果节点1发送完消息后,消息存储到broker的Half Message Queue中,还未存储到目标topic的queue中时,此时节点1崩溃,则可以通过同一Group下的节点2进行二阶段提交,或回溯。
    
    • 1
  • 消费者组

     用来进行负载均衡
    
    • 1

点个赞呗~(首席摸鱼师 微信同号)嘻嘻

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/713031
推荐阅读
相关标签
  

闽ICP备14008679号