赞
踩
- <!-- rocketmq服务端版本4.9.3 -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.2</version>
- </dependency>
- #rokectmq连接信息配置和默认分组
- rocketmq:
- name-server: 192.168.1.208:9876;192.168.1.208:9877
- producer:
- group: guizhou-producer-ext
- consumer:
- group: guizhou-consumer-ext
- package com.dzt.manager.message.producer;
-
- import com.alibaba.fastjson.JSON;
- import com.dzt.constant.RocketMqConst;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.UUID;
-
- /**
- * @author dzt
- */
- @Service
- public class LiveSenderService {
- public static final Logger log = LoggerFactory.getLogger(LiveSenderService.class);
-
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
-
- /**
- * 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
- */
- public void send() {
- Map<String, String> map = new HashMap<>();
- map.put("orderId", UUID.randomUUID().toString());
- rocketMQTemplate.convertAndSend(RocketMqConst.topic_name + ":tag1", map);
- // 等价于上面一行
- //rocketMQTemplate.send(topic_name + ":tag1",
- MessageBuilder.withPayload(map).build());
- }
-
- /**
- * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
- * (msgBody也可以是对象,sendResult为返回的发送结果)
- */
- public SendResult sendMsg(String msgBody) {
- SendResult sendResult = rocketMQTemplate.syncSend(RocketMqConst.topic_name,
- MessageBuilder.withPayload(msgBody).build());
- log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
- return sendResult;
- }
-
- /**
- * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理
- * 相关成功失败时的逻辑)
- * (适合对响应时间敏感的业务场景)
- */
- public void sendAsyncMsg(String msgBody) {
- rocketMQTemplate.asyncSend(RocketMqConst.topic_name,
- MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- // 处理消息发送成功逻辑
- log.info("消息发送成功,响应结果:{}", sendResult);
- }
-
- @Override
- public void onException(Throwable throwable) {
- // 处理消息发送异常逻辑,可以根据异常确定是否要进行重试或其他业务
- log.info("消息发送失败,出现异常", throwable);
- }
- });
- }
-
- /**
- * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
- * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m
- 9m 10m 20m 30m 1h 2h
- */
- public void sendDelayMsg(String msgBody, int delayLevel) {
- rocketMQTemplate.syncSend(RocketMqConst.topic_name,
- MessageBuilder.withPayload(msgBody).build(), 30 * 1000, delayLevel);
- }
-
- /**
- * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
- */
- public void sendOneWayMsg(String msgBody) {
- rocketMQTemplate.sendOneWay(RocketMqConst.topic_name,
- MessageBuilder.withPayload(msgBody).build());
- }
-
- /**
- * 发送带tag的消息,直接在topic_name后面加上":tag"
- */
- public SendResult sendTagMsg(String msgBody) {
- return rocketMQTemplate.syncSend(RocketMqConst.topic_name + ":tag2",
- MessageBuilder.withPayload(msgBody).build());
- }
-
- public void sendOrderMsg(String msgBody) {
- log.info("开始发送顺序消息......");
- rocketMQTemplate.syncSendOrderly(RocketMqConst.topic_name, msgBody, "order");
- log.info("结束发送顺序消息......");
- }
-
-
- }

- package com.dzt.manager.message.consumer;
-
- import com.dzt.constant.RocketMqConst;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Service;
-
- /**
- * @author dzt
- * 消费模式默认为集群模式,即同一个消费者组内,多个实例负载均衡后,只有一个实例消费一次
- * 如果需要使用广播模式,只需要添加该属性:messageModel = MessageModel.BROADCASTING
- 泛型可以根据消息的具体类型酌情修改
- */
- @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = RocketMqConst.topic_name)
- @Service
- public class LiveReceiveService implements RocketMQListener<String> {
- public static final Logger log = LoggerFactory.getLogger(LiveReceiveService.class);
- @Override
- public void onMessage(String message) {
- log.info("接收到消息:{}",message);
- }
- }

- package com.dzt.constant;
-
- /**
- * @author dzt
- */
- public class RocketMqConst {
- /**
- * 同一项目统一使用一个topic_name
- */
- public static final String topic_name = "dzt-ext-sys";
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。