当前位置:   article > 正文

项目实践-Spring Boot实现RabbitMQ消息监听_rabbitmq 消费者监听实现类

rabbitmq 消费者监听实现类

项目实践-Spring Boot实现RabbitMQ消息监听

前言

**书山有路勤为径,学海无涯苦作舟**
记录程序员生活点点滴滴,希望记录的内容能帮助到努力爬山的各位伙伴!

标签:Rabbit MQ/消息监听
  • 1
  • 2
  • 3
  • 4

一、RabbitMQ概念

  • RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),是应用层协议的一个开放标准。
  • RabbitMQ服务器是用Erlang语言编写的,是企业级消息代理软件。
  • RabbitMQ的应用场景是异步处理/应用解耦/流量削峰。
  • RabbitMQ客户端分为两种角色生产者(Producer )/消费者(Consumer )
  • AMQP四个概念:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)

二、SpringBoot集成Rabbit MQ

1.引入Maven依赖

	<dependency>
     	<groupId>org.springframework.boot</groupId>
      	<artifactId>spring-boot-starter-amqp</artifactId>
  	</dependency>
  • 1
  • 2
  • 3
  • 4

2.Rabbit MQ基本配置

mq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualhost: /
    exchange: sync.target.queue.direct
    queue.fromGw: sync.target.queue
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

备注:

  • 在我的项目中Rabbit MQ配置信息放在bootstrap.yml文件中
  • queue.fromGw: sync.target.queue是本次监听的目标队列

三、实现消息监听

1.创建RabbitMQ配置类

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * @author Miracle
 * @title: RabbitConfig
 * @projectName proxy
 * @description: 【MQ配置类】
 * @date 2021/6/719:07
 */
@Configuration
public class RabbitConfig {
    @Value("${spring.mq.queue.fromGw}")
    public String queue_fromGw;

    @Bean
    public Queue fromGw() {
        return new Queue(queue_fromGw);
    }
    
    @Bean
    ConnectionFactory connectionFactory(@Value("${spring.mq.port}") int port,
                                        @Value("${spring.mq.host}") String host,
                                        @Value("${spring.mq.username}") String userName,
                                        @Value("${spring.mq.password}") String password,
                                        @Value("${spring.mq.virtualhost}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public SimpleMessageListenerContainer modbusMessageContainer(WaMingQueueListener receiver, ConnectionFactory connectionFactory) throws AmqpException, IOException {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queue_fromGw);
        container.setExposeListenerChannel(true);
		//container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手工确认
        container.setMessageListener(receiver);//监听处理类
        return container;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

2.创建监听实现类

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * @author Miracle
 * @title: WaMingQueueListener
 * @projectName Proxy
 * @description: 【监听MQ】
 * @date 2021/6/719:10
 */
@Component
@Slf4j
public class WaMingQueueListener  implements ChannelAwareMessageListener {

    @Resource
    OperateMenuService OperateMenuService;

    /**
     * 监听MQ
     * @param message
     * @param channel
     * @throws Exception
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        byte[] body = message.getBody();
        log.info("Listener RabbitMQ Msg :" + new String(body));
        // 实际业务处理
        ......
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

结言

*登山路上的慕码人,理解不透的地方还请各位指点!*
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/618621
推荐阅读
相关标签
  

闽ICP备14008679号