当前位置:   article > 正文

【RocketMQ】Springboot集成RocketMQ(三)_springboot3.0整合rocketmq 指定序列化器

springboot3.0整合rocketmq 指定序列化器

目录

新建工程

新建model

新建生产者

场景枚举类

新建消费者

修改配置文件

新增测试类


新建工程

新建一个 Spring Boot 项目,并在 pom.xml 中引入 RocketMQ 的 Spring Boot Starter 依赖:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.4</version>
  5. </dependency>

新建model

  1. package com.gane.rocketmq.model;
  /**
   * Message payload used by the demo: a user with a name and an age.
   *
   * <p>Plain mutable bean with a getter/setter pair per property and the
   * implicit no-argument constructor.
   */
  public class User {

      /** Display name of the user; {@code null} until set. */
      private String name;

      /** Age of the user; {@code 0} until set. */
      private int age;

      /** @return the current name, possibly {@code null} */
      public String getName() {
          return this.name;
      }

      /** @param name the new name */
      public void setName(String name) {
          this.name = name;
      }

      /** @return the current age */
      public int getAge() {
          return this.age;
      }

      /** @param age the new age */
      public void setAge(int age) {
          this.age = age;
      }
  }

新建生产者

  1. package com.gane.rocketmq.producer;
  2. import com.gane.rocketmq.enums.SceneEnum;
  3. import org.apache.rocketmq.client.producer.SendCallback;
  4. import org.apache.rocketmq.client.producer.SendResult;
  5. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.messaging.support.MessageBuilder;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. public class RocketMQProducerService {
  11. @Autowired
  12. private RocketMQTemplate rocketMQTemplate;
  13. /**
  14. * 普通发送
  15. *
  16. * @param scene
  17. * @param payload
  18. */
  19. public void send(SceneEnum scene, Object payload) {
  20. rocketMQTemplate.convertAndSend(SceneEnum.destination(scene), payload);
  21. }
  22. /**
  23. * 同步发送
  24. *
  25. * @param scene
  26. * @param payload
  27. * @return
  28. */
  29. public SendResult sendSync(SceneEnum scene, Object payload) {
  30. SendResult sendResult = rocketMQTemplate.syncSend(SceneEnum.destination(scene), payload);
  31. System.out.println("同步发送结果为:" + sendResult);
  32. return sendResult;
  33. }
  34. /**
  35. * 发送异步消息
  36. *
  37. * @param scene
  38. * @param payload
  39. */
  40. public void sendASync(SceneEnum scene, Object payload) {
  41. rocketMQTemplate.asyncSend(SceneEnum.destination(scene), payload, new SendCallback() {
  42. @Override
  43. public void onSuccess(SendResult sendResult) {
  44. System.out.println("异步发送成功啦");
  45. }
  46. @Override
  47. public void onException(Throwable throwable) {
  48. System.out.println("异步发送出异常啦");
  49. }
  50. });
  51. return;
  52. }
  53. /**
  54. * 发送延时消息<br/>
  55. * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  56. */
  57. public void sendDelay(SceneEnum scene, Object payload, int delayLevel) {
  58. rocketMQTemplate.syncSend(SceneEnum.destination(scene), MessageBuilder.withPayload(payload).build(), 2000, delayLevel);
  59. }
  60. /**
  61. * 发送单向消息(不关心发送结果,如日志)
  62. */
  63. public void sendOneWayMsg(SceneEnum scene, Object payload) {
  64. rocketMQTemplate.sendOneWay(SceneEnum.destination(scene), payload);
  65. }
  66. }

场景枚举类

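引入场景枚举。MQ 的 topic 由场景编码拼接而成:编码的前 4 位和后 4 位分别填入 `TP_S_xxxx%EC_EVENT_xxxx`(例如 USER_REGISTER 对应 `TP_S_1100%EC_EVENT_0001`),tag 取场景描述,最终的发送目的地格式为 `topic:tag`。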

  1. package com.gane.rocketmq.enums;
  /**
   * Business scenes; each scene maps to one RocketMQ destination.
   *
   * <p>A scene carries an 8-character scene code and a description. The
   * description doubles as the message tag (see {@link #destination(SceneEnum)}).
   */
  public enum SceneEnum {
      USER_REGISTER("11000001", "USER_REGISTER"),
      USER_REMOVE("11000002", "USER_REMOVE"),
      ORDER_CREATE("12000001", "ORDER_CREATE"),
      ORDER_CLOSED("12000002", "ORDER_CLOSED");

      /** Literal pieces of the topic name: {@code TP_S_<code[0..4)>%EC_EVENT_<code[4..8)>}. */
      private static final String TOPIC_PREFIX = "TP_S_";
      private static final String TOPIC_INFIX = "%EC_EVENT_";
      private static final String TAG_SEPARATOR = ":";

      // Enum constants are shared singletons, so their state is immutable.
      private final String sceneCode;
      private final String desc;

      SceneEnum(String sceneCode, String desc) {
          this.sceneCode = sceneCode;
          this.desc = desc;
      }

      /** @return the scene code of this scene */
      public String getSceneCode() {
          return sceneCode;
      }

      /** @return the description of this scene, also used as the message tag */
      public String getDesc() {
          return desc;
      }

      /**
       * Builds the {@code topic:tag} destination string for a scene.
       *
       * <p>The topic is built from the first four and the next four characters
       * of the scene code; the tag is the scene description. For example
       * {@code USER_REGISTER} yields {@code TP_S_1100%EC_EVENT_0001:USER_REGISTER}.
       *
       * @param scene the scene to resolve; must not be {@code null}
       * @return the destination in {@code topic:tag} form
       * @throws NullPointerException if {@code scene} is {@code null}
       */
      public static String destination(SceneEnum scene) {
          if (scene == null) {
              throw new NullPointerException("scene must not be null");
          }
          String code = scene.getSceneCode();
          String topic = TOPIC_PREFIX + code.substring(0, 4) + TOPIC_INFIX + code.substring(4, 8);
          return topic + TAG_SEPARATOR + scene.getDesc();
      }
  }

新建消费者

  1. package com.gane.rocketmq.consumer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.gane.rocketmq.model.User;
  4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  5. import org.apache.rocketmq.spring.core.RocketMQListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @RocketMQMessageListener(topic = "TP_S_1100%EC_EVENT_0001", consumerGroup = "USER_CONSUMER")
  9. public class UserRegisterConsumerService implements RocketMQListener<User> {
  10. @Override
  11. public void onMessage(User payload) {
  12. System.out.println("消费消息:" + JSON.toJSONString(payload));
  13. }
  14. }

修改配置文件

  1. server.port=8081
  2. rocketmq.name-server=127.0.0.1:9876
  3. rocketmq.producer.group=USER_PRODUCER

新增测试类

  1. package com.gane.rocketmq.controller;
  2. import com.gane.rocketmq.enums.SceneEnum;
  3. import com.gane.rocketmq.model.User;
  4. import com.gane.rocketmq.producer.RocketMQProducerService;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. @RestController
  10. public class TestController {
  11. @Resource
  12. private RocketMQProducerService producerService;
  13. @GetMapping("/publish")
  14. public SendResult publish() {
  15. User user = new User();
  16. user.setName("maple");
  17. user.setAge(18);
  18. producerService.send(SceneEnum.USER_REGISTER, user);
  19. System.out.println("第一个发送成功");
  20. SendResult sendResult = producerService.sendSync(SceneEnum.USER_REGISTER, user);
  21. System.out.println();
  22. System.out.println("第二个发送成功");
  23. producerService.sendASync(SceneEnum.USER_REGISTER, user);
  24. System.out.println();
  25. System.out.println("第三个发送成功");
  26. producerService.sendOneWayMsg(SceneEnum.USER_REGISTER, user);
  27. System.out.println("第四个发送成功");
  28. producerService.sendDelay(SceneEnum.USER_REGISTER, user, 2);
  29. System.out.println("第五个发送成功");
  30. return sendResult;
  31. }
  32. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/434650
推荐阅读
相关标签
  

闽ICP备14008679号