当前位置:   article > 正文

SpringBoot 集成 RabbitMQ(上)_springboot集成mq

springboot集成mq

接下来使用 SpringBoot 来集成 RabbitMQ

1. SpringBoot 来集成 RabbitMQ 入门

环境配置

JDK 1.8
IDEA 2019.1
SpringBoot 2.5.1
RabbitMQ 3.7.8
Erlang 21.1

工程结构图:
在这里插入图片描述

第一步》、新建一个 SpringBoot 工程,添加 POM 依赖:

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

第二步》、添加 application.yml 文件配置

server:
  port: 8080
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

上面两步执行完后,就得创建生产者和消费者了。

那么是先创建生产者,还是消费者呢?

如果先创建好生产者,然后向队列发送消息,如果该队列提前不存在的话,消息会发送失败;如果先创建消费者,让消费者提前创建好队列后,监听队列,再创建生产者,向队列中发送消息。

第三步》、创建消费者

在消费端声明队列

@Component
@RabbitListener(queuesToDeclare = @Queue("simple_queue"))  //如果simple_queue队列不存在,则创建simple_queue队列。默认队列是持久化,非独占式的
public class SimpleConsumer {

    // //消费者如果监听到消息队列有消息传入,则会自动消费
    @RabbitHandler
    public void receive(String message) {
        System.out.println("消费者接收到的消息是:" + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

@RabbitListener 注解的属性的作用:

  • queuesToDeclare:如果 simple_queue 队列不存在,则会自动创建simple_queue队列。默认队列是持久化,非独占式的
  • queues:里面的队列必须存在,否则就会报错。如:@RabbitListener(queues = {"simple_queue2"}),如果队列 simple_queue2 不存在,那么启动消费者就会报错

第四步》、创建生产者

这里使用单元测试的方式创建,简单、方便

@SpringBootTest
@RunWith(SpringRunner.class)
public class AppTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsg() {
        String queueName = "simple_queue";
        String message = "Hello RabbitMQ SimpleQueue";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

第五步》、启动单元测试方法

执行 sendMsg() 方法,它会启动整个 SpringBoot 项目,将消费者 SimpleConsumer 注入 Spring 容器中,监听队列 simple_queue(如果没有此队列,则会创建它)。然后,生产者就会发送消息,被消费者消费。

2. 新建一个生产者工程

入门案例是将生产者放入了一个单元测试方法中,一启动单元测试方法,就会发送消息,然后立马被消费者消息了。这样,在 RabbitMQ 中看不到消息。

接下来将生产者放入一个单独的工程中,将它和消费者进行分离。

生产者工程图:
在这里插入图片描述

第一步》、新建一个 SpringBoot 工程,添加 POM 依赖

同上

第二步》、添加 application.yml 配置

同上

【注意】:如果两个工程同时启动,则需要修改 server.port 号,这个值不能重复

第三步》、创建生产者

@Component
public class SimpleProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        for (int i = 0; i < 5; i++) {
            String msg = "简单模式下的消息 " + i;
            rabbitTemplate.convertAndSend(RabbitMqConstant.SIMPLE_QUEUE_NAME, msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

此生产者发送了 5 条消息

常量类:

public interface RabbitMqConstant {
	// 队列名称
    String SIMPLE_QUEUE_NAME = "simple_queue";
}
  • 1
  • 2
  • 3
  • 4

接口调用:

@RestController
@RequestMapping("/simple")
public class SimpleController {

    @Autowired
    private SimpleProducer simpleProducer;

    @RequestMapping("/sendMsg")
    public String sendMsg() {
        simpleProducer.sendMsg();
        return "SIMPLE-QUEUE";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

调用此接口后,会向 MQ 发送 5 条消息,在 MQ 中查看如下:
在这里插入图片描述
启动消费者后,会消费消息

注意:消费者和生产者一定要消费相同的队列

3. 在生产者端新建队列

当我们在 MQ 中删除这个 simple_queue 队列后,再次通过生产者发送消息,发现:虽然队列已不存在,但往这个队列中发送消息时也没有报错。

这就有一个问题:当你发送一条消息时,不管这个队列是否存在,这条消息都不会报错,那我们就会默认它发送成功,如果,这个队列不存在,那么就会存在误判的问题了。

所以,为了避免这种情况,我们在启动生产者时,就去声明一个队列:如果它不存在就创建;存在,就不进行处理。

那么,又该如何做呢?

很简单,在生产者端中添加一个配置类即可:

@Configuration
public class SimpleQueueConfig {

    // 创建队列
    @Bean
    public Queue simpleQueue() {
        return new Queue(RabbitMqConstant.SIMPLE_QUEUE_NAME);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

启动生产者端,调用接口,如果 simple_queue 队列不存在,那么就会创建了。

4. 使用 @RabbitListener 注解消费消息

注解 @RabbitListener 既可以标记在类上,也可以标记在方法上:

  • 标记在类上:需配合 @RabbitHandler 注解一起使用。当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
  • 标记在方法上:就由指定的方法进行处理

4.1 标记在类上

看看生产者发送消息的代码:

public void sendMsg() {
    for (int i = 0; i < 5; i++) {
        String msg = "简单模式下的消息 " + i;
        rabbitTemplate.convertAndSend(RabbitMqConstant.SIMPLE_QUEUE_NAME, msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

往此队列中发送的是 String 类型的数据

看看消费者接收消息的代码:

@RabbitHandler
public void receive(String message) {
    System.out.println("消费者接收到的消息是:" + message);
}
  • 1
  • 2
  • 3
  • 4

方法的入参也是 String 类型的。它们类型相匹配,所以,可以消费消息。

现在,我们修改下生产者发送消息的数据类型:

public void sendMsg() {
    Map<String, Object> map = new HashMap<>();
    for (int i = 0; i < 5; i++) {
        String msg = "简单模式下的消息 " + i;
        map.put(String.valueOf(i), msg);
        rabbitTemplate.convertAndSend(RabbitMqConstant.SIMPLE_QUEUE_NAME, map);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

从上面可以看出,我们往队列中发送消息的数据类型为 HashMap。

然后,我们使用消费者接收消息,发现会报错:
在这里插入图片描述
数据类型不匹配。

然后,我们再往消费者中添加如下代码:

@RabbitHandler
public void receive(Map<String, Object> message) {
    System.out.println("消费者接收到的消息是:" + message);
}
  • 1
  • 2
  • 3
  • 4

这样,就消费消息成功了。

4.2 标记在方法上

生产者代码不变,消费者代码修改如下:

@Component
public class SimpleConsumer2 {

    @RabbitListener(queues = {"simple_queue"})
    public void receive(String message) {
        System.out.println("消费者接收到的消息是:" + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

只添加 @RabbitListener 注解。注意:simple_queue 队列必须存在。

5. 手动确认消息

修改 application.yml 配置:

server:
  port: 8080
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #手动应答
        concurrency: 1 # 最小消费者数量
        max-concurrency: 1 # 最多消费者数量
        retry:
          enabled: true # 是否支持重试
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

消费者代码修改如下:

@Component
public class SimpleConsumer2 {

    @RabbitListener(queues = {"simple_queue"})
    public void receive(Message message, Channel channel) throws Exception{
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("我是消费信息:"+new String(message.getBody()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/405708
推荐阅读
相关标签
  

闽ICP备14008679号