当前位置:   article > 正文

RabbitMQ在java中的使用、整合_java rabbitmq

java rabbitmq

目录

初识RabbitMQ

1.什么是RabbitMQ

2.RabbitMQ同时支持同步调用和异步调用

RabbitMQ在java中整合

1.添加依赖

2.在yaml中配置RabbitMQ连接

3.添加配置类

 4.在测试类中测试

 总结


初识RabbitMQ

1.什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,它基于高级消息队列协议(AMQP)设计。RabbitMQ可以作为消息代理、任务队列、事件传输系统等,广泛应用于分布式系统和微服务架构中

2.RabbitMQ同时支持同步调用和异步调用

1.同步(Synchronous):

        在同步模式下,生产者发送消息后,会等待直到消息被确认或者被拒绝,然后才继续执行后
        续的操作。

        这种方式确保了消息的发送和接收是顺序进行的,适用于需要严格顺序控制的场景

优点:

        时效性强,等待结果后返回

缺点:

        扩展性差

        性能下降

        级联失败问题

2.异步(Asynchronous)

        在异步模式下,生产者发送消息后,不需要等待消息确认,就可以继续发送下一条消息或者
        执行其他任务。

        这种方式可以提高消息处理的吞吐量,因为生产者不需要等待每条消息的确认。

        异步模式通常用于消息量大、对实时性要求不高的场景,或者当消费者处理速度较慢时,以
        避免生产者阻塞。

优点:

        耦合度低,扩展性强

        异步调用,无需等待,性能好

        故障隔离,下游服务故障不影响上游业务

        缓存消息,流量削峰填谷

缺点:

        不能立即得到调用结果,时效性差

        不能确定下游业务是否成功

        业务安全依赖于Broker的可靠性

Broker:消息代理

像查询就不需要用到异步调用,因为查询必须是要等查出来才能进行下一步操作的

RabbitMQ也有自己的管理界面,可以也可以在管理界面直接进行交换机,队列的建立

RabbitMQ在java中整合

1.添加依赖

以下是整合RabbitMQ的依赖和一些基本依赖

SpringAMQP是RabbitMQ使用的主要依赖

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来
SpringAMQP就会把消息传递给当前方法

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <!--单元测试-->
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-test</artifactId>
  10. </dependency>
  11. <!--Jackson-->
  12. <dependency>
  13. <groupId>com.fasterxml.jackson.dataformat</groupId>
  14. <artifactId>jackson-dataformat-xml</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>com.fasterxml.jackson.core</groupId>
  18. <artifactId>jackson-databind</artifactId>
  19. </dependency>
  20. <!-- json转换依赖-->
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. <version>1.2.73</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.json</groupId>
  28. <artifactId>json</artifactId>
  29. <version>20210307</version>
  30. </dependency>
  31. <!--日志依赖-->
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-logging</artifactId>
  35. </dependency>
  36. <!--quartz框架-->
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-starter-quartz</artifactId>
  40. </dependency>
  41. <!-- mysql驱动依赖-->
  42. <dependency>
  43. <groupId>mysql</groupId>
  44. <artifactId>mysql-connector-java</artifactId>
  45. <version>8.0.27</version>
  46. </dependency>
  47. <!-- 数据库连接池依赖-->
  48. <dependency>
  49. <groupId>com.alibaba</groupId>
  50. <artifactId>druid</artifactId>
  51. <version>1.1.10</version>
  52. </dependency>
  53. <!-- lombak,类的简化-->
  54. <dependency>
  55. <groupId>org.projectlombok</groupId>
  56. <artifactId>lombok</artifactId>
  57. </dependency>
  58. <!--mybatisPlus-->
  59. <dependency>
  60. <groupId>com.baomidou</groupId>
  61. <artifactId>mybatis-plus-boot-starter</artifactId>
  62. <version>3.5.3.1</version>
  63. </dependency>

2.在yaml中配置RabbitMQ连接

  1. spring:
  2. rabbitmq:
  3. host: 这里本地就写本地的ip,服务器上的就写外网ip
  4. port: 5672
  5. virtual-host: /dahei
  6. username: 自己设置的账号
  7. password: 自己设置的密码

3.添加配置类

这个配置类是避免返回是序列化后的值,消息转换器,将JDK自带的GBK编码变成JSON格式

  1. @Configuration
  2. public class MessageConfig {
  3. @Bean
  4. public MessageConverter messageConverter() {
  5. return new Jackson2JsonMessageConverter();
  6. }
  7. }

下面这段代码的作用是确保当消息无法被RabbitMQ队列接收时(例如,因为路由键不匹配或队列不存在),应用程序能够收到通知,并记录相关信息以便调试和监控。这是确保消息传递可靠性的一个重要机制。通过这种方式,开发者可以知道哪些消息没有被成功处理,从而采取相应的补救措施。 

  1. @Slf4j
  2. @Configuration
  3. @RequiredArgsConstructor
  4. public class MQConfirmConfig implements ApplicationContextAware {
  5. private final RabbitTemplate rabbitTemplate;
  6. @Override
  7. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  8. // RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  9. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  10. //配置回调
  11. @Override
  12. public void returnedMessage(ReturnedMessage returnedMessage) {
  13. log.debug("exchange:{},message:{}, Code:{}, Text:{}, key:{}",
  14. returnedMessage.getExchange(), returnedMessage.getMessage(), returnedMessage.getReplyCode(),
  15. returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
  16. }
  17. });
  18. }
  19. }

 要想接收消息,交换机和队列是必不可少的,相应的队列要绑定到相应的交换机上

  1. /**
  2. * @Author: 大黑
  3. * @Date: 2024/4/1 0:36
  4. */
  5. @Configuration
  6. public class FanoutConfiguration {
  7. //首先声明一个交换机
  8. @Bean
  9. public FanoutExchange fanoutExchange() {
  10. //ExchangeBuilder.fanoutExchange("dahei.fanout1").build();这种创建方式跟new创建是一样的
  11. return new FanoutExchange("dahei.fanout1");
  12. }
  13. //声明一个队列
  14. @Bean
  15. public Queue queue() {
  16. //QueueBuilder.durable()(创建一个持久的队列,将消息写入磁盘中);这种创建方式跟new是一样的
  17. //return QueueBuilder.durable("fanout.queue3").lazy().build();(创建一个lazy Queue-惰性队列)
  18. return new Queue("fanout.queue3");//默认持久的
  19. }
  20. //绑定交换机和队列
  21. @Bean
  22. public Binding binding(FanoutExchange fanoutExchange, Queue queue) {
  23. return BindingBuilder.bind(queue).to(fanoutExchange);
  24. }
  25. //在绑定交换机和队列的时候还可以直接调用这个方法
  26. //这中方法和上面那种的作用是一样的
  27. // @Bean
  28. // public Binding binding() {
  29. // return BindingBuilder.bind(queue()).to(fanoutExchange());
  30. // }
  31. }

 4.在测试类中测试

这是基于注解的方式进行配置

@RabbitListener:消息监听注解,Spring管理

  1. /**
  2. * @Author: 大黑
  3. * @Date: 2024/3/29 0:07
  4. */
  5. @Component
  6. @Slf4j
  7. public class ListenerTest {
  8. @RabbitListener(queues = "simple.queue")
  9. public void listenerMq(String queueMsg) {
  10. // System.out.println("消费者收到了消息:" + queueMsg);
  11. log.info("接收到的消息为{}", queueMsg);
  12. }
  13. /**
  14. * work模式下的接收
  15. *
  16. * @param queueMsg
  17. * @throws InterruptedException
  18. */
  19. @RabbitListener(queues = "work.queue")
  20. public void workQueueMq1(String queueMsg) throws InterruptedException {
  21. // System.out.println("消费者收到了消息:" + queueMsg);
  22. log.info("消费者1" + "接收到的消息为{}", queueMsg);
  23. // Thread.sleep(20);
  24. }
  25. @RabbitListener(queues = "work.queue")
  26. public void workQueueMq2(String queueMsg) throws InterruptedException {
  27. // System.out.println("消费者收到了消息:" + queueMsg);
  28. log.info("消费者2" + "接收到的消息为{}", queueMsg);
  29. // Thread.sleep(200);
  30. }
  31. /**
  32. * fanout交换机模式下的接收
  33. *
  34. * @param queueMsg
  35. * @throws InterruptedException
  36. */
  37. @RabbitListener(queues = "fanout.queue1")
  38. public void fanoutQueueMq1(String queueMsg) throws InterruptedException {
  39. // System.out.println("消费者收到了消息:" + queueMsg);
  40. log.info("fanoutQueue1" + "接收到的消息为{}", queueMsg);
  41. // Thread.sleep(20);
  42. }
  43. @RabbitListener(queues = "fanout.queue2")
  44. public void fanoutQueueMq2(String queueMsg) throws InterruptedException {
  45. // System.out.println("消费者收到了消息:" + queueMsg);
  46. log.info("fanoutQueue2" + "接收到的消息为{}", queueMsg);
  47. // Thread.sleep(200);
  48. }
  49. /**
  50. * direct交换机模式下的接收
  51. *
  52. * @param queueMsg
  53. */
  54. @RabbitListener(queues = "direct.queue1")
  55. public void directQueueMq1(String queueMsg) {
  56. // System.out.println("消费者收到了消息:" + queueMsg);
  57. log.info("directQueueMq1" + "接收到的消息为{}", queueMsg);
  58. // Thread.sleep(20);
  59. }
  60. @RabbitListener(queues = "direct.queue2")
  61. public void directQueueMq2(String queueMsg) {
  62. // System.out.println("消费者收到了消息:" + queueMsg);
  63. log.info("directQueueMq2" + "接收到的消息为{}", queueMsg);
  64. // Thread.sleep(200);
  65. }
  66. /**
  67. * direct交换机模式下的接收(基于注解模式进行接收)
  68. *
  69. * @param queueMsg
  70. */
  71. @RabbitListener(bindings = @QueueBinding(
  72. value = @Queue(name = "direct.queue1", durable = "true"),
  73. exchange = @Exchange(name = "dahei.direct", type = ExchangeTypes.DIRECT), //交换机类型type,有枚举类可供选择,这里一般都会有默认值
  74. key = {"red", "blue"}//direct交换机key,数组类型,可以声明多个key
  75. ))
  76. public void directNewQueueMq1(String queueMsg) {
  77. // System.out.println("消费者收到了消息:" + queueMsg);
  78. log.info("directQueueMq1" + "接收到的消息为{}", queueMsg);
  79. // Thread.sleep(20);
  80. }
  81. @RabbitListener(bindings = @QueueBinding(
  82. value = @Queue(name = "direct.queue2", durable = "true"),
  83. exchange = @Exchange(name = "dahei.direct", type = ExchangeTypes.DIRECT), //交换机类型type,有枚举类可供选择,这里一般都会有默认值
  84. key = {"yellow", "blue"}//direct交换机key,数组类型,可以声明多个key
  85. ))
  86. public void directNewQueueMq2(String queueMsg) {
  87. // System.out.println("消费者收到了消息:" + queueMsg);
  88. log.info("directQueueMq2" + "接收到的消息为{}", queueMsg);
  89. // Thread.sleep(200);
  90. }
  91. /**
  92. * topic交换机模式
  93. * @param queueMsg
  94. */
  95. @RabbitListener(queues = "topic.queue1")
  96. public void topicQueueMq1(String queueMsg) {
  97. // System.out.println("消费者收到了消息:" + queueMsg);
  98. log.info("topicQueueMq1" + "接收到的消息为{}", queueMsg);
  99. // Thread.sleep(20);
  100. }
  101. @RabbitListener(queues = "topic.queue2")
  102. public void topicQueueMq2(String queueMsg) {
  103. // System.out.println("消费者收到了消息:" + queueMsg);
  104. log.info("topicQueueMq2" + "接收到的消息为{}", queueMsg);
  105. // Thread.sleep(200);
  106. }
  107. }
  1. @Slf4j
  2. @SpringBootTest
  3. class PublisherApplicationTests {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. void contextLoads() {
  8. String queueName = "simple.queue";
  9. String queueMsg = "你好,mq,这是伽有";
  10. rabbitTemplate.convertAndSend(queueName, queueMsg);
  11. }
  12. @Test
  13. //发送到队列
  14. void workQueue1() throws InterruptedException {
  15. //队列名称
  16. String queueName = "work.queue";
  17. for (int i = 0; i < 50; i++) {
  18. //发送的消息
  19. String queueMsg = "你好,workQueue_" + i;
  20. rabbitTemplate.convertAndSend(queueName, queueMsg);
  21. Thread.sleep(20);
  22. }
  23. }
  24. @Test
  25. //发送到fanout交换机
  26. void fanoutQueue() {
  27. String exeChange = "dahei.fanout";
  28. String queueMsg = "helle,tow queue";
  29. rabbitTemplate.convertAndSend(exeChange, null, queueMsg);
  30. }
  31. @Test
  32. //发送到direct交换机
  33. void directQueue() {
  34. String exeChange = "dahei.fanout";
  35. String queueMsg = "helle,tow queue";
  36. String routKet = "red";
  37. rabbitTemplate.convertAndSend(exeChange, routKet, queueMsg);
  38. }
  39. @Test
  40. //发送到topic交换机
  41. void topicQueue() {
  42. String exeChange = "dahei.topic";
  43. String queueMsg = "helle,tow queue";
  44. String routKet = "china.dahei";
  45. rabbitTemplate.convertAndSend(exeChange, routKet, queueMsg);
  46. }
  47. //发送其他类型的数据
  48. @Test
  49. void objectQueue() {
  50. Map<Object, Object> map = new HashMap<Object, Object>();
  51. map.put("name", "大黑");
  52. map.put("age", 22);
  53. rabbitTemplate.convertAndSend("object.queue", map);
  54. }
  55. //回调消息测试
  56. @Test
  57. void returnMessage() throws InterruptedException {
  58. //创建cd(callback)
  59. CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
  60. //添加ConfirmCallback
  61. cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
  62. @Override
  63. public void onFailure(Throwable ex) {
  64. log.error("回调消息失败", ex);
  65. }
  66. @Override
  67. public void onSuccess(CorrelationData.Confirm result) {
  68. log.debug("收到confirm back,开始发送消息");
  69. if (result.isAck()) {
  70. log.debug("消息发送成功, 收到ack");
  71. } else {
  72. log.debug("消息发送失败,收到nack,原因{}", result.getReason());
  73. }
  74. }
  75. });
  76. rabbitTemplate.convertAndSend("dahei.direct", "reds", "hello 大黑 回调情况如何", cd);
  77. Thread.sleep(2000);
  78. }
  79. //在临时消息发送达到内存最大时,会将一部分消息转存在磁盘中,然后会造成消息阻塞的情况
  80. @Test
  81. void pageOutTest() {
  82. Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
  83. .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//创建一个非持久化的消息发送
  84. for (int i = 0; i < 1000000; i++) {
  85. rabbitTemplate.convertAndSend("simple.queue", message);
  86. }
  87. }
  88. }

 总结

  1. 基本概念

    • 消息(Message):传递的数据,包括消息头和消息体。
    • 队列(Queue):存储消息的缓冲区,直到它们被消费者消费。
    • 交换机(Exchange):接收生产者发送的消息,根据路由规则将消息路由到一个或多个队列。
    • 绑定(Binding):交换机和队列之间的关联,定义了消息如何从交换机路由到队列。
    • 路由键(Routing Key):用于消息路由的标识符。
  1. 高级特性

    • 死信交换机(Dead Letter Exchanges):处理无法路由的消息。
    • 优先级队列(Priority Queues):根据优先级对消息进行排序。
    • 延迟消息(Delayed Messages):设置消息的延迟时间。

        工作模式

                点对点(Point-to-Point):消息从生产者发送到单个消费者。

                发布/订阅(Publish/Subscribe):消息从生产者发送到多个消费者,通过交换机进行消息分发。

                路由(Routing):使用路由键将消息路由到不同的队列。

                主题(Topic):通过模式匹配路由键,实现更灵活的消息路由

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/902141
推荐阅读
相关标签
  

闽ICP备14008679号