赞
踩
1、pom.xml添加如下依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2、整合rabbitMQ
- 1、首先建MQConn实体类
- /**
- * rabbitMQ的实体类ip、端口号、用户名、密码
- *
- **/
- public class MQConn {
-
- private String host;
- private int port;
- private String username;
- private String password;
- private long connectionTimeout;
- private CachingConnectionFactory.CacheMode cacheMode;
- private int channelCacheSize;
- private long channelCheckoutTimeout;
-
- 省略getter、setter
- }
-
- 2、建立配置文件rabbitmq.properties
- # RabbitMQ connection settings, read under the "file.rabbitmq" prefix
- file.rabbitmq.host=192.168.203.136
- file.rabbitmq.port=5672
- file.rabbitmq.username=chai
- file.rabbitmq.password=123456
- # NOTE(review): presumably milliseconds - confirm where this value is consumed
- file.rabbitmq.connectionTimeout=3000
-
- 3、注入参数RabbitMQConfig
- /**
-  * Spring configuration that reads {@code rabbitmq.properties}, builds the RabbitMQ
-  * connection factory from it and registers a listener container for the "hi" queue.
-  */
- @Configuration
- @PropertySource("classpath:rabbitmq.properties")
- public class RabbitMQConfig {
-
-     /**
-      * Creates the {@link MQConn} settings bean; its fields are filled from the
-      * properties under the {@code file.rabbitmq} prefix.
-      *
-      * @return the settings bean, registered as "mqConn"
-      */
-     @Bean("mqConn")
-     @ConfigurationProperties(prefix = "file.rabbitmq")
-     public MQConn fileWorkRabbitFactory() {
-         return new MQConn();
-     }
-
-     /**
-      * Builds the connection factory from the bound connection settings.
-      *
-      * <p>Host, port, credentials, virtual host "/" and publisher confirms are always
-      * set. The timeout and cache settings are optional and are applied only when
-      * they carry a configured (non-null / positive) value.
-      *
-      * @param mqConnFile connection settings bound from the properties file
-      * @return the configured connection factory, registered as "cachingConnectionFactory"
-      * @throws ArithmeticException if the configured connection timeout does not fit in an int
-      */
-     @Bean("cachingConnectionFactory")
-     public CachingConnectionFactory connectionFactory(
-             @Qualifier("mqConn") MQConn mqConnFile
-     ) {
-         CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
-         cachingConnectionFactory.setHost(mqConnFile.getHost());
-         cachingConnectionFactory.setPort(mqConnFile.getPort());
-         cachingConnectionFactory.setUsername(mqConnFile.getUsername());
-         cachingConnectionFactory.setPassword(mqConnFile.getPassword());
-         cachingConnectionFactory.setVirtualHost("/");
-         cachingConnectionFactory.setPublisherConfirms(true);
-         // The timeout is bound from the properties file; without this call the
-         // configured value would be silently ignored.
-         if (mqConnFile.getConnectionTimeout() > 0) {
-             cachingConnectionFactory.setConnectionTimeout(
-                     Math.toIntExact(mqConnFile.getConnectionTimeout()));
-         }
-         if (mqConnFile.getCacheMode() != null) {
-             cachingConnectionFactory.setCacheMode(mqConnFile.getCacheMode());
-         }
-         if (mqConnFile.getChannelCacheSize() > 0) {
-             cachingConnectionFactory.setChannelCacheSize(mqConnFile.getChannelCacheSize());
-         }
-         if (mqConnFile.getChannelCheckoutTimeout() > 0) {
-             cachingConnectionFactory.setChannelCheckoutTimeout(mqConnFile.getChannelCheckoutTimeout());
-         }
-         return cachingConnectionFactory;
-     }
-
-     /**
-      * Provides a new {@link PackMsgReceiver} for every injection point (prototype scope).
-      *
-      * <p>NOTE(review): PackMsgReceiver is also annotated with {@code @Component}, so
-      * component scanning may register a second definition under the same bean name;
-      * confirm that only one registration is intended.
-      *
-      * @return a fresh message receiver
-      */
-     @Bean
-     @Scope("prototype")
-     public PackMsgReceiver packMsgReceiver() {
-         return new PackMsgReceiver();
-     }
-
-     /**
-      * Creates the listener container that consumes the "hi" queue.
-      *
-      * @param packMsgReceiver          listener invoked for every delivered message
-      * @param cachingConnectionFactory connection factory used by the container
-      * @return the listener container
-      */
-     @Bean
-     public SimpleMessageListenerContainer messageListenerContainer(PackMsgReceiver packMsgReceiver, @Qualifier("cachingConnectionFactory") CachingConnectionFactory cachingConnectionFactory) {
-         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
-         // Name of the queue to listen on
-         container.setQueueNames("hi");
-         container.setExposeListenerChannel(true);
-         // Maximum number of messages each consumer may prefetch
-         container.setPrefetchCount(100);
-         // Number of concurrent consumers
-         container.setConcurrentConsumers(1);
-         // Manual acknowledgement: the listener must ack/nack each message itself
-         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-         container.setMessageListener(packMsgReceiver);
-         return container;
-     }
- }
-
- 4、接收消息处理
- /**
-  * Message listener that parses each delivery as JSON and acknowledges or rejects
-  * it manually on the channel it was received from.
-  */
- @Component
- public class PackMsgReceiver implements ChannelAwareMessageListener {
-     private static final Logger logger = LoggerFactory.getLogger(PackMsgReceiver.class);
-
-     /**
-      * Handles one delivery and settles it in one of three ways:
-      * <ol>
-      *   <li>processing succeeded: confirm the message with {@code basicAck};</li>
-      *   <li>retryable failure: {@code basicNack} with requeue, so it is delivered again;</li>
-      *   <li>non-retryable failure (including a body that is not valid JSON):
-      *       {@code basicNack} without requeue, so it is discarded.</li>
-      * </ol>
-      *
-      * <p>{@code basicNack(long deliveryTag, boolean multiple, boolean requeue)}:
-      * {@code deliveryTag} is the index of the message on this channel; {@code multiple}
-      * = true rejects every unacknowledged message up to and including
-      * {@code deliveryTag}; {@code requeue} decides whether the rejected message is
-      * put back on the queue.
-      *
-      * @param message the delivered message
-      * @param channel the channel the message arrived on, used for ack/nack
-      * @throws Exception if acknowledging or rejecting the message fails
-      */
-     @Override
-     public void onMessage(Message message, Channel channel) throws Exception {
-         // Decode once with an explicit charset instead of the platform default.
-         String text = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
-         long deliveryTag = message.getMessageProperties().getDeliveryTag();
-         logger.info("接收到消息:{}", text);
-         try {
-             JSONObject jsonObject = JSONObject.parseObject(text);
-             // 消费成功 / 可重试的失败处理 are placeholders from the tutorial: replace them
-             // with the real outcome of processing jsonObject.
-             if (消费成功) {
-                 logger.info("消息消费成功");
-                 // Confirm that the message was consumed successfully
-                 channel.basicAck(deliveryTag, false);
-             } else if (可重试的失败处理) {
-                 // Retryable failure: put the message back on the queue
-                 channel.basicNack(deliveryTag, false, true);
-             } else {
-                 // Consumption failed and cannot be retried: discard the message
-                 channel.basicNack(deliveryTag, false, false);
-             }
-         } catch (JSONException e) {
-             // Log the raw text: no parsed object exists when parsing itself failed.
-             logger.error("This message:{} conversion JSON error ", text, e);
-             // Discard the message
-             channel.basicNack(deliveryTag, false, false);
-         }
-     }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。