当前位置:   article > 正文

服务异步通信RabbitMQ_rabbittemplate发送消息

rabbittemplate发送消息

目录

1. 初识MQ

  1. 同步通讯
    直播、微服务间基于Feign的调用
    问题:耦合高、性能下降(调用耗时、等待响应(资源浪费)、吞吐量下降、服务阻塞、级联失败
  2. 异步通讯
    微信聊天
    事件驱动模式
    在这里插入图片描述
    优势一:服务解耦
    优势二:性能提升,吞吐量提高
    优势三:服务没有强依赖,解决级联失败问题
    优势四:流量消峰
    问题一:依赖于Broker的可靠性、安全性、吞吐能力
    问题二:架构复杂了,业务没有明显的流水线,不好追踪管理
  3. 异步通讯框架
    MQ,存放消息的队列,也就是事件驱动架构中的Broker。
    在这里插入图片描述

2. RabbitMQ快速入门

  1. RabbitMQ概述与安装
    基于Erlang语言开发的开源消息通信中间件
    使用docker安装
1.1拉取
docker pull rabbitmq:3-management
1.2导入后加载镜像
docker load -i mq.tar
2 启动MQ
docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=admin \
 --name mq \
 --hostname mq1 \ 
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-managment 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

登录管理界面
在这里插入图片描述
在这里插入图片描述

  1. 常见消息模型
    publisher:消息发布者,将消息发送到队列queue
    queue:消息队列,负责接收并缓存消息
    consumer:订阅队列,处理队列中的消息
    subscribe:订阅
  • 基本消息队列 BasicQueue
    HelloWorld
    在这里插入图片描述
  • 工作消息队列 WorkQueue
    可以提高消息处理速度,避免队列消息堆积
    在这里插入图片描述
    消息预取机制:在处理消息之前,取出所有消息
    解决:每次只能获取一条消息,处理完成才能获取下一个消息
spring:
 rabbitmq:
  listener:
   simple:
    prefetch: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 发布订阅
    允许将同一消息发送给多个消费者。
    Exchange:只负责消息的转发
    广播 Fanout Exchange
    将收到的消息路由到每一个跟其绑定的queue
    在这里插入图片描述
    publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String exchangeName = "name.fanout";
String message = "Hello World";
rabbitTemplate.convertAndSend(exchangeName, "", message);
......

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

consumer

@Configration
public class FanoutConfig {
 @Bean
 public FanoutExchange fanoutExchange(){
  return new FanoutExchange("name.fanout");
 }
 @Bean
 public Queue fanoutQueue1(){
	  return new Queue("name.queue1");
 }
 @Bean
 public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
	  return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
 }
 @Bean
 public Queue fanoutQueue2(){
	  return new Queue("name.queue2");
 }
 @Bean
 public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
	  return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
 }
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

路由 Direct Exchange
将接收到的消息根据规则路由到指定的Queue。

每个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
BindingKey可以相同
在这里插入图片描述
consumer

//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {
 @RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称"), exchange = @Exchange(name = "交换机名称1", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))
 public void listenerDirectQueue1(String msg){
  // 业务逻辑
 }
 @RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))
 public void listenerDirectQueue2(String msg){
  // 业务逻辑
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

publisher

//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "key1";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

主题 Topic Exchange
与DirectExchange类似,但Topic Exchange 的RoutingKey必须时多个单词列表,并且以 . 分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
在这里插入图片描述
consumer

//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {
 @RabbitListener(bindings= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.TOPIC), key = "xxx.#"))
 public void listenerTopicQueue2(String msg){
  // 业务逻辑
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

publisher

//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "xxx.aaa";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 快速入门
# publisher
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World";
channel.basicPubulsh("", queueName, null, message.getBytes);
// 关闭通道、连接
channel.close();
connetion.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
# consumer
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅消息
String message = "Hello World";
channel.basicPubulsh(queueName, true, new DefaultConsumer()channel{
@Override
pubilc void handleDelivery(String consumerTag, Envelope envelpoe, AMQP.BasicProperties properties, byte[] body) throe IOException {
 // 处理消息
 }
});
// 关闭通道、连接
channel.close();
connetion.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

3. SpringAMQP

  • AMQP,应用程序之间传递消息的协议,与平台无关。

  • SpringAMQP,基于AMQP协议定义的一套API。spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

    监听器容器,用于异步处理入站消息
    用于发送和接收消息的RabbitTemplate
    RabbitAdmin用于自动声明队列,交换和绑定

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

  1. 引入依赖
<!-- 父工程中引入spring-amqp的依赖 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  1. publisher 发送消息
spring:
 rabbitmq:
  host: 127.0.0.1 # rabbitMQ ip地址
  port: 5672
  virtual-host: /
  username: admin
  password: admin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String queueName = "queueName";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName, message);
......

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. consumer 监听消息
spring:
 rabbitmq:
  host: 127.0.0.1 # 主机名
  port: 5672
  virtual-host: /
  username: admin
  password: admin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {
 @RabbitListener(queue = "队列名称")
 public void listenerSimpleQueue(String msg){
  // 业务逻辑
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

消息转换器
RabbitTemplate ,将message对象序列化为字节

修改序列化方式

父工程

<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

publisher

@Configration
public class SpringRabbitListener {
 @Bean
 public MessageConverter messageConverter(){
  return new Jackson2JsonMessageConverter();
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

最后

以上是学习 黑马程序员《微服务技术全栈教程》的学习笔记

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/399205
推荐阅读
相关标签
  

闽ICP备14008679号