当前位置:   article > 正文

RabbitMq的使用_rabbitmq 手动创建 exchange

rabbitmq 手动创建 exchange

以提供者、消费者为例、

准备 :

        创建Springboot项目 消费、提供模块、配置yml文件、导入依赖

消费者:

        Yaml文件

  1. server:
  2. port: 1234
  3. spring:
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: guest
  8. password: guest

          Pom依赖

  1. <dependencies>
  2. <!-- rabbitmq 的依赖 -->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-amqp</artifactId>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-web</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-test</artifactId>
  14. <scope>test</scope>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework.amqp</groupId>
  18. <artifactId>spring-rabbit-test</artifactId>
  19. <scope>test</scope>
  20. </dependency>
  21. </dependencies>

提供者:

        Yaml文件

  1. server:
  2. port: 12345
  3. spring:
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: guest
  8. password: guest

提供者 建立一个消息的发送接口:

  1. public interface IMessageProducerService {
  2. public void sendMessage1(String msg);
  3. public void sendMessage2(Map<String,Object> map);
  4. public void sendMessage3(String msg);
  5. public void sendDelayedMessage(String msg);
  6. }

再建立一个接口的实现类

  1. @Component
  2. @Service
  3. public class MessageProducerServiceImpl implements IMessageProducerService{
  4. @Resource
  5. private RabbitTemplate template;
  6. private void sendMessage(String exchange,String routingKey,Object message){
  7. //exchange 交换机的名字、routingkey 路由键、message 发送的消息
  8. //提供者发送 消息到交换机,不是消息队列、那么交换机、路由键 必须存在、否则报错
  9. //交换机和队列每次修改**持久化**和**自动删除**等属性都要**先删除**原来交换机和队列,
  10. //否则启动就异常,报创建**队列**失败
  11. this.template.convertAndSend(exchange,routingKey,message);
  12. }
  13. @Override
  14. public void sendMessage1(String msg) {
  15. this.sendMessage("dx","rk",msg);
  16. }
  17. @Override
  18. public void sendMessage2(Map<String, Object> map) {
  19. this.sendMessage(
  20. "myfan","",map);
  21. }
  22. @Override
  23. public void sendMessage3(String msg) {
  24. this.sendMessage("mytopic","a.b.log.dao",msg);
  25. this.sendMessage("mytopic","log.dao",msg);
  26. this.sendMessage("mytopic","log.d.c",msg);
  27. this.sendMessage("mytopic","a.log.c",msg);
  28. }
  29. }

消费者通过代码创建 交换机、消息队列、路由键、设置消息队列过期时间

一定要先启动消费者、通过@Bean 和@Binding 创建交换机、消息队列和绑定路由

DirectExchange 直连模式、FanoutExchange 广播模式、TopicExchange、

  1. @Configuration
  2. //延时队列只能通过代码创建绑定或Web端手动创建绑定
  3. public class ConsumerConfiger {
  4. public static final String EXCHANGE = "dx"; // 交换空间名称
  5. public static final String ROUTINGKEY = "rk"; // 设置路由key
  6. //1、以直连的方式创建
  7. @Primary
  8. @Bean
  9. public DirectExchange directExchange() {
  10. // 要创建的交换机
  11. // 持久化(存硬盘,重启还在,默认true)
  12. // 自动删除(当所有绑定队列都不在使用时删除交换机)
  13. return new DirectExchange(EXCHANGE, true, false);
  14. }
  15. @Bean
  16. public Queue queue11() {
  17. // 要创建的队列
  18. // 持久化(保存硬盘,重启存在,默认true)
  19. // 排他(只有创建它的连接可用,连接关闭时无论队列中有没有消息会删除队列)
  20. // 自动删除(当队列中有消息时,无论是否排他,关闭连接都不会删除队列,此时消费者消费完消息后再断开消费者,队列会被自动删除,删除后队列消息也丢失)
  21. // args参数设置队列最多5条消息(保留最后的5条消息,前面的全部丢失)
  22. return new Queue("q11",true,true,false,args);
  23. }
  24. @Bean
  25. public Queue queue12() {
  26. // args参数设置队列过期时间10秒,10秒后所有消息清空
  27. Map<String,Object> args = new HashMap<>();
  28. args.put("x-message-ttl",10000);
  29. return new Queue("q12", true,false,false,args);
  30. }
  31. //1、看binding 就知道是绑定的意思、绑定 exchange 交换机 下面的 queue 消息队列 下面的
  32. // routingkey
  33. @Bean
  34. public Binding bindingExchangeQueue11(DirectExchange exchange, Queue queue11) {
  35. return BindingBuilder.bind(queue1).to(exchange).with(ROUTINGKEY);
  36. }
  37. @Bean
  38. public Binding bindingExchangeQueue12(DirectExchange exchange, Queue queue12) {
  39. return BindingBuilder.bind(queue2).to(exchange).with(ROUTINGKEY);
  40. }
  41. //2、以广播模式创建、
  42. /**
  43. * FanoutExchange 广播模式
  44. * @return
  45. */
  46. @Bean
  47. public FanoutExchange getFanoutExchange(){
  48. return new FanoutExchange("myfan");
  49. }
  50. @Bean
  51. public Queue queue21() {
  52. return new Queue("q21");
  53. }
  54. @Bean
  55. public Queue queue22() {
  56. return new Queue("q22");
  57. }
  58. @Bean
  59. public Binding bindingExchangeQueue21(FanoutExchange exchange, Queue queue21) {
  60. return BindingBuilder.bind(queue21).to(exchange);
  61. }
  62. @Bean
  63. public Binding bindingExchangeQueue22(FanoutExchange exchange, Queue queue22) {
  64. return BindingBuilder.bind(queue22).to(exchange);
  65. }
  66. //3、TopicExchange
  67. // "*"用于匹配一个单词,比如"a","abc"等;
  68. // "#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等
  69. @Bean
  70. public TopicExchange getTopicExchange(){
  71. return new TopicExchange("topic");
  72. }
  73. @Bean
  74. public Queue queue31() {
  75. return new Queue("q31");
  76. }
  77. @Bean
  78. public Queue queue32() {
  79. return new Queue("q32");
  80. }
  81. @Bean
  82. public Binding bindingExchangeQueue31(TopicExchange exchange, Queue queue31) {
  83. return BindingBuilder.bind(queue31).to(exchange).with("#.log.*");
  84. }
  85. @Bean
  86. public Binding bindingExchangeQueue32(TopicExchange exchange, Queue queue32) {
  87. return BindingBuilder.bind(queue32).to(exchange).with("*.log.#");
  88. }
  89. }

消费者实现监听处理类:、负责接收提供者发送的消息、

  1. @Service
  2. public class MessageConsumer {
  3. @RabbitListener(queues = "q11")
  4. public void receiveMessage1(String msg){
  5. System.out.println("【*** 接收消息11 ***】:"+msg);
  6. }
  7. //注解方式绑定交换机和队列 无需创建配置类、
  8. @RabbitListener(bindings = @QueueBinding(
  9. value = @Queue(value = "q11"),
  10. exchange = @Exchange(value = "dx",type = ExchangeTypes.DIRECT)
  11. ))
  12. //public void receiveMessage(String msg) { // 进行消息接收处理
  13. //System.out.println("【*** q1接收消息 ***】" + msg);
  14. //}
  15. @RabbitListener(queues = "q12")
  16. public void receiveMessage2(String msg){
  17. System.out.println("【*** 接收消息12 ***】:"+msg);
  18. }
  19. @RabbitListener(queues = {"q21","q22"})
  20. public void receiveMessage212(Map<String, Object> map){
  21. System.out.println("q21-22:"+map);
  22. }
  23. }

最后一步、提供者创建Test测试类、或者是像我一样创建控制层、调用方法使用

  1. @RestController
  2. public class MessageController {
  3. @Resource
  4. private IMessageProducerService service;
  5. @Resource
  6. private RabbitTemplate template;
  7. @RequestMapping("/test1")
  8. public void testMessage1() throws InterruptedException {
  9. service.sendMessage1("我发的第一条消息!"); 异常问题待会说
  10. /*
  11. for (int i=0;i<100;i++){ //有5条多余+10条超时的进入死信队列
  12. Thread.sleep(100);
  13. this.service.sendMessage1("直连消息:"+i);
  14. }
  15. */
  16. System.out.println("消息发送完毕");
  17. }
  18. @RequestMapping("/test2")
  19. public void testMessage2() {
  20. Map<String,Object> map = new HashMap<>();
  21. map.put("sid",1);
  22. map.put("nums",2);
  23. this.service.sendMessage2(map);
  24. System.out.println("消息发送完毕");
  25. }
  26. @RequestMapping("/test3")
  27. public void testMessage3() {
  28. this.service.sendMessage3("我发的topic交换机消息!");
  29. System.out.println("消息发送完" +
  30. "毕");
  31. }
  32. @RequestMapping("/test4")
  33. public void testMessage4() {
  34. this.service.sendDelayedMessage("我发的延时交换机消息!");
  35. System.out.println("消息发送完" +
  36. "毕");
  37. }
  38. }

以上是最基本的RabbitMq的使用 、如果有异常会出现 消息队列没有接收到消息、导致方法无线循环、我们可以配置 出现异常、只出现指定次数、

消费者配置yml 文件:

  1. spring:
  2. rabbitmq:
  3. #publisher-confirms: true # 发送确认 版本似乎不再支持
  4. #publisher-returns: true # 路由失败回调
  5. template:
  6. # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
  7. mandatory: true
  8. listener:
  9. simple:
  10. retry:
  11. enabled: true
  12. max-attempts: 3 # 重试3
  13. prefetch: 1 # 每次从RabbitMQ获取的消息数量
  14. default-requeue-rejected: false
  15. concurrency: 1 # 每个队列启动的消费者数量
  16. max-concurrency: 5 # 每个队列最大的消费者数量
  17. acknowledge-mode: manual # 签收模式为手动签收,需要在代码中手动ACK

在消费者的监听处理类 随便找个方法 加上 int i = 10 / 0; 就进入异常、最后发现最多执行三次异常、

接下来: RabbitMq延时队列的使用

消费者配置类

  1. @Configuration
  2. public class ConsumerConfig {
  3. /**
  4. *
  5. * 延迟交换机(x-delayed-type、x-delayed-message固定写法)
  6. */
  7. @Bean("delayExchange")
  8. public CustomExchange delayExchange(){
  9. Map<String,Object> args = new HashMap<>();
  10. //x-delayed-type 固定写法
  11. args.put("x-delayed-type","direct");
  12. return new CustomExchange("delay.exchange","x-delayed-message",true,true,args);
  13. }
  14. @Bean("delayQueue")
  15. public Queue delayQueue(){
  16. return new Queue("delay.queue",true,false,false);
  17. }
  18. @Bean
  19. public Binding bindingDelayExchangeQueue(CustomExchange delayExchange, Queue delayQueue){
  20. return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.routkey").noargs();
  21. }
  22. }

消费者监听消息队列类

  1. @Component
  2. public class MessageConsumer {
  3. /**
  4. * 延时队列
  5. * @param channel
  6. * @param message
  7. */
  8. @RabbitListener(queues = "delay.queue")
  9. public void receiveDelay(Channel channel, Message message){
  10. System.out.println("receiveDelay:" + message);
  11. System.out.println("delay:" + message.getMessageProperties());
  12. System.out.println("message body:"+new String(message.getBody()));
  13. }
  14. /**
  15. * 直接接收延时消息
  16. */
  17. @RabbitListener(queues = {"delay.queue"})
  18. public void receiveDelay(String msg){
  19. System.out.println("delay.queue:"+msg);
  20. }
  21. }

提供者:发送类

        

  1. @Component
  2. @Service
  3. public class MessageProducerServiceImpl implements IMessageProducerService{
  4. @Resource
  5. private RabbitTemplate template;
  6. /**
  7. * 发延时队列
  8. * @param msg
  9. */
  10. @Override
  11. public void sendDelayedMessage(String msg){
  12. //延时队列 一定要下载最新对应的ez文件放入plugs 文件夹里,版本不同、没下载都无法生效
  13. template.convertAndSend("delay.exchange","delay.routkey", msg, message -> {
  14. // 设置过期时间
  15. message.getMessageProperties().setDelay(5000);
  16. return message;
  17. });
  18. }

这就是延时队列的使用、

往下走—— 死信队列

        

  1. //代码创建交换机队列并绑定
  2. @Configuration
  3. public class MessageConfig {
  4. /**
  5. * 直连
  6. * @return
  7. */
  8. @Bean
  9. public DirectExchange directExchange(){
  10. return new DirectExchange("direct",true,true);
  11. }
  12. @Bean
  13. public Queue queue11(){
  14. Map<String,Object> args = new HashMap<>();
  15. args.put("x-max-length",5); //超过5个的进入死信队列 像我发送100次请求,那么到了q11 6个请求,有一个会进入死信队列,超过了的都得进去
  16. args.put("x-dead-letter-exchange","deadExchange");
  17. args.put("x-dead-letter-routing-key","deadRouting");
  18. return new Queue("q11",true,false,false,args);
  19. }
  20. @Bean
  21. public Queue queue12(){
  22. Map<String,Object> args = new HashMap<>();
  23. args.put("x-message-ttl",10000); //超过10秒未消费的进入死信队列
  24. args.put("x-dead-letter-exchange","deadExchange");
  25. args.put("x-dead-letter-routing-key","deadRouting");
  26. return new Queue("q12",true,false,false,args);
  27. }
  28. //死信交换机
  29. @Bean
  30. public DirectExchange deadExchange(){
  31. return new DirectExchange("dead.exchange",true,false);
  32. }
  33. //死信队列
  34. @Bean
  35. public Queue deadQueue(){
  36. Map<String,Object> args = new HashMap<>();
  37. args.put("x-dead-letter-exchange","deadExchange");
  38. args.put("x-dead-letter-routing-key","deadRouting");
  39. return new Queue("dead.Queue",true,false,false,args);
  40. }
  41. @Bean
  42. public Binding bindingExchangeQueue13() {
  43. return BindingBuilder.bind(deadQueue())
  44. .to(deadExchange()).with("deadRouting");
  45. }
  46. @Bean
  47. public Binding bindingExchangeQueue11(DirectExchange directExchange, Queue queue11) {
  48. return BindingBuilder.bind(queue11)
  49. .to(directExchange).with("rk");
  50. }
  51. @Bean
  52. public Binding bindingExchangeQueue12(DirectExchange directExchange, Queue queue12) {
  53. return BindingBuilder.bind(queue12)
  54. .to(directExchange).with("rk");
  55. }
  56. }

提供者发送消息:

        

  1. @RestController
  2. public class MessageController {
  3. @Resource
  4. private IMessageProducerService service;
  5. @Resource
  6. private RabbitTemplate template;
  7. @RequestMapping("/test1")
  8. public void testMessage1() throws InterruptedException {
  9. // service.sendMessage1("我发的第一条消息!");
  10. for (int i=0;i<100;i++){ //有5条多余+10条超时的进入死信队列
  11. Thread.sleep(100);
  12. this.service.sendMessage1("直连消息:"+i);
  13. }
  14. System.out.println("消息发送完毕");
  15. }
  16. @RequestMapping("/test2")
  17. public void testMessage2() {
  18. Map<String,Object> map = new HashMap<>();
  19. map.put("sid",1);
  20. map.put("nums",2);
  21. this.service.sendMessage2(map);
  22. System.out.println("消息发送完毕");
  23. }
  24. @RequestMapping("/test3")
  25. public void testMessage3() {
  26. this.service.sendMessage3("我发的topic交换机消息!");
  27. System.out.println("消息发送完" +
  28. "毕");
  29. }
  30. @RequestMapping("/test4")
  31. public void testMessage4() {
  32. this.service.sendDelayedMessage("我发的延时交换机消息!");
  33. System.out.println("消息发送完" +
  34. "毕");
  35. }
  36. }

这就是RabbitMq的一个简单的使用,如果发现我有问题,请一定要指出来,我需要进步,我愿意改正

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/734114
推荐阅读
相关标签
  

闽ICP备14008679号