当前位置:   article > 正文

spring colud 中使用abbitmq消息队列步骤_abbitmq教程(安装与使用详解,spring集成)

abbitmq教程(安装与使用详解,spring集成)

使用步骤
一、安装abbitmq服务,具体安装教程网上自行寻找
二、在配置文件中填下面的配置信息

 spring:
  rabbitmq:
   #abbitmq安装IP
    host: localhost
    #abbitmq安装端口
    port: xxxx
    #abbitmq登陆名
    username: xxxx
    #abbitmq登陆密码
    password: 'xxxxxxxxx'
    publisher-confirms: true
    virtual-host: /
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

三、创建队列常量类
package com.xxxx.util;

public class Constant {
    /*************** 队列常量 **************************/
    public static final String XXXX-XXXX-XXXX = "AAAA-AAAA-AAAA";
    public static final String YYYY-YYYY-YYYY= "BBBB-BBBB-BBBB";
}
  • 1
  • 2
  • 3
  • 4
  • 5

在abbitmq控制中心显示的队列名为 “AAAA-AAAA-AAAA”,而不是 XXXX-XXXX-XXXX

四、创建一个abbitmq的配置,让Spring Boot加载,要使用@Configuration表示这是个配置类

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

import com.xxxx.util.Constant;
@Configuration
public class RabbitConfig {
    //每一个队列都需要创建一个@Bean
	@Bean
	public Queue loginLogQueue() {
		return new Queue(Constant.XXXX-XXXX-XXXX);
	}
	
	@Bean
	public Queue opLoginQueue() {
		return new Queue(Constant.YYYY-YYYY-YYYY);
	}

	public MappingJackson2MessageConverter jackson2Converter() {
		MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
		return converter;
	}

	@Bean
	public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setMessageConverter(new Jackson2JsonMessageConverter());
		// 开启手动 ack
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		return factory;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

五、开始使用队列
创建一个Service接口

public interface Model Service {
   //model是一个实体参数
	public void record(Model model);
}

  • 1
  • 2
  • 3
  • 4
  • 5

实现类

@Service
public class ModelServiceImpl implements ModelService {

    @Autowired
    private AmqpTemplate amqpTemplate;


    //@Async(value="asyncServiceExecutor")配合上一篇文章,可以使用线程池
    @Override
    public void record(Model model ) {
        //这个步骤可以没有,在这里我有是因为在下面一步中参数一直报错,所以强转为json
		String json=JSONObject.toJSON(model ).toString();
		//将消息放入队列中等待消费
        amqpTemplate.convertAndSend(Constant.XXXX-XXXX-XXXX, json);
       // amqpTemplate.convertAndSend(Constant.XXXX-XXXX-XXXX, model );
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

将消息放入队列中等待消费,在这一步程序只把消息放入队列中就返回,消费队列将异步进行

接下来就是在Listener里消费队列里的消息

@Component
public class ModelListener {
	@Autowired
	private ModelService modelService;

	@RabbitHandler
	@RabbitListener(queues = Constant.XXXX-XXXX-XXXX)
	public void process(Message msg, Channel channel) {
		ObjectMapper objectMapper = new ObjectMapper();
		Model model;//参数实体类
		try {
		   //在这里通过msg获取到传进来的参数,通过队列传进来后参数转换为String类型
			String log = new String(msg.getBody());
			//将log转换为实体类Model 
			model= objectMapper.readValue(log, Model.class);
			//....在这里用model进行自己的逻辑
		} catch (JsonParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JsonMappingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}		
			try {
				// 确认消息
				channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这篇消息队列文章可以和上篇线程池文章配合使用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/808772
推荐阅读
相关标签
  

闽ICP备14008679号