当前位置:   article > 正文

SpringCloud——使用RabbitMQ发送消息给用户_rabbitmq实现动态发送指定人

rabbitmq实现动态发送指定人

项目中发布一个需求,当服务上线后通过不同各种通讯方式发送消息给用户,我们可以使用RabbitMQ来满足需求,写一个简单流程。

1.导入jar包,创建启动类,配置application文件

 <dependencies>

        <!--微服务基础依赖-->
        <dependency>
            <groupId>com.zengjx</groupId>
            <artifactId>hrm-service-dependencies</artifactId>
            <version>${hrm.version}</version>
        </dependency>

        <!--导入rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
//创建一个启动类
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class,args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
server:
  port: 44000
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        acknowledge-mode: manual #手动签收
        prefetch: 1
    publisher-confirms: true #消息发送到交换机失败回调
    publisher-returns: true  #消息发送到队列失败回调
    template:
     mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2.创建配置类,在该配置中创建交换机,创建队列,将队列绑定到交换机上,并设置发送和接收数据时的格式

/*在这个类中创建交换机 创建队列 绑定交换机
* */
@Configuration    //声明该类为一个配置类
public class RabbitmqConfig {

    //以下配置RabbitMQ消息服务
    @Autowired
    public ConnectionFactory connectionFactory;

    //创建邮箱的消息队列
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    //创建电话的消息队列
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    //创建交换机
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    //使用方法创建交换机   导入springboot的核心包   交给bean管理
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //使用方法创建对象   ExchangeBuilder.topicExchange创建交换机,参数为交换机的名字
        //第二个为是否需要持久化
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //使用方法创建队列   创建的为邮箱队列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        Queue queue = new Queue(QUEUE_INFORM_EMAIL,true);
        return queue;
    }

    //使用方法创建队列   创建的为邮箱队列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        Queue queue = new Queue(QUEUE_INFORM_SMS,true);
        return queue;
    }

    //绑定交换机  传入的参数为交换机和队列(都是对象的形式)    下面的参数第一个为绑定的队列第二个参数为交换机 第三个为routingKey 第四个为其他参数
    @Bean
    public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key").noargs();
    }

    //绑定交换机  传入的参数为交换机和队列(都是对象的形式)    下面的参数第一个为绑定的队列第二个参数为交换机 第三个为routingKey 第四个为其他参数
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key").noargs();
    }

    //发送的时候将消息序列化  通过连接工厂的对象创建一个rabbitmqTemplelate 然后设置该模板的格式为json格式
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    //监听处理序列化  通过创建监听对象的工厂对象  给该工厂对象设置监听的队列并处理格式为json
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        return factory;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

3.创建生产者

//生产者的类
@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {

    //注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //注入回调类
    @Autowired
    private MQCallback mqCallback;

    //创建一个测试方法用来发送消息
    //使用工具类发送消息到指定的交换机  并且携带routingkey  和消息
    @Test
    public void test(){

        //设置消息回调
        //客户端到交换机消息投递回调  无论成功或者失败都会回调
        rabbitTemplate.setConfirmCallback(mqCallback);

        //交换机到消息队列投递失败后的回调
        rabbitTemplate.setReturnCallback(mqCallback);

        User user = new User(1L,"张飞");

        //第一个参数为哪一个交换机  第二个参数为routingKey  第三个参数为消息
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"key",user);
        System.out.println("发送成功");
    }   
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

4.创建执行回调方法的类

@Component  //交给spring管理
public class MQCallback implements ConfirmCallback,RabbitTemplate.ReturnCallback{

    //消息投递到交换机  无论失败和成功都会调用
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm回调执行");
        System.out.println("correlationData:"+correlationData);
        System.out.println("消息是否投递到交换机:"+ack);
        System.out.println("cause:"+cause);
    }

    //消息投递到队列  只有失败的时候才会回调
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("returnedMessage回调执行...");
        System.out.println("message:"+message);
        System.out.println("错误码replyCode:"+replyCode);
        System.out.println("错误信息replyText:"+replyText);
        System.out.println("exchange:"+exchange);
        System.out.println("routingKey:"+routingKey);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

5.创建消费者

//消费者的类
@Component   //需要将该类交给spring管理
public class ReceiveHandler {

    //注入rabbitmq工具类
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    //使用注解监听指定的队列  监听email
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL},containerFactory = "rabbitListenerContainerFactory")
    public void receive_email(@Payload User user, Message message, Channel channel) throws IOException {
        //获取到deliveryTag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("监听成功");
        System.out.println("消息内容:"+user);
        //使用方法签收消息
        channel.basicAck(deliveryTag , false);
    }

    //使用注解监听指定的队列  监听sms
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS} ,containerFactory = "rabbitListenerContainerFactory")
    public void receive_sms(@Payload User user, Message message, Channel channel) throws IOException {
        //获取到deliveryTag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("监听成功");
        System.out.println("消息内容:"+user);
        //使用方法签收消息     这里就是手动签收
        channel.basicAck(deliveryTag , false);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

6.交换机的三种类型

6.1Fanout:广播,将消息发送给所有绑定到交换机上的队列
6.2Direct:定向,把消息发送给符合指定routingKey的队列
6.3Topic:通配符,把消息交给符合routing pattern(路由模式)的队列 一堆或者一个

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/437853
推荐阅读
相关标签
  

闽ICP备14008679号