赞
踩
**书山有路勤为径,学海无涯苦作舟**
记录程序员生活点点滴滴,希望记录的内容能帮助到努力爬山的各位伙伴!
标签:Rabbit MQ/消息监听
1.引入Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.Rabbit MQ基本配置
mq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualhost: /
exchange: sync.target.queue.direct
queue.fromGw: sync.target.queue
备注:
1.创建RabbitMQ配置类
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; /** * @author Miracle * @title: RabbitConfig * @projectName proxy * @description: 【MQ配置类】 * @date 2021/6/719:07 */ @Configuration public class RabbitConfig { @Value("${spring.mq.queue.fromGw}") public String queue_fromGw; @Bean public Queue fromGw() { return new Queue(queue_fromGw); } @Bean ConnectionFactory connectionFactory(@Value("${spring.mq.port}") int port, @Value("${spring.mq.host}") String host, @Value("${spring.mq.username}") String userName, @Value("${spring.mq.password}") String password, @Value("${spring.mq.virtualhost}") String vhost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setVirtualHost(vhost); connectionFactory.setPort(port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); return connectionFactory; } @Bean public SimpleMessageListenerContainer modbusMessageContainer(WaMingQueueListener receiver, ConnectionFactory connectionFactory) throws AmqpException, IOException { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames(queue_fromGw); container.setExposeListenerChannel(true); //container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手工确认 container.setMessageListener(receiver);//监听处理类 return container; } }
2.创建监听实现类
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; /** * @author Miracle * @title: WaMingQueueListener * @projectName Proxy * @description: 【监听MQ】 * @date 2021/6/719:10 */ @Component @Slf4j public class WaMingQueueListener implements ChannelAwareMessageListener { @Resource OperateMenuService OperateMenuService; /** * 监听MQ * @param message * @param channel * @throws Exception */ @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); log.info("Listener RabbitMQ Msg :" + new String(body)); // 实际业务处理 ...... } }
*登山路上的慕码人,理解不透的地方还请各位指点!*
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。