赞
踩
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- /**
- * 配置连接工厂
- *
- * @return
- */
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true);
- return connectionFactory;
- }
-
- /**
- * 设置RabbitTemplate
- *
- * @return
- */
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
-
- /**
- * 指定 与queue绑定的exchange的名称
- *
- * @return
- */
- @Bean
- public DirectExchange exchange() {
- return new DirectExchange("ExchangeName");
- }
-
- /**
- * 创建queue
- *
- * @return
- */
- @Bean
- public Queue queue() {
- Map<String, Object> arguments = Maps.newHashMap();
- arguments.put("x-message-ttl", 300000);
- Queue queue = new Queue("queueName", true, false, false, arguments);
- return queue;
-
- }
- /**
- * exchange和queue的绑定关系
- *
- * @return
- */
- @Bean("binding")
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(exchange()).with("routing.key");
- }
使用:AmqpAdmin
- @Autowired
- private AmqpAdmin amqpAdmin;
-
- /**
- * 创建交换机
- * DirectExchange 点对点传递消息
- * FanoutExchange 一对多发送消息,发布-订阅模式
- * TopicExchange 一对多发送消息,但要分类
- */
- @Test
- void creatChange() {
- DirectExchange directExchange = new DirectExchange("exchange.test.direct", true,false);
- amqpAdmin.declareExchange(directExchange);
-
- FanoutExchange fanoutExchange = new FanoutExchange("exchange.test.fanout", true, false);
- amqpAdmin.declareExchange(fanoutExchange);
-
- TopicExchange topicExchange = new TopicExchange("exchange.test.topic", true, false);
- amqpAdmin.declareExchange(topicExchange);
- }
-
- /**
- * 创建队列
- */
- void creatQueue() {
- Queue queue = new Queue("queue.test", true, false, false);
- amqpAdmin.declareQueue(queue);
- }
-
- void binding() {
- Binding binding = new Binding("queue.test", Binding.DestinationType.QUEUE, "exchange.first.direct", "exchange.first.direct", null);
- amqpAdmin.declareBinding(binding);
- }
1、name: 队列的名称;
2、actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个;
3、durable: 是否持久化;
4、exclusive: 是否独享、排外的;
5、autoDelete: 是否自动删除;
6、arguments:队列的其他属性参数,有如下可选项:
x-message-ttl:消息的过期时间,单位:毫秒;
x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
name:名称
Durability:持久化标志,如果为true,则表明此exchange是持久化的。
Auto-delete:删除标志,当所有队列在完成使用此exchange时,是否删除
delayed:如果要将交换声明为“x-delayed-message”交换,则返回true。需要代理上的延迟消息交换插件。需要在节点上延迟消息交换。默认false
durable: 是否持久化;
internal:是否是内部的。
internal:是否是内部的。
ignoreDeclarationExceptions:是否忽略声明时的错误。默认是false。
type:交换机的类型:支持四种模式 DIRECT, FANOUT TOPIC, HEADERS
DIRECT:默认的模式,使用这个类型时,可以不指定routing key的名字(会默认创建一个和队列同名的louting key)。这种类型的Exchange,通常是将同一个message以一种循环的方式分发到的不同与该exchange绑定的Queue,即不同的消费者手中。
FANOUT :这种类型的Exchange,会忽略routing key的存在,直接将message广播到所有与该exchange绑定的Queue中。相当于群发功能。
TOPIC:根据routing key和Exchange的类型将message发送到与该exchange绑定一个或者多个Queue中,可以实现各种publish/subscribe,即发布订阅。交换机的routingkey不能有任意的。必须是由点分开的一串单词。可以由多个单词,但是有最大限制。最大限制是:255bytes.
这种模式下的routing key支持模糊匹配。
*:表示匹配任意一个单词
#:表示匹配任意一个或多个单词。
HEADERS:根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了。Arguments参数就是message的Hearders的参数,参数用来根据Headers Exchange 的 Headers 的Arguments设置 过滤消息
routing key:路由规则 是决定被生产者发送到exchange的消息最终将被发送到哪一个/多个topic的匹配规则。多个消费者(topic)通过routing key和exchange绑定。当生产者生产消息时会指定exchange和routingkey。服务器通过比较exchange和routingkey决定这条消息要被发送到哪些topic中。
direct exchange:根据路由键(routingkey)需要完全匹配上才可以。会把消息路由到那些bingding key与routing key完全匹配的队列中
topic exchange:所有符合routingkey(此中状态下的key可以是一个表达式)的routingkey所有绑定的队列都可以收到消息。
队列的监听使用 @RabbitListener注解在对应的方法即可获取到信息。
队列queue已经存在时监听的方式:
可以直接使用 @RabbitListener中的 queues (String[])参数指定要监听的队列即可。
- @Component
- public class MqListener{
-
- /**
- * 处理mq消息
- *
- * @param message
- */
- @RabbitListener(queues = AppConstans.PLUGIN_QUEUE)
- public void process(Message message) {
- try {
- String content = new String(message.getBody(), "UTF-8");
- //以下是业务逻辑
- } catch (Exception e) {
-
- }
- }
- }
队列queue已经不存在时监听的方式:
队列不存在时需要创建队列和交换机(exchange)。@RabbitListener的Binding(QueueBinding[])可以实现routingKey、queue、exchange的创建以及两者绑定。
@RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、Routing Key 则自动创建,若存在则抛出异常)
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
- value = @Queue(value = "consumer_queue",durable = "true"),
- key = "key.#"
- ))
- public void processMessage1(Message message) {
- System.out.println(message);
- }
-
-
-
-
- /**
- * 手动提交ack,用于 防止数据丢失,保证数据一致性
- * @param message 封装的消息
- * @param orderEntity 发送消息时的类
- * @param channel 通信渠道
- */
- @RabbitListener(queues = {"consumer_queue"})
- @RabbitHandler
- public void recieveOrderMessage(Message message Channel channel) throws IOException {
- System.out.println("收到了消息了--->" + message + " ====》内容:" + orderEntity);
- System.out.println("渠道数量:" + channel.getChannelNumber());
-
- MessageProperties messageProperties = message.getMessageProperties();
- System.out.println("消息处理完成---------------------------------");
- //消息顺序,自增
- long deliveryTag = messageProperties.getDeliveryTag();
- System.out.println(deliveryTag);
- //回复,签收消息, fasle表示只签收当前消息,true签收所有
- channel.basicAck(deliveryTag, false);
- }
-
-
- @RabbitListener(queues = {"consumer_queue"})
- @RabbitHandler
- public void recieveOrderItemMessage(Message message, OrderItemEntity orderItemEntity, Channel channel){
- System.out.println("收到了消息了--->" + message + " ====》内容:" + orderItemEntity);
- System.out.println("渠道数量:" + channel.getChannelNumber());
- }
1. SpringBoot提供了AmqpTemplate来操作RabbitMQ
-
- //注入AmqpTemplate
- @Autowired
- private AmqpTemplate amqpTemplate;
- public void sendMsg(){
- //通过converAndSend方法发送消息
- String msg = "hello world";
- amqpTemplate.converAndSend("ex3","b",msg);
- }
使用 RabbitTemplate
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public boolean sendMsg(String msg) {
- rabbitTemplate.convertAndSend("XXXXX", "XXXX", msg);
- }
RabbitMQ的四种ExChange_Danna_Danna的博客-CSDN博客_rabbitmq的exchange
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。