当前位置:   article > 正文

rabbitMQ的使用_rabbitmq convertandsend

rabbitmq convertandsend

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

队列的创建

  1. /**
  2. * 配置连接工厂
  3. *
  4. * @return
  5. */
  6. @Bean
  7. public ConnectionFactory connectionFactory() {
  8. CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
  9. connectionFactory.setUsername(username);
  10. connectionFactory.setPassword(password);
  11. connectionFactory.setVirtualHost("/");
  12. connectionFactory.setPublisherConfirms(true);
  13. return connectionFactory;
  14. }
  15. /**
  16. * 设置RabbitTemplate
  17. *
  18. * @return
  19. */
  20. @Bean
  21. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  22. public RabbitTemplate rabbitTemplate() {
  23. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  24. return template;
  25. }
  26. /**
  27. * 指定 与queue绑定的exchange的名称
  28. *
  29. * @return
  30. */
  31. @Bean
  32. public DirectExchange exchange() {
  33. return new DirectExchange("ExchangeName");
  34. }
  35. /**
  36. * 创建queue
  37. *
  38. * @return
  39. */
  40. @Bean
  41. public Queue queue() {
  42. Map<String, Object> arguments = Maps.newHashMap();
  43. arguments.put("x-message-ttl", 300000);
  44. Queue queue = new Queue("queueName", true, false, false, arguments);
  45. return queue;
  46. }
  47. /**
  48. * exchange和queue的绑定关系
  49. *
  50. * @return
  51. */
  52. @Bean("binding")
  53. public Binding binding() {
  54. return BindingBuilder.bind(queue()).to(exchange()).with("routing.key");
  55. }

使用:AmqpAdmin 

  1. @Autowired
  2. private AmqpAdmin amqpAdmin;
  3. /**
  4. * 创建交换机
  5. * DirectExchange 点对点传递消息
  6. * FanoutExchange 一对多发送消息,发布-订阅模式
  7. * TopicExchange 一对多发送消息,但要分类
  8. */
  9. @Test
  10. void creatChange() {
  11. DirectExchange directExchange = new DirectExchange("exchange.test.direct", true,false);
  12. amqpAdmin.declareExchange(directExchange);
  13. FanoutExchange fanoutExchange = new FanoutExchange("exchange.test.fanout", true, false);
  14. amqpAdmin.declareExchange(fanoutExchange);
  15. TopicExchange topicExchange = new TopicExchange("exchange.test.topic", true, false);
  16. amqpAdmin.declareExchange(topicExchange);
  17. }
  18. /**
  19. * 创建队列
  20. */
  21. void creatQueue() {
  22. Queue queue = new Queue("queue.test", true, false, false);
  23. amqpAdmin.declareQueue(queue);
  24. }
  25. void binding() {
  26. Binding binding = new Binding("queue.test", Binding.DestinationType.QUEUE, "exchange.first.direct", "exchange.first.direct", null);
  27. amqpAdmin.declareBinding(binding);
  28. }

 

RabbitMQ队列的参数(Queue)


1、name: 队列的名称;
2、actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个;
3、durable: 是否持久化;
4、exclusive: 是否独享、排外的;
5、autoDelete: 是否自动删除;
6、arguments:队列的其他属性参数,有如下可选项:
     x-message-ttl:消息的过期时间,单位:毫秒;
     x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
     x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
     x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
     x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
     x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
     x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
     x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
     x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
     x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
     x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

RabbitMQ队列的参数(Exchange)

name:名称
Durability:持久化标志,如果为true,则表明此exchange是持久化的。
Auto-delete:删除标志,当所有队列在完成使用此exchange时,是否删除
delayed:如果要将交换声明为“x-delayed-message”交换,则返回true。需要代理上的延迟消息交换插件。需要在节点上延迟消息交换。默认false
durable: 是否持久化;
internal:是否是内部的。
internal:是否是内部的。
ignoreDeclarationExceptions:是否忽略声明时的错误。默认是false。
type:交换机的类型:支持四种模式 DIRECT, FANOUT TOPIC, HEADERS
    DIRECT:默认的模式,使用这个类型时,可以不指定routing key的名字(会默认创建一个和队列同名的louting key)。这种类型的Exchange,通常是将同一个message以一种循环的方式分发到的不同与该exchange绑定的Queue,即不同的消费者手中。
    FANOUT :这种类型的Exchange,会忽略routing key的存在,直接将message广播到所有与该exchange绑定的Queue中。相当于群发功能。
    TOPIC:根据routing key和Exchange的类型将message发送到与该exchange绑定一个或者多个Queue中,可以实现各种publish/subscribe,即发布订阅。交换机的routingkey不能有任意的。必须是由点分开的一串单词。可以由多个单词,但是有最大限制。最大限制是:255bytes.
        这种模式下的routing key支持模糊匹配。
        *:表示匹配任意一个单词
        #:表示匹配任意一个或多个单词。
    HEADERS:根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了。Arguments参数就是message的Hearders的参数,参数用来根据Headers Exchange 的 Headers 的Arguments设置 过滤消息

RabbitMQ队列的参数(routinf key)

routing key:路由规则 是决定被生产者发送到exchange的消息最终将被发送到哪一个/多个topic的匹配规则。多个消费者(topic)通过routing key和exchange绑定。当生产者生产消息时会指定exchange和routingkey。服务器通过比较exchange和routingkey决定这条消息要被发送到哪些topic中。

direct exchange:根据路由键(routingkey)需要完全匹配上才可以。会把消息路由到那些bingding key与routing key完全匹配的队列中

topic exchange:所有符合routingkey(此中状态下的key可以是一个表达式)的routingkey所有绑定的队列都可以收到消息。

队列监听

队列的监听使用 @RabbitListener注解在对应的方法即可获取到信息。

队列queue已经存在时监听的方式:

可以直接使用 @RabbitListener中的 queues (String[])参数指定要监听的队列即可。

  1. @Component
  2. public class MqListener{
  3. /**
  4. * 处理mq消息
  5. *
  6. * @param message
  7. */
  8. @RabbitListener(queues = AppConstans.PLUGIN_QUEUE)
  9. public void process(Message message) {
  10. try {
  11. String content = new String(message.getBody(), "UTF-8");
  12. //以下是业务逻辑
  13. } catch (Exception e) {
  14. }
  15. }
  16. }

队列queue已经不存在时监听的方式:

队列不存在时需要创建队列和交换机(exchange)。@RabbitListener的Binding(QueueBinding[])可以实现routingKey、queue、exchange的创建以及两者绑定。

@RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、Routing Key 则自动创建,若存在则抛出异常)

  1. @RabbitListener(bindings = @QueueBinding(
  2. exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
  3. value = @Queue(value = "consumer_queue",durable = "true"),
  4. key = "key.#"
  5. ))
  6. public void processMessage1(Message message) {
  7. System.out.println(message);
  8. }
  9. /**
  10. * 手动提交ack,用于 防止数据丢失,保证数据一致性
  11. * @param message 封装的消息
  12. * @param orderEntity 发送消息时的类
  13. * @param channel 通信渠道
  14. */
  15. @RabbitListener(queues = {"consumer_queue"})
  16. @RabbitHandler
  17. public void recieveOrderMessage(Message message Channel channel) throws IOException {
  18. System.out.println("收到了消息了--->" + message + " ====》内容:" + orderEntity);
  19. System.out.println("渠道数量:" + channel.getChannelNumber());
  20. MessageProperties messageProperties = message.getMessageProperties();
  21. System.out.println("消息处理完成---------------------------------");
  22. //消息顺序,自增
  23. long deliveryTag = messageProperties.getDeliveryTag();
  24. System.out.println(deliveryTag);
  25. //回复,签收消息, fasle表示只签收当前消息,true签收所有
  26. channel.basicAck(deliveryTag, false);
  27. }
  28. @RabbitListener(queues = {"consumer_queue"})
  29. @RabbitHandler
  30. public void recieveOrderItemMessage(Message message, OrderItemEntity orderItemEntity, Channel channel){
  31. System.out.println("收到了消息了--->" + message + " ====》内容:" + orderItemEntity);
  32. System.out.println("渠道数量:" + channel.getChannelNumber());
  33. }

数据发送:

1. SpringBoot提供了AmqpTemplate来操作RabbitMQ

  1. //注入AmqpTemplate
  2. @Autowired
  3. private AmqpTemplate amqpTemplate;
  4. public void sendMsg(){
  5. //通过converAndSend方法发送消息
  6. String msg = "hello world";
  7. amqpTemplate.converAndSend("ex3","b",msg);
  8. }

使用 RabbitTemplate

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. public boolean sendMsg(String msg) {
  4. rabbitTemplate.convertAndSend("XXXXX", "XXXX", msg);
  5. }

RabbitMQ - 简书

RabbitMQ的四种ExChange_Danna_Danna的博客-CSDN博客_rabbitmq的exchange

RabbitMQ的各个参数 - 不死码农 - 博客园

RabbitMQ入门:发布/订阅(Publish/Subscribe) - JAVA开发老菜鸟 - 博客园

RabbitMQ消息中间件技术精讲6 几种交换机总结_key

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/399311
推荐阅读
相关标签
  

闽ICP备14008679号