赞
踩
1.1拉取
docker pull rabbitmq:3-management
1.2导入后加载镜像
docker load -i mq.tar
2 启动MQ
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-managment
登录管理界面
spring:
rabbitmq:
listener:
simple:
prefetch: 1
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String exchangeName = "name.fanout";
String message = "Hello World";
rabbitTemplate.convertAndSend(exchangeName, "", message);
......
consumer
@Configration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("name.fanout"); } @Bean public Queue fanoutQueue1(){ return new Queue("name.queue1"); } @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Queue fanoutQueue2(){ return new Queue("name.queue2"); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
路由 Direct Exchange
将接收到的消息根据规则路由到指定的Queue。
每个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
BindingKey可以相同
consumer
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {
@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称"), exchange = @Exchange(name = "交换机名称1", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))
public void listenerDirectQueue1(String msg){
// 业务逻辑
}
@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))
public void listenerDirectQueue2(String msg){
// 业务逻辑
}
}
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "key1";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
主题 Topic Exchange
与DirectExchange类似,但Topic Exchange 的RoutingKey必须时多个单词列表,并且以 . 分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
consumer
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {
@RabbitListener(bindings= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.TOPIC), key = "xxx.#"))
public void listenerTopicQueue2(String msg){
// 业务逻辑
}
}
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "xxx.aaa";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
# publisher // 建立连接 ConnecationFactory factory = new ConnectionFactory(); // 设置连接参数 主机名、端口号、vhost、用户名、密码 factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("admin"); // 建立连接 Connaction connetion = factory.newConnection(); // 创建通道Channel Channel channel = connetion.createChannel(); // 创建队列Queues String queueName = "one.queue"; channel.queueDeclare(queueName, false, false, false, null); // 发送消息 String message = "Hello World"; channel.basicPubulsh("", queueName, null, message.getBytes); // 关闭通道、连接 channel.close(); connetion.close();
# consumer // 建立连接 ConnecationFactory factory = new ConnectionFactory(); // 设置连接参数 主机名、端口号、vhost、用户名、密码 factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("admin"); // 建立连接 Connaction connetion = factory.newConnection(); // 创建通道Channel Channel channel = connetion.createChannel(); // 创建队列Queues String queueName = "one.queue"; channel.queueDeclare(queueName, false, false, false, null); // 订阅消息 String message = "Hello World"; channel.basicPubulsh(queueName, true, new DefaultConsumer()channel{ @Override pubilc void handleDelivery(String consumerTag, Envelope envelpoe, AMQP.BasicProperties properties, byte[] body) throe IOException { // 处理消息 } }); // 关闭通道、连接 channel.close(); connetion.close();
AMQP,应用程序之间传递消息的协议,与平台无关。
SpringAMQP,基于AMQP协议定义的一套API。spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
监听器容器,用于异步处理入站消息
用于发送和接收消息的RabbitTemplate
RabbitAdmin用于自动声明队列,交换和绑定
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
<!-- 父工程中引入spring-amqp的依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ ip地址
port: 5672
virtual-host: /
username: admin
password: admin
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String queueName = "queueName";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName, message);
......
spring:
rabbitmq:
host: 127.0.0.1 # 主机名
port: 5672
virtual-host: /
username: admin
password: admin
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {
@RabbitListener(queue = "队列名称")
public void listenerSimpleQueue(String msg){
// 业务逻辑
}
}
消息转换器
RabbitTemplate ,将message对象序列化为字节
修改序列化方式
父工程
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
publisher
@Configration
public class SpringRabbitListener {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
以上是学习 黑马程序员《微服务技术全栈教程》的学习笔记
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。