赞
踩
目录
新建一个 Spring Boot 项目,并在 pom.xml 中引入 RocketMQ 的 starter 依赖:
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.0.4</version>
- </dependency>
- package com.gane.rocketmq.model;
-
/**
 * Plain mutable bean holding a user's name and age.
 *
 * <p>Both properties start at their Java defaults ({@code null} and {@code 0})
 * and are changed only through the setters below.
 */
public class User {

    /** Name of the user; {@code null} until {@link #setName(String)} is called. */
    private String name;

    /** Age of the user; {@code 0} until {@link #setAge(int)} is called. */
    private int age;

    /**
     * @return the current name, possibly {@code null}
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the new name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the current age
     */
    public int getAge() {
        return this.age;
    }

    /**
     * @param age the new age to store
     */
    public void setAge(int age) {
        this.age = age;
    }
}

- package com.gane.rocketmq.producer;
-
- import com.gane.rocketmq.enums.SceneEnum;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Component;
-
- @Component
- public class RocketMQProducerService {
-
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- /**
- * 普通发送
- *
- * @param scene
- * @param payload
- */
- public void send(SceneEnum scene, Object payload) {
-
- rocketMQTemplate.convertAndSend(SceneEnum.destination(scene), payload);
- }
-
- /**
- * 同步发送
- *
- * @param scene
- * @param payload
- * @return
- */
- public SendResult sendSync(SceneEnum scene, Object payload) {
-
- SendResult sendResult = rocketMQTemplate.syncSend(SceneEnum.destination(scene), payload);
-
- System.out.println("同步发送结果为:" + sendResult);
-
- return sendResult;
- }
-
- /**
- * 发送异步消息
- *
- * @param scene
- * @param payload
- */
- public void sendASync(SceneEnum scene, Object payload) {
-
- rocketMQTemplate.asyncSend(SceneEnum.destination(scene), payload, new SendCallback() {
-
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("异步发送成功啦");
- }
-
- @Override
- public void onException(Throwable throwable) {
- System.out.println("异步发送出异常啦");
- }
- });
-
- return;
- }
-
- /**
- * 发送延时消息<br/>
- * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- */
- public void sendDelay(SceneEnum scene, Object payload, int delayLevel) {
- rocketMQTemplate.syncSend(SceneEnum.destination(scene), MessageBuilder.withPayload(payload).build(), 2000, delayLevel);
- }
-
- /**
- * 发送单向消息(不关心发送结果,如日志)
- */
- public void sendOneWayMsg(SceneEnum scene, Object payload) {
- rocketMQTemplate.sendOneWay(SceneEnum.destination(scene), payload);
- }
-
- }

引入场景枚举:MQ 的 topic 由场景编码(sceneCode)拼接而成,tag 使用场景描述(desc)
- package com.gane.rocketmq.enums;
-
/**
 * Business scenes, each carrying an 8-character scene code and a description.
 *
 * <p>{@link #destination(SceneEnum)} turns a scene into a {@code topic:tag}
 * destination string: the topic is assembled from the two halves of the scene
 * code and the tag is the scene's description.
 */
public enum SceneEnum {

    USER_REGISTER("11000001", "USER_REGISTER"),
    USER_REMOVE("11000002", "USER_REMOVE"),

    ORDER_CREATE("12000001", "ORDER_CREATE"),
    ORDER_CLOSED("12000002", "ORDER_CLOSED"),
    ;

    /** Literal prefix placed before the first half of the scene code. */
    private static final String TOPIC_PREFIX = "TP_S_";

    /** Literal separator placed between the two halves of the scene code. */
    private static final String TOPIC_INFIX = "%EC_EVENT_";

    /** Index at which the scene code is split into its two halves. */
    private static final int CODE_SPLIT_INDEX = 4;

    /** End index (exclusive) of the second half of the scene code. */
    private static final int CODE_END_INDEX = 8;

    // Enum constants are shared singletons, so their state is kept immutable.
    private final String sceneCode;

    private final String desc;

    SceneEnum(String sceneCode, String desc) {
        this.sceneCode = sceneCode;
        this.desc = desc;
    }

    /**
     * @return the scene code of this constant
     */
    public String getSceneCode() {
        return sceneCode;
    }

    /**
     * @return the description of this constant
     */
    public String getDesc() {
        return desc;
    }

    /**
     * Builds the {@code topic:tag} destination string for a scene.
     *
     * <p>For example {@code USER_REGISTER} (code {@code 11000001}) yields
     * {@code TP_S_1100%EC_EVENT_0001:USER_REGISTER}.
     *
     * @param scene the scene to build a destination for; must not be {@code null}
     * @return the destination in the form {@code topic:tag}
     * @throws NullPointerException if {@code scene} is {@code null}
     */
    public static String destination(SceneEnum scene) {
        if (scene == null) {
            throw new NullPointerException("scene must not be null");
        }

        String code = scene.getSceneCode();
        String topic = TOPIC_PREFIX + code.substring(0, CODE_SPLIT_INDEX)
                + TOPIC_INFIX + code.substring(CODE_SPLIT_INDEX, CODE_END_INDEX);

        return topic + ":" + scene.getDesc();
    }
}

- package com.gane.rocketmq.consumer;
-
- import com.alibaba.fastjson.JSON;
- import com.gane.rocketmq.model.User;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- @Component
- @RocketMQMessageListener(topic = "TP_S_1100%EC_EVENT_0001", consumerGroup = "USER_CONSUMER")
- public class UserRegisterConsumerService implements RocketMQListener<User> {
-
- @Override
- public void onMessage(User payload) {
- System.out.println("消费消息:" + JSON.toJSONString(payload));
- }
- }

- server.port=8081
-
- rocketmq.name-server=127.0.0.1:9876
- rocketmq.producer.group=USER_PRODUCER
- package com.gane.rocketmq.controller;
-
- import com.gane.rocketmq.enums.SceneEnum;
- import com.gane.rocketmq.model.User;
- import com.gane.rocketmq.producer.RocketMQProducerService;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
-
- @RestController
- public class TestController {
-
- @Resource
- private RocketMQProducerService producerService;
-
- @GetMapping("/publish")
- public SendResult publish() {
-
- User user = new User();
- user.setName("maple");
- user.setAge(18);
-
-
- producerService.send(SceneEnum.USER_REGISTER, user);
- System.out.println("第一个发送成功");
-
- SendResult sendResult = producerService.sendSync(SceneEnum.USER_REGISTER, user);
- System.out.println();
- System.out.println("第二个发送成功");
-
- producerService.sendASync(SceneEnum.USER_REGISTER, user);
- System.out.println();
- System.out.println("第三个发送成功");
-
- producerService.sendOneWayMsg(SceneEnum.USER_REGISTER, user);
- System.out.println("第四个发送成功");
-
- producerService.sendDelay(SceneEnum.USER_REGISTER, user, 2);
- System.out.println("第五个发送成功");
-
- return sendResult;
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。