赞
踩
1、Direct exchange: 直连交换机,根据Routing Key(路由键)进行投递到不同队列。
代码实现:
消息发送者,在pom.xml导入相关依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
在application.yml自定义direct交换机的属性
# 应用名称 spring: application: name: 001-rabbitmq-send rabbitmq: host: 192.168.126.124 #rabbitmq服务器地址 port: 5672 #AMQP协议端口 username: admin #账号 password: admin #密码 virtual-host: / #虚拟机 server: port: 1001 #自定义消息队列的属性 custom: rabbitmq: # direct类型的消息的发送 direct: queue-name: directQueue exchange-name: directExchange routing-key: directRoutingKey
读取application.yml自定义属性付给实体类对象
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.direct")
public class ExchangeProperties {
private String queueName;
private String exchangeName;
private String routingKey;
}
声明Direct类型交换机、消息队列、绑定关系
@Configuration public class RabbitMqConfiguration { @Autowired private ExchangeProperties exchangeProperties //声明Direct类型交换机、消息队列、绑定关系 @Bean public DirectExchange directExchange(){ //默认创建的交换机为持久化、非自动删除(自动删除没有监听的交换机,该交换机会被自动删除)的交换机 return new DirectExchange(exchangeProperties.getExchangeName()); } @Bean public Queue directQueue(){ //默认创建的消息队列持久化、非自动删除、非排外(当前消息可以被任意消息消费,但是只能有一个消费者消费) return new Queue(exchangeProperties.getQueueName()); } @Bean public Binding directBinding(Queue directQueue,DirectExchange directExchange){ return BindingBuilder .bind(directQueue) .to(directExchange) .with(exchangeProperties.getRoutingKey()); } }
消息发送者发送消息
@Service public class SendMsgImpl implements SendMsg { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ExchangeProperties exchangeProperties; @Override public void sendDirectMessage() throws JsonProcessingException { rabbitTemplate.convertAndSend( exchangeProperties.getExchangeName(), exchangeProperties.getRoutingKey(), "这是第2条测试消息" ); } }
消息接收者,application.yml
# 应用名称 spring: application: name: 001-rabbitmq-receive rabbitmq: host: 192.168.126.124 #rabbitmq服务器地址 port: 5672 #AMQP协议端口 username: admin #账号 password: admin #密码 virtual-host: / #虚拟机 listener: simple: #消息确认机制,manual表示手动确认,auto代表自动确认 acknowledge-mode: manual # 应用服务 WEB 访问端口 server: port: 2001
消息接收者接收消息并手动确认消息
package com.wen.servive.impl; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import com.wen.servive.ReceiveMsg; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.List; import java.util.Map; @Service public class ReceiveMsgImpl implements ReceiveMsg { @RabbitListener(queues = "directQueue") public void receiveDirectMessage(Message msg, Channel channel) throws IOException { System.out.println(msg); //手动消费,注入一个参数Channel对象 channel.basicAck( //根据消息标识,来手动消费消息 msg.getMessageProperties().getDeliveryTag(),false); } }
2、Fanout exchange
不使用路由键,使用广播模式,将消息传递到所有绑定Fanout exchang的消息队列,当消息队列未被消息消费者监听时,自动删除消息队列,所以消息队列的消息是非持久化。
消息发送者,application.yml自定义fanout交换机的属性
#自定义消息队列的属性
custom:
rabbitmq:
# fanout类型的消息的发送
fanout:
queue-name: fanoutQueue
exchange-name: fanoutExchange
读取application.yml自定义属性付给实体类对象
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.fanout")
public class FanoutProperties {
private String queueName;
private String exchangeName;
}
消息发送者发送消息
@Override
public void sendFanoutMessage() throws JsonProcessingException {
rabbitTemplate.convertAndSend(
//Fanout交换机没有路由键
fanoutProperties.getExchangeName(), "","这是第3条测试消息");
}
消息接收者接收消息服务
- 声明fanout类型交换机、消息队列、绑定关系
- 接收消息
@RabbitListener(bindings = {
@QueueBinding(
//声明绑定关系中的消息队列,如果没有命名消息队列名称,是随机命名的名称
value = @Queue,
exchange = @Exchange(name="fanoutExchange",type = "fanout")
)
})
public void receiveFanoutMessage(Message msg, Channel channel) throws IOException {
System.out.println(msg);
//手动消费,注入一个参数Channel对象
channel.basicAck(
//根据消息标识,来手动消费消息
msg.getMessageProperties().getDeliveryTag(), false
);
}
3、Topic exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,.表示一个词。
例子:
路由键为routingKeyA aa,routingKeyB aa.bb,routingKeyC aa.bb.cc
Topic exchange的路由通配规则如下
aa.* 必须以aa开头的两个单词,只有routingKeyB符合
aa.# 必须以aa开头的零个或多个单词,routingKeyA routingKeyB routingKeyC符合
消息发送者,application.yml自定义Topic 交换机的属性
#自定义消息队列的属性
custom:
rabbitmq:
# topic类型的消息的发送
topic:
queue-name1: topicQueue1
queue-name2: topicQueue2
queue-name3: topicQueue3
binding-key1: aa
binding-key2: aa.*
binding-key3: aa.#
exchange-name: topicExchange
routing-key1: aa
routing-key2: aa.bb
routing-key3: aa.bb.cc
读取application.yml自定义属性付给实体类对象
@Data @Component @ConfigurationProperties(prefix = "custom.rabbitmq.topic") public class TopicProperties { private String queueName1; private String queueName2; private String queueName3; private String bindingKey1; private String bindingKey2; private String bindingKey3; private String exchangeName; private String routingKey1; private String routingKey2; private String routingKey3; }
声明Topic类型交换机、消息队列、绑定关系
@Configuration public class RabbitMqConfiguration { @Autowired private TopicProperties topicProperties; //声明 Topic类型交换机(1)、消息队列(3)、绑定关系(3) @Bean public TopicExchange topicExchange(){ return new TopicExchange(topicProperties.getExchangeName()); } @Bean public Queue topicQuene1(){ return new Queue(topicProperties.getQueueName1()); } @Bean public Queue topicQuene2(){ return new Queue(topicProperties.getQueueName2()); } @Bean public Queue topicQuene3(){ return new Queue(topicProperties.getQueueName3()); } @Bean public Binding topicBinding1(){ return BindingBuilder .bind(topicQuene1()) .to(topicExchange()) .with(topicProperties.getBindingKey1()); } @Bean public Binding topicBinding2(){ return BindingBuilder .bind(topicQuene2()) .to(topicExchange()) .with(topicProperties.getBindingKey2()); } @Bean public Binding topicBinding3(){ return BindingBuilder .bind(topicQuene3()) .to(topicExchange()) .with(topicProperties.getBindingKey3()); } }
消息发送者发送消息
@Override
public void sendTopicMessage() throws JsonProcessingException {
rabbitTemplate.convertAndSend(
topicProperties.getExchangeName(), topicProperties.getRoutingKey1(),"这是第4条测试消息");
rabbitTemplate.convertAndSend(
//Fanout交换机没有路由键
topicProperties.getExchangeName(), topicProperties.getRoutingKey2(),"这是第5条测试消息");
rabbitTemplate.convertAndSend(
//Fanout交换机没有路由键
topicProperties.getExchangeName(), topicProperties.getRoutingKey3(),"这是第6条测试消息");
}
消息接收者接收消息
@RabbitListener(queues = {"topicQueue1",
"topicQueue2",
"topicQueue3"
})
public void receiveTopicMessage(Message msg, Channel channel) throws IOException {
System.out.println(msg);
//手动消费,注入一个参数Channel对象
channel.basicAck(
//根据消息标识,来手动消费消息
msg.getMessageProperties().getDeliveryTag(), false
);
}
4、Header exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
消息发送者,application.yml自定义正常交换机和死信交换机的属性
#自定义消息队列的属性
custom:
rabbitmq:
#模拟死性队列的场景 ,direct类型的交换机
model:
queue-name: modelQuene
exchange-name: modelExchange
routing-key: modelRoutingKey
#direct类型的交换机
dead:
queue-name: deadQuene
exchange-name: deadExchange
routing-key: deadRoutingKey
读取application.yml自定义属性付给实体类对象
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.model")
public class ModelProperties {
private String queueName;
private String exchangeName;
private String routingKey;
}
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.dead")
public class DeadProperties {
private String queueName;
private String exchangeName;
private String routingKey;
}
声明正常交换机和死信交换机、消息队列、绑定关系
@Configuration public class RabbitMqConfiguration { @Autowired private ModelProperties modelProperties; @Autowired private DeadProperties deadProperties; //声明死性队列场景 //向Model中发送消息,等待消息过期,会自动发送到死信交换机中,根据死信routingKey转发到死信队列中 @Bean public DirectExchange modelExchange(){ return new DirectExchange(modelProperties.getExchangeName()); } @Bean public Queue modelQuene(){ Map<String,Object> arguments = new HashMap<>(); //指定消息过期的时间,30秒过期 arguments.put("x-message-ttl",30*1000); //指定死信交换机 arguments.put("x-dead-letter-exchange",deadProperties.getExchangeName()); //指定死信routingKey arguments.put("x-dead-letter-routing-key",deadProperties.getRoutingKey()); return new Queue(modelProperties.getQueueName(),true,false,false,arguments); } @Bean public Binding modelBinding(){ return BindingBuilder .bind(modelQuene()) .to(modelExchange()) .with(modelProperties.getRoutingKey()); } @Bean public DirectExchange deadExchange(){ return new DirectExchange(deadProperties.getExchangeName()); } @Bean public Queue deadQuene(){ return new Queue(deadProperties.getQueueName()); } @Bean public Binding deadBinding(){ return BindingBuilder .bind(deadQuene()) .to(deadExchange()) .with(deadProperties.getRoutingKey()); } }
消息发送者发送消息到正常交换机
@Override
public void sendModelMessage() throws JsonProcessingException {
rabbitTemplate.convertAndSend(
modelProperties.getExchangeName(), modelProperties.getRoutingKey(),"这是第7条测试消息");
}
死信消息接收者接收消息
@Service public class ReceiveMsgImpl implements ReceiveMsg { //接收modelQueue的消息队列,测试死信队列,这里不能消费消息 // @RabbitListener(queues = "modelQueue") public void receiveModelMessage(Message msg, Channel channel) throws IOException { System.out.println(msg); //手动消费,注入一个参数Channel对象 channel.basicAck( //根据消息标识,来手动消费消息 msg.getMessageProperties().getDeliveryTag(), false ); } // 当modelQueue消息超时时,会被转发到死信队列中进行处理 @RabbitListener(queues = "deadQuene") public void receiveDeadMessage(Message msg, Channel channel) throws IOException { System.out.println("receiveDeadMessage:"+msg); //手动消费,注入一个参数Channel对象 channel.basicAck( //根据消息标识,来手动消费消息 msg.getMessageProperties().getDeliveryTag(), false); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。