赞
踩
使用步骤
一、安装abbitmq服务,具体安装教程网上自行寻找
二、在配置文件中填下面的配置信息
spring:
rabbitmq:
#abbitmq安装IP
host: localhost
#abbitmq安装端口
port: xxxx
#abbitmq登陆名
username: xxxx
#abbitmq登陆密码
password: 'xxxxxxxxx'
publisher-confirms: true
virtual-host: /
三、创建队列常量类
package com.xxxx.util;
public class Constant {
/*************** 队列常量 **************************/
public static final String XXXX-XXXX-XXXX = "AAAA-AAAA-AAAA";
public static final String YYYY-YYYY-YYYY= "BBBB-BBBB-BBBB";
}
在abbitmq控制中心显示的队列名为 “AAAA-AAAA-AAAA”,而不是 XXXX-XXXX-XXXX
四、创建一个abbitmq的配置,让Spring Boot加载,要使用@Configuration表示这是个配置类
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import com.xxxx.util.Constant; @Configuration public class RabbitConfig { //每一个队列都需要创建一个@Bean @Bean public Queue loginLogQueue() { return new Queue(Constant.XXXX-XXXX-XXXX); } @Bean public Queue opLoginQueue() { return new Queue(Constant.YYYY-YYYY-YYYY); } public MappingJackson2MessageConverter jackson2Converter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); return converter; } @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 开启手动 ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
五、开始使用队列
创建一个Service接口
public interface Model Service {
//model是一个实体参数
public void record(Model model);
}
实现类
@Service public class ModelServiceImpl implements ModelService { @Autowired private AmqpTemplate amqpTemplate; //@Async(value="asyncServiceExecutor")配合上一篇文章,可以使用线程池 @Override public void record(Model model ) { //这个步骤可以没有,在这里我有是因为在下面一步中参数一直报错,所以强转为json String json=JSONObject.toJSON(model ).toString(); //将消息放入队列中等待消费 amqpTemplate.convertAndSend(Constant.XXXX-XXXX-XXXX, json); // amqpTemplate.convertAndSend(Constant.XXXX-XXXX-XXXX, model ); } }
将消息放入队列中等待消费,在这一步程序只把消息放入队列中就返回,消费队列将异步进行
接下来就是在Listener里消费队列里的消息
@Component public class ModelListener { @Autowired private ModelService modelService; @RabbitHandler @RabbitListener(queues = Constant.XXXX-XXXX-XXXX) public void process(Message msg, Channel channel) { ObjectMapper objectMapper = new ObjectMapper(); Model model;//参数实体类 try { //在这里通过msg获取到传进来的参数,通过队列传进来后参数转换为String类型 String log = new String(msg.getBody()); //将log转换为实体类Model model= objectMapper.readValue(log, Model.class); //....在这里用model进行自己的逻辑 } catch (JsonParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { // 确认消息 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。