当前位置:   article > 正文

SpringBoot整合RabbitMQ (持续更新中)

SpringBoot整合RabbitMQ (持续更新中)

RabbitMQ 官网地址:RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ 与 Erlang 版本兼容关系(下面每一组依次为:RabbitMQ 版本列表、支持的最低 Erlang 版本、支持的最高 Erlang 版本、备注说明)

  • 3.13.0
  • 26.0
  • 26.2.x
  • The 3.13 release series is compatible with Erlang 26.

  • OpenSSL 3 support in Erlang is considered to be mature and ready for production use.

  • Erlang 26.1 and later versions support FIPS mode on OpenSSL 3

  • 3.12.13
  • 3.12.12
  • 3.12.11
  • 3.12.10
  • 25.0
  • 26.2.x
  • The 3.12 release series is compatible with Erlang 26.

  • OpenSSL 3 support in Erlang is considered to be mature enough for production.

  • Erlang 26.1 and later versions support FIPS mode on OpenSSL 3

  • 3.12.9
  • 3.12.8
  • 3.12.7
  • 3.12.6
  • 3.12.5
  • 25.0
  • 26.1.x
  • The 3.12 release series is compatible with Erlang 26.

  • OpenSSL 3 support in Erlang is considered to be mature enough for production.

  • Erlang 26.1 supports FIPS mode on OpenSSL 3

  • 3.12.4
  • 3.12.3
  • 3.12.2
  • 3.12.1
  • 3.12.0
  • 25.0
  • 26.0.x
  • The 3.12 release series is compatible with Erlang 26.

  • OpenSSL 3 support in Erlang is considered to be mature enough for production.

  • 3.11.28
  • 3.11.27
  • 3.11.26
  • 3.11.25
  • 3.11.24
  • 3.11.23
  • 3.11.22
  • 3.11.21
  • 3.11.20
  • 3.11.19
  • 3.11.18
  • 3.11.17
  • 3.11.16
  • 3.11.15
  • 3.11.14
  • 3.11.13
  • 3.11.12
  • 3.11.11
  • 3.11.10
  • 3.11.9
  • 3.11.8
  • 3.11.7
  • 3.11.6
  • 3.11.5
  • 3.11.4
  • 3.11.3
  • 3.11.2
  • 3.11.1
  • 3.11.0
  • 25.0
  • 25.3.x
  • Erlang 26 is supported starting with RabbitMQ 3.12.0.

  • As of Erlang 25.1, OpenSSL 3.0 support in Erlang is considered to be mature enough for production.

  • Erlang 25 before 25.0.2 is affected by CVE-2022-37026, a CVE with critical severity (CVSS 3.x Base Score: 9.8)

RabbitMQ 安装

下载地址:RabbitMQ: One broker to queue them all | RabbitMQ

Windows 下先安装与 RabbitMQ 版本兼容的 Erlang(参见上面的版本兼容关系),再双击下载的 exe 安装包按提示安装即可(其他系统的安装方式请参考官网)

RabbitMQ管理界面

管理界面的默认端口:15672,默认账户/密码:guest/guest

SpringBoot整合RabbitMQ

1.maven 依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>2.7.5</version>
  5. </dependency>

2.添加配置文件

  1. spring.rabbitmq.host=127.0.0.1
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest

3.添加配置类

  1. package com.label.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author sszdzq
  8. */
  9. @Component
  10. public class RabbitMqConfig {
  11. /**
  12. * 定义一个主题类型的交换机
  13. *
  14. * @return
  15. */
  16. @Bean("topic-exchange")
  17. public Exchange topicExchange() {
  18. return ExchangeBuilder
  19. .topicExchange("topic-exchange") // 交换机类型,交换机名称
  20. .durable(true) //ture为持久化,存到磁盘,false存到内存
  21. .build();
  22. }
  23. /**
  24. * 定义一个队列
  25. *
  26. * @return
  27. */
  28. @Bean("messageQueue")
  29. public Queue messageQueue() {
  30. return new Queue("messageQueue");
  31. }
  32. /**
  33. * 交换机、队列、路由键 进行绑定
  34. *
  35. * @param exchange //交换机
  36. * @param queue //队列
  37. * @return
  38. */
  39. @Bean
  40. public Binding bindQueueAndExchange(@Qualifier("topic-exchange") Exchange exchange, @Qualifier("messageQueue") Queue queue) {
  41. return BindingBuilder
  42. .bind(queue)
  43. .to(exchange)
  44. .with("news.*") //路由键
  45. .noargs();
  46. }
  47. }

4.创建生产者与消费者

  1. package com.label.contoller;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.json.JSONException;
  6. import org.springframework.amqp.core.ExchangeTypes;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.core.MessageProperties;
  9. import org.springframework.amqp.rabbit.annotation.*;
  10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.http.ResponseEntity;
  13. import org.springframework.web.bind.annotation.PostMapping;
  14. import org.springframework.web.bind.annotation.RequestBody;
  15. import org.springframework.web.bind.annotation.RequestMapping;
  16. import org.springframework.web.bind.annotation.RestController;
  17. import java.io.IOException;
  18. @RestController
  19. @Slf4j
  20. @RequestMapping(value = "/rabbitmq")
  21. public class TestController {
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. /**
  25. * 直连类型
  26. *
  27. * @param msg
  28. * @return
  29. */
  30. @PostMapping("/directSend")
  31. public ResponseEntity sendMsgDirect(String msg) {
  32. /**
  33. * 普通直接链接
  34. */
  35. rabbitTemplate.convertAndSend("direct_exchange", "direct_key", msg);
  36. /**
  37. * 设置属性 发送
  38. */
  39. rabbitTemplate.convertAndSend("direct_exchange", "direct_key", msg, e -> {
  40. MessageProperties messageProperties = e.getMessageProperties();
  41. /**
  42. * 单位为毫秒("6000",6秒钟)
  43. * 过期后如果设置了死信队列,消息进入死信队列
  44. * 没有设置死信直接丢弃
  45. */
  46. messageProperties.setExpiration("6000");
  47. return e;
  48. });
  49. return ResponseEntity.ok("3482347592");
  50. }
  51. /**
  52. * 扇形消息发送
  53. *
  54. * @param msg
  55. * @return
  56. */
  57. @PostMapping("/fanoutSend")
  58. public ResponseEntity sendMsgFanout(String msg) {
  59. /**
  60. * 广播模式下 没有路由建信息(填写也是无效)
  61. */
  62. rabbitTemplate.convertAndSend("fanout_exchange", "", "this is test message");
  63. return ResponseEntity.ok("3482347592");
  64. }
  65. /**
  66. * 广播模式消费
  67. *
  68. * @param msg
  69. */
  70. @RabbitListener(bindings = @QueueBinding(
  71. exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT),
  72. value = @Queue("fanout_queue")))
  73. public void customerFanout(String msg) {
  74. log.info("广播消费:{}", msg);
  75. }
  76. /**
  77. * 类型:主题
  78. * 生产者 (创建消息)
  79. * 主题消费发送
  80. */
  81. @PostMapping("/topic/send")
  82. public ResponseEntity producerTopicSend(@RequestBody JSONObject jb) throws JSONException {
  83. rabbitTemplate.convertAndSend("topic-exchange", jb.getString("topic"), jb.getString("msg"));
  84. return ResponseEntity.ok().build();
  85. }
  86. /**
  87. * 创建新的队列(绑定交换机:news.*,绑定路由键:news.330500000000)
  88. * x-expires: 队列的销毁时间
  89. *
  90. * @param msg
  91. */
  92. @RabbitListener(bindings = @QueueBinding(
  93. exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC),
  94. value = @Queue(value = "message-one7", arguments = {
  95. @Argument(name = "x-expires", value = "10000", type = "java.lang.Integer")
  96. }),
  97. key = {"news.330500000000"}
  98. ))
  99. public void customerTopic1(String msg) {
  100. log.info("主题消费 news.330500000000 :{}", msg);
  101. }
  102. /**
  103. * 手动确认
  104. *
  105. * @param message
  106. * @param channel
  107. * @throws IOException
  108. */
  109. @RabbitListener(bindings = @QueueBinding(
  110. exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC),
  111. value = @Queue(value = "message-one"),
  112. key = {"news.*"}
  113. ))
  114. public void customerTopic2(Message message, Channel channel) throws IOException {
  115. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  116. try {
  117. log.info("主题消费 news.* :{}", new String(message.getBody()));
  118. channel.basicAck(deliveryTag, true);
  119. } catch (Exception e) {
  120. channel.basicNack(deliveryTag, true, true);
  121. throw new RuntimeException(e);
  122. }
  123. }
  124. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/711604
推荐阅读
相关标签
  

闽ICP备14008679号