赞
踩
1. NameServer 每个NameServer结点之间是相互独立,彼此没有任何信息交互 启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接, 相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker 2. Broker 消息存储和中转角色,负责存储和转发消息 在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息 以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。 3. topic : 一个消息的集合的名字 创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。 4. 生产者 生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地, 并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上, 轮询从队列列表中选择一个队列(默认轮询) 5. 消费者 消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上, 然后直接跟Broker建立连接通道,然后开始消费消息
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.2</version>
- </dependency>
- rocketmq:
- name-server: 127.0.0.1:9876
- producer:
- group: my-group
- # 发送消息超时时间
- send-message-timeout: 5000
- # 发送消息失败重试次数
- retry-times-when-send-failed: 2
- retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
- rocketmq:
- name-server: 127.0.0.1:9876
- consumer:
- topic: topic_test
- group: consumer_my-group
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- /**
- * 一般消息
- * Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。
- * 使用 Tag 可以实现对 Topic 中的消息进行过滤。
- * **/
- @GetMapping("/send")
- public String send(){
- rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");
- rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");
- return "rocketMq普通消息发送完成";
- }
- /** 支持消费者按照发送消息的先后顺序获取消息 */
- @GetMapping("/send/orderly")
- public String sendOrder(){
- //发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列
- rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");
- rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");
- return "rocketMq顺序-消息发送成功";
- }
- @GetMapping("/send/sync")
- public String sendMsg() {
- String message = "我是同步消息:" + LocalDateTime.now();
- SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());
- log.info("同步-消息发送成功:" + LocalDateTime.now());
- return "rocketMq 同步-消息发送成功:" + result.getSendStatus();
- }
- /** 发送异步消息 */
- @GetMapping("/send/async")
- public String asyncSendMsg(){
- String message = "我是异步消息:" + LocalDateTime.now();
- rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {
-
- @Override
- public void onSuccess(SendResult sendResult) {
- log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());
- }
-
- @Override
- public void onException(Throwable throwable) {
- log.info("发送失败 (后执行)");
- }
- });
- return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();
- }
- @GetMapping("/send/oneWay")
- public String sendOneWayMessage() {
- String message = "我是单向消息:"+LocalDateTime.now();
- this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);
- log.info("单向发送消息完成:message = {}", message);
- return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();
- }
- /** 延时消息 */
- @GetMapping("/sendDelay")
- public String sendDelay(){
- String message = "我是延时消息:" + LocalDateTime.now();
- // 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h
- rocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);
- return "rocketMq延时-消息发送成功";
- }
- /** 事务消息 */
- @GetMapping("/send/transaction/{id}")
- public void sendTransactionMessage(@PathVariable("id") Integer id){
- //发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
- //参数一:topic;参数二:消息
- // 事务id
- String[] tags = {"tagA", "tagB", "tagC"};
- int i = id%3;
-
- String transactionId = UUID.randomUUID().toString();
- String message = "我是事务消息:" + LocalDateTime.now();
- TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i]
- , MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),
- // 给本地事务的参数
- 2);
- //发送状态
- String sendStatus = result.getSendStatus().name();
- //本地事务执行状态
- String localState = result.getLocalTransactionState().name();
- log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);
-
-
- }
- @Slf4j
- @RocketMQTransactionListener
- public class MyTransactionListener implements RocketMQLocalTransactionListener {
-
-
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {
-
- MessageHeaders headers = message.getHeaders();
- //获取事务ID
- String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
- log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));
- try{
- //模拟网络波动
- Thread.sleep(3000);
- /***
- * 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。
- * 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。
- * 如果本地事务成功,消息会被提交并发送给消费者;
- * 如果失败,消息会被回滚,消费者不会接收到这个消息
- */
-
- }catch (Exception e){
- return RocketMQLocalTransactionState.ROLLBACK;
- }
-
- // 执行本地事务
- String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
- if (StringUtils.equals("tagA", tag)){
- //这里只讲TAGA消息提交,状态为可执行
- return RocketMQLocalTransactionState.COMMIT;
- }else if (StringUtils.equals("tagB", tag)) {
- return RocketMQLocalTransactionState.ROLLBACK;
- } else if (StringUtils.equals("tagC",tag)) {
- return RocketMQLocalTransactionState.UNKNOWN;
- }
-
- log.info("事务提交,消息正常处理: " + LocalDateTime.now());
- //执行成功,可以提交事务
- return RocketMQLocalTransactionState.COMMIT;
- }
-
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
- MessageHeaders headers = message.getHeaders();
- //获取事务ID
- String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
- log.info(transactionId + ",消息回查"+ LocalDateTime.now());
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- }
tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker
- /**
- * topic指定消费的主题,consumerGroup指定消费组,
- * 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
- * 2.实现RocketMQListener接口
- * 如果想拿到消息的其他参数可以写成MessageExt
- * selectorExpression = "tagA || tagB" 指定tag 的消费
- */
- @Service
- @Slf4j
- @RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
- public class RocketMqConsumer implements RocketMQListener<String>{
-
- @Override
- public void onMessage(String s) {
- log.info("topic_test: 所有的收到消息:"+s);
- }
-
- }
生产端是一样的,但是消费端需要增加一个参数
messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
- @Service
- @Slf4j
- @RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
- public class RocketMqConsumer implements RocketMQListener<String>{
-
- @Override
- public void onMessage(String s) {
- log.info("consumer2---topic_test: 所有的收到消息:"+s);
- }
-
- }
-
- // 第2个消费者类,他们都是一样的代码,
- //为了表示广播,就是一个消息,会被这两个消费者消费
-
- @Service
- @Slf4j
- @RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
- public class RocketMqConsumer implements RocketMQListener<String>{
-
- @Override
- public void onMessage(String s) {
- log.info("consumer1--topic_test: 所有的收到消息:"+s);
- }
-
- }
RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。