当前位置:   article > 正文

SpringBoot集成RabbitMQ消息中间件实现_spring boot 集成 rabbitmq 消息中间件

spring boot 集成 rabbitmq 消息中间件

RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

MQ能干嘛

应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等…

RabbitMQ执行流程

生产者与消息代理Boker直接建立一连长连接的channal,然后发送消息给我们的交换机Exchange,交换机接收到生产者发送的消息,由消息中的routeKey入有间发送给指定队列,队列存储消息等待消费者获取
在这里插入图片描述

JAVA集成RabbitMQ

导入pom依赖

		<!--消息队列依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

YAML配置

spring:
  rabbitmq:
    host: 127.0.0.1 #rabbitmq地址
    port: 5672      #端口
    virtual-host: /
    username: guest
    password: guest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

配置完成后在我们主启动类上添加注解@EnableRabbit 开启对rabbltMQ的支持

@EnableRabbit
  • 1

然后注入amqpAdmin来进行创建我们的交换机、队列、绑定等。

	@Autowired
    AmqpAdmin amqpAdmin;
  • 1
  • 2

Exchange交换机

交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

交换机类型:
在这里插入图片描述

  • direct:消息中的路由键(routing key)如果和Binding中的binding key一致,交换机就将消息发到对应的队列中。路由键与队列名完全匹配。(点对点模式)
  • fanout:每个发到fanout类型交换机的消息都会分到所有绑定的队列上去。fanout交换器不处理该路由键,只是简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。
  • topic:topic交换机通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键(routing-key)和绑定键(bingding-key)的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:"#“和”*"。#匹配0个或多个单词,匹配不多不少一个单词。
  • header
    @Test
    void createExchange(){
        /**
         * 创建 direct类型交换机
         * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
         *         super(name, durable, autoDelete, arguments);
         *     }
         *     name:交换机名
         *     durable:是否持久化
         *     autoDelete:是否自动删除
         *     arguments:参数列表
         */
        DirectExchange directExchange = new DirectExchange("oa-cloud-document-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        System.out.println("交换机创建完成");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

创建成功后可以查看我们的UI管理页面
在这里插入图片描述
QUEUE队列

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走

	@Test
    void createQueue(){
        /**
         * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) {
         *         super(arguments);
         *       name:队列名
         *       durable:是否持久化
         *       exclusive:是否排他(只能让一条声明的链接使用)
         *       autoDelete:是否自动删除
         */
        Queue queue = new Queue("oa-cloud-document-notice-read-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        System.out.println("队列创建成功");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

在这里插入图片描述

Binding绑定

用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

    @Test
    void createBinding(){
        /**
         *  public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) {
         *         super(arguments);
         *         destination:目的地
         *         destinationType:目的地类型
         *         exchange:交换机
         *         routingKey:路由键
         *  将交换机exchange与指定的目的地进行绑定,使用routingKey指定路由键
         */
        Binding binding = new Binding(
                "oa-cloud-document-notice-read-queue",
                Binding.DestinationType.QUEUE,
                "oa-cloud-document-exchange",
                "notice-read",
                null);
        amqpAdmin.declareBinding(binding);
        System.out.println("绑定创建成功");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在这里插入图片描述
发送消息测试

接下来我们就可以给rabbitmq发送消息进行测试
使用rabbitTemplate来发送消息

	@Autowired
    RabbitTemplate rabbitTemplate;
	@Test
    void sendMessage(){
        NoticeParam msg = new NoticeParam(UUID.randomUUID().toString(),UUID.randomUUID().toString());
        /**
         * exchage:交换机,给这个交换机发送消息
         * routingKey:使用指定的路由键 路由到绑定的队列上
         * msg:Object 发送的消息
         *
         * 如果发送的消息是对象,则该对象必须实现序列化接口
         * 发送消息会将该对象进行序列化后发送
         *
         * 消息转换默认使用java的序列换,我们可以转换成我们自己的消息转换器
         * 使用
         */
        rabbitTemplate.convertAndSend("oa-cloud-document-exchange","notice-read",msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

**查看我们发送的消息
在这里插入图片描述

如果发送的消息是对象,则必须让该对象实现Serializable接口

可以看到对象已经被序列化了,但是并不是我们想要的JSON格式

查看源码消息转换默认进行序列化,我们可以转换成我们自己的消息转换器
查看源码可以看到如果我们不设置消息转换器将默认使用他自己的
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

我们可以使用JSON格式的消息转换器
在这里插入图片描述
再次发送消息测试,可以看到我们发送的对象已经转换成对象了
在这里插入图片描述
监听消息队列

使用@RabbitListener注解

 	/**
     * @Description: 监听消息 queues 就是我们需要监听的队列
     * @Author: Huang
     * @Date: 2021/1/10 21:24
     * @Param: [message]
     * @return: void
    */
    @RabbitListener(queues = {"oa-cloud-document-notice-read-queue"})
    public void linserQueue(Object message){
        System.out.println(message);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

查看控制台可以看到我们发送的消息
其中包含消息头和消息体
在这里插入图片描述
这样拿到的消息就是个Message类型的数据,我们也可以直接使用对象来进行接收

/**
     * @Description: 监听消息 queues 就是我们需要监听的队列
     * @Author: Huang
     * @Date: 2021/1/10 21:24
     * @Param: [message]
     * @return: void
    */
    @RabbitListener(queues = {"oa-cloud-document-notice-read-queue"})
    public void linserQueue(Message message, NoticeParam content){
        System.out.println(content);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在这里插入图片描述
这样我们就直接取到对象

还可以使用@RabbitHandler + @RabbitListener来实现多类型的方法重载

@RefreshScope
@Slf4j
@RabbitListener(queues = {"oa-cloud-document-notice-read-queue"})
@Service
public class QyWechatDocumentInfoServiceImpl{
	/**
     * @Description: 监听消息 queues 就是我们需要监听的队列
     * @Author: Huang
     * @Date: 2021/1/10 21:24
     * @Param: [message]
     * @return: void
    */
    @RabbitHandler
    public void linserQueue(Message message, NoticeParam content){
        System.out.println(content);
    }
    @RabbitHandler
    public void linserQueue2(Message message, QueryIdParam content){
        System.out.println(content);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

实际业务处理

现在有一个业务场景,当我们阅读文章时,要去添加这个文章的访问量,按照我们传统的做法是在查询时再去调用修改数据库增加这篇文章的访问量,当并发量一上来,这这修改的操作就会浪费很多资源

使用消息队列来优化传统的解决方法,当我们查询这个文章时,给我们的消息队列发送一个消息,让我们直接返回查询到的文章内容,异步的添加文章的访问量

发送消息

 /**
     * @Description: 获取通知详情
     * @Author: Huang
     * @Date: 2021/1/9 16:25
     * @Param: [httpServletRequest, queryIdParam]
     * @return: com.lgzyy.oacloudcommon.model.response.base.ResponseData
     */
    @Override
    public ResponseData getNoticeInfo(HttpServletRequest request, QueryIdParam queryIdParam) {
        if (StringUtils.isBlank(queryIdParam.getId())){
            return new ResponseData(ExceptionEnum.SYS_PARAM_INVALID.getCode(),ExceptionEnum.SYS_PARAM_INVALID.getMessage());
        }
        Long readCount = 0L;
        boolean agree = false;
        Long agreeCount = 0L;
        //获取用户信息
        String adminToken = request.getHeader(AuthenticationConstant.QYWECHAT_AUTHORIZATION_TOKEN);
        OaCloudSysUser oaCloudSysUser = GsonUtil.fromJson(redisUtil.get(AuthenticationConstant.QYWECHAT_AUTHORIZATION_TOKEN_PREFIX + adminToken).toString(), OaCloudSysUser.class);
        OaCloudDocumentInfo oaCloudDocumentInfo = oaCloudDocumentInfoMapper.selectById(queryIdParam.getId());
        if (Objects.isNull(oaCloudDocumentInfo)){
            return new ResponseData(ExceptionEnum.SELECT_FAIL.getCode(),ExceptionEnum.SELECT_FAIL.getMessage());
        }
        //获取阅读数
        readCount = redisUtil.getHashSize("notice-read-" + queryIdParam.getId());
        //获取通知点赞数
        agreeCount = redisUtil.getHashSize("notice-agree-" + queryIdParam.getId());

        if (StringUtils.isNotBlank(oaCloudSysUser.getId())){
            /**
			 * 给消息队列发送消息
			 * RabbitEnum.NOTICE_READ.getExchange()		//指定交换机
			 * RabbitEnum.NOTICE_READ.getRoutingKey()	//指定路由键,可以找到与交换机绑定的队列
			 * new NoticeParam(queryIdParam.getId(),oaCloudSysUser.getId())
			 */
			rabbitTemplate.convertAndSend(RabbitEnum.NOTICE_READ.getExchange(),RabbitEnum.NOTICE_READ.getRoutingKey(),new NoticeParam(queryIdParam.getId(),oaCloudSysUser.getId()));
            //获取当前用户是否点赞
            agree = redisUtil.checkHashKey("notice-agree-" + queryIdParam.getId(), oaCloudSysUser.getId());
        }

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("noticeInfo",oaCloudDocumentInfo);
        jsonObject.put("readCount",readCount);
        jsonObject.put("agree",agree);
        jsonObject.put("agreeCount",agreeCount);
        return new ResponseData(ExceptionEnum.SELECT_SUCCESS.getCode(),ExceptionEnum.SELECT_SUCCESS.getMessage(),jsonObject);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

接收消息

接收消息然后执行访问量添加
  • 1
	@RabbitHandler
    public void rabbitInsertNoticeRead(Message message,NoticeParam content){
        log.info("=============================MQ接收消息=============================");
        log.info("使用的交换机:{}",message.getMessageProperties().getReceivedExchange());
        log.info("使用的路由键:{}",message.getMessageProperties().getReceivedRoutingKey());
        log.info("接收的队列:{}",message.getMessageProperties().getConsumerQueue());
        log.info("消息内容:{}",content);
        log.info("===================================================================");
        //添加当前用户阅读记录
        redisUtil.setHash("notice-read-"+content.getDocumentId(),content.getUserId());
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行效果
在这里插入图片描述
查看我们数据是否插入成功
在这里插入图片描述
可以看到我们的消息已成功接收并执行了业务

消息确认机制

在这里插入图片描述
消息确认机制为防止我们的消息发送失败,保障消息可靠抵达。

  • 发送端确认回调
  • 开启发送端消息确认
	spring:
	  rabbitmq:
	    host: 127.0.0.1
	    port: 5672
	    virtual-host: /
	    username: admin
	    password: admin
	    #开启发送端确认
	    publisher-confirm-type: correlated
	    publisher-confirms: true
	    #开启发送端消息抵达队列确认
	    publisher-returns: true
	    #只要消息抵达队列,以异步发送优先回调publisher-returns
	    template:
	      mandatory: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

修改我们的RabbitConfig,给rabbitTemplate设置确认回调

/**
 * @ClassName: RabbitConfig
 * @Author: Huang
 * @Date: 2021/1/10 21:16
 */
@Slf4j
@Configuration
public class RabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }


    /**
     * 设置发送端确认回调
     */
    @PostConstruct //RabbitConfig对象创建完成后调用该方法
    public void initRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *  只要消息抵达broker就触发
             * @param correlationData 消息的唯一关联数据(消息的ID)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("==================================================================");
                log.info("消息关联数据:{}",correlationData);
                log.info("消息是否成功:{}",ack);
                log.info("消息失败原因:{}",cause);
                log.info("==================================================================");
            }
        });

        /**
         * 设置消息抵达队列的确认回调
         */
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有抵达队列就触发,类似于失败回调
             * @param message   投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange  消息发送给的交换机
             * @param routingKey 消息发送使用的路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("==================================================================");
                log.info("-------------------------消息发送到队列失败--------------------------");
                log.info("投递失败的消息:{}",message);
                log.info("回复的状态码:{}",replyCode);
                log.info("回复的内容:{}",replyText);
                log.info("使用的交换机:{}",exchange);
                log.info("使用的路由键:{}",routingKey);
                log.info("==================================================================");
             }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

在发送一条消息测试

在这里插入图片描述
可以看到我们的发送端的确认消息,已成功发给消息代理。只要发送了消息,都会进行我们的确认回调。

  • 消息正确抵达队列
    如果消息代理发送消息给队列失败,将会调用ReturnCallback回调

-消费端确认消息

默认是自动确认,只要消息接收到,消费端就会自动确认,服务端都会把这个消息删除

注意:如果发送多条消息,但消费端还没处理完所有消息,如果消费端宕机,那将会造成 消息丢失 也是就消息没有全部消费,但是默认确认,这样服务端就会移除所有消息

可以开启消费端手动确认默认

spring:
  rabbitmq:    
    #开启手动消息签收
    listener:
      simple:
        acknowledge-mode: manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

手动消息确认,只要我们没有确认消息。消息将一直处于unAcked状态,及时我们的消费端宕机了,消息也会一直保存,状态重新变为ready状态
在这里插入图片描述

在这里插入图片描述
可以看到虽然我们的消息发送成功,并成功被消费,但是我们没有确认,这个消息就是一直处于unacked状态,这时候我们再来关闭消费端,可以看到消息变成了ready状态
在这里插入图片描述
手动确认代码实现

	@RabbitHandler
    public void rabbitInsertNoticeRead(Message message, NoticeParam content, Channel channel){
        log.info("=============================MQ接收消息=============================");
        log.info("使用的交换机:{}",message.getMessageProperties().getReceivedExchange());
        log.info("使用的路由键:{}",message.getMessageProperties().getReceivedRoutingKey());
        log.info("接收的队列:{}",message.getMessageProperties().getConsumerQueue());
        log.info("消息内容:{}",content);
        log.info("===================================================================");
        //添加当前用户阅读记录
        redisUtil.setHash("notice-read-"+content.getDocumentId(),content.getUserId());
        try{
            /**
             * 消息确认:
             *  deliveryTag  channel内按顺序自增的一个消息编号
             *  b:是否批量确认
             */
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag,false);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

修改完成之后,当我们重启消费端,可以看到刚才的消息已被获取,而且mQ控制页面的消息也被成功确认并移除
在这里插入图片描述

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/971700
推荐阅读
相关标签
  

闽ICP备14008679号