赞
踩
项目中发布一个需求,当服务上线后通过不同各种通讯方式发送消息给用户,我们可以使用RabbitMQ来满足需求,写一个简单流程。
1.导入jar包,创建启动类,配置application文件
<dependencies> <!--微服务基础依赖--> <dependency> <groupId>com.zengjx</groupId> <artifactId>hrm-service-dependencies</artifactId> <version>${hrm.version}</version> </dependency> <!--导入rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
//创建一个启动类
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class,args);
}
}
server: port: 44000 spring: application: name: rabbitmq rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: / listener: simple: acknowledge-mode: manual #手动签收 prefetch: 1 publisher-confirms: true #消息发送到交换机失败回调 publisher-returns: true #消息发送到队列失败回调 template: mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
2.创建配置类,在该配置中创建交换机,创建队列,将队列绑定到交换机上,并设置发送和接收数据时的格式
/*在这个类中创建交换机 创建队列 绑定交换机 * */ @Configuration //声明该类为一个配置类 public class RabbitmqConfig { //以下配置RabbitMQ消息服务 @Autowired public ConnectionFactory connectionFactory; //创建邮箱的消息队列 public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //创建电话的消息队列 public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //创建交换机 public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; //使用方法创建交换机 导入springboot的核心包 交给bean管理 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //使用方法创建对象 ExchangeBuilder.topicExchange创建交换机,参数为交换机的名字 //第二个为是否需要持久化 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //使用方法创建队列 创建的为邮箱队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ Queue queue = new Queue(QUEUE_INFORM_EMAIL,true); return queue; } //使用方法创建队列 创建的为邮箱队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ Queue queue = new Queue(QUEUE_INFORM_SMS,true); return queue; } //绑定交换机 传入的参数为交换机和队列(都是对象的形式) 下面的参数第一个为绑定的队列第二个参数为交换机 第三个为routingKey 第四个为其他参数 @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key").noargs(); } //绑定交换机 传入的参数为交换机和队列(都是对象的形式) 下面的参数第一个为绑定的队列第二个参数为交换机 第三个为routingKey 第四个为其他参数 @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key").noargs(); } //发送的时候将消息序列化 通过连接工厂的对象创建一个rabbitmqTemplelate 然后设置该模板的格式为json格式 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } //监听处理序列化 通过创建监听对象的工厂对象 给该工厂对象设置监听的队列并处理格式为json @Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setPrefetchCount(1); return factory; } }
3.创建生产者
//生产者的类 @SpringBootTest(classes = App.class) @RunWith(SpringRunner.class) public class Sender { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; //注入回调类 @Autowired private MQCallback mqCallback; //创建一个测试方法用来发送消息 //使用工具类发送消息到指定的交换机 并且携带routingkey 和消息 @Test public void test(){ //设置消息回调 //客户端到交换机消息投递回调 无论成功或者失败都会回调 rabbitTemplate.setConfirmCallback(mqCallback); //交换机到消息队列投递失败后的回调 rabbitTemplate.setReturnCallback(mqCallback); User user = new User(1L,"张飞"); //第一个参数为哪一个交换机 第二个参数为routingKey 第三个参数为消息 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"key",user); System.out.println("发送成功"); } }
4.创建执行回调方法的类
@Component //交给spring管理 public class MQCallback implements ConfirmCallback,RabbitTemplate.ReturnCallback{ //消息投递到交换机 无论失败和成功都会调用 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm回调执行"); System.out.println("correlationData:"+correlationData); System.out.println("消息是否投递到交换机:"+ack); System.out.println("cause:"+cause); } //消息投递到队列 只有失败的时候才会回调 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("returnedMessage回调执行..."); System.out.println("message:"+message); System.out.println("错误码replyCode:"+replyCode); System.out.println("错误信息replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); } }
5.创建消费者
//消费者的类 @Component //需要将该类交给spring管理 public class ReceiveHandler { //注入rabbitmq工具类 @Autowired RabbitTemplate rabbitTemplate; //使用注解监听指定的队列 监听email @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL},containerFactory = "rabbitListenerContainerFactory") public void receive_email(@Payload User user, Message message, Channel channel) throws IOException { //获取到deliveryTag long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("监听成功"); System.out.println("消息内容:"+user); //使用方法签收消息 channel.basicAck(deliveryTag , false); } //使用注解监听指定的队列 监听sms @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS} ,containerFactory = "rabbitListenerContainerFactory") public void receive_sms(@Payload User user, Message message, Channel channel) throws IOException { //获取到deliveryTag long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("监听成功"); System.out.println("消息内容:"+user); //使用方法签收消息 这里就是手动签收 channel.basicAck(deliveryTag , false); } }
6.交换机的三种类型
6.1Fanout:广播,将消息发送给所有绑定到交换机上的队列
6.2Direct:定向,把消息发送给符合指定routingKey的队列
6.3Topic:通配符,把消息交给符合routing pattern(路由模式)的队列 一堆或者一个
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。