当前位置:   article > 正文

SpringCloud微服务学习日志-RabbitMQ_微服务日志统一处理通过mq存表

微服务日志统一处理通过mq存表

一.传统同步通讯的优缺点

我们使用的像Feign调用这样的消费者和提供者之间通讯的方法叫做同步通讯。

同步通讯就是消费者发送请求后,要等着提供者返回数据。

但是有的时候,当我们提供者所有存在的示例全都宕机了的话,我们的消费者也会卡住。 当然,一个请求卡住了可能过会儿他就去发下一个请求了,但是,如果下一个请求也这样卡住了呢?用户的请求时间就会被无限拉长。

同步通讯的请求方式 

优点: 

  • 时效性强,可以立即获取结果

缺点:

  • 耦合度过高,不利于业务拓展
  • 性能和吞吐能力较差
  • 额外的资源消耗
  • 有级联失败的可能 


二.异步通讯及其优缺点

为了解决在业务量太大的情况下同步通讯出现的服务卡顿的情况,就产生了异步通讯这个概念。

异步通讯的请求方式 

优点

  • 耦合度低
  • 吞吐量小
  • 故障隔离
  • 流量削峰 

缺点

  • 依赖于中间件的可靠性和安全性还有吞吐能力
  • 架构复杂后无法追踪流程 

三.什么是MQ及其常见技术

MQ(Message Queue),消息队列,是用来实现异步通讯的中间件:

MQ的常见技术
RabbitMQActiveMQRocketMQKafka
公司RabbitApache阿里(Apache)Apache
开发语言ErlangJava        JavaScala&Java
协议支持

AMQP,XMPP,SMTP

STOMP

OpenWire,STOMP

REST,XMPP

AMQP

自定义自定义
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

四.RabbitMQ

本章所有的MQ都以RabbitMQ进行。

(一).RabbitMQ的安装

因为在CenterOs直接装的话还要装Erlang的环境,就懒的部署Erlang了。这边直接使用docker对RabbitMQ进行一个安装。

1.pull一下rabbitmq的镜像

docker pull rabbitmq:latest

2.运行

docker run \

-e RABBITMQ_DEFAULT_USER=username \    #后台系统登陆的账号

-e RABBITMQ_DAFAULT_PASS=password \     #后台系统登陆的密码

--name mq1                                                         #搭建MQ集群时用来区分MQ的名字

-p15672:15672 \                                                  #管理平台的端口

-p5672:5672 \                                                      #使用MQ的端口

-d \

rabbitmq:latest

3.开启管理平台 

docker ps                #查看运行着的镜像
docker exec -it 镜像ID /bin/bash      #进入rabbitmq的镜像内部

  cd  /etc/rabbitmq/conf.d/                                 #进入目录


echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf                        #开启channel

rabbitmq-plugins enable rabbitmq_management              #开启管理平台

docker restart 镜像ID                                        #重启

至此,rabbitmq的安装完成

 (二).RabbitMQ的结构 

  •  chaneel:操作MQ的工具
  • exchange:路由消息到队列
  • queue:缓存的消息
  • VirtualHost:虚拟主机,用来隔离环境 

(三).RabbitMQ常见的消息模型 

1.队列(不涉及交换机)

基本消息队列:

 工作消息队列:

2.发布订阅(涉及交换机) 

广播(发布订阅):

 路由:

主题:

(四).体验HelloWorld的消息队列 

在rabbitmq的官网可以下载helloworld的示例:

RabbitMQ tutorial - "Hello World!" — RabbitMQ

这里基于示例进行改造演示:

1.准备一个两个模块,分别作为提供者和消费者

2.父工程引入依赖 

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

3.在提供者新建测试类发送消息

  1. @Test
  2. public void testSendMessage() throws IOException, TimeoutException {
  3. // 1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  6. factory.setHost("192.168.110.23");
  7. factory.setPort(5672);
  8. factory.setVirtualHost("/");
  9. factory.setUsername("admin");
  10. factory.setPassword("admin);
  11. // 1.2.建立连接
  12. Connection connection = factory.newConnection();
  13. // 2.创建通道Channel
  14. Channel channel = connection.createChannel();
  15. // 3.创建队列
  16. String queueName = "simple.queue";
  17. channel.queueDeclare(queueName, false, false, false, null);
  18. // 4.发送消息
  19. String message = "hello, rabbitmq!";
  20. channel.basicPublish("", queueName, null, message.getBytes());
  21. System.out.println("发送消息成功:【" + message + "");
  22. // 5.关闭通道和连接
  23. channel.close();
  24. connection.close();
  25. }

4.在消费者准备接收消息的测试类

  1. public static void main(String[] args) throws IOException, TimeoutException {
  2. // 1.建立连接
  3. ConnectionFactory factory = new ConnectionFactory();
  4. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  5. factory.setHost("192.168.110.23");
  6. factory.setPort(5672);
  7. factory.setVirtualHost("/");
  8. factory.setUsername("admin");
  9. factory.setPassword("admin);
  10. // 1.2.建立连接
  11. Connection connection = factory.newConnection();
  12. // 2.创建通道Channel
  13. Channel channel = connection.createChannel();
  14. // 3.创建队列
  15. String queueName = "simple.queue";
  16. channel.queueDeclare(queueName, false, false, false, null);
  17. // 4.订阅消息
  18. channel.basicConsume(queueName, true, new DefaultConsumer(channel){
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope,
  21. AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. // 5.处理消息
  23. String message = new String(body);
  24. System.out.println("接收到消息:【" + message + "");
  25. }
  26. });
  27. System.out.println("等待接收消息。。。。");
  28. }

 5.运行提供者的测试类

可以在rabbitmq的管理后台看到创建的队列

 点进去也可以看到发送的消息

6.启动消费者测试类 

启动后就可以接收到消息

五.SpringAMQP

(一).什么是SpringAMQP

AMQP(Advanced Message Queuing Protocol)指的是在应用程序或之间传递业务消息的开发标准。该协议与语言和平台无关。

SpringAMQP是基于AMQP协议来制定的一套API规范,提供了模板来发送和接收消息。其中SpringAMQP是抽象,Spring-rabbit是底层的默认实现。

(二).使用SpringAMQP发送HelloWorld基础消息队列

(1).在父工程中引入AMQP的依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

(2).在publisher端编写application.yml

  1. spring:
  2. rabbitmq:
  3. addresses: 127.0.0.1 #rabbitMQ的地址
  4. port: 5672 #rabbitMQ提供的客户端连接端口(非管理后台端口)
  5. username: admin #登陆账号
  6. password: admin #登陆密码
  7. virtual-host: / #虚拟主机路径

(3).在publisher的Test内编写Test方法发送消息

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class PublisherTest {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. public void AMQPTestDemo() {
  8. String queue = "simple.queue";
  9. String msg = "Hello World";
  10. //指定消息队列名以及消息内容发送消息
  11. rabbitTemplate.convertAndSend(queue,msg);
  12. }
  13. }

可以看到这里消息已经成功发送到MQ的消息队列里面了 

(三).使用SpringAMQP监听消息队列

(1).在customer端编写application.yml

  1. spring:
  2. rabbitmq:
  3. addresses: 127.0.0.1 #rabbitMQ的地址
  4. port: 5672 #rabbitMQ提供的客户端连接端口(非管理后台端口)
  5. username: admin #登陆账号
  6. password: admin #登陆密码
  7. virtual-host: / #虚拟主机路径

(2).在customer里面编写类来消费消息

  1. @Component
  2. public class SpringRabbitMQListener {
  3. //指定监听的消息队列名
  4. @RabbitListener(queues = "simple.queue")
  5. void listenerSimpleQueueMessage(String msg){
  6. //打印消息
  7. System.out.println("Message为: " + msg);
  8. }
  9. }

(3).启动,控制台成功打印消息

注:消息一旦被消费就会消失

(四).工作队列

工作队列模型图 

工作队列与普通的消息队列有所不同的是:

  • 多个消费端共同消费同一个队列中的消息。
  • 多个消费端之间是竞争关系,竞争这一条消息
  • 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

这里为了方便看到测试结果,新建一个方法用来充当消费者2。我设置了一个线程延迟时间,不设置的话其中一个会把所有消息给抢完。

  1. @Test
  2. public void AMQPTestDemo() throws InterruptedException {
  3. String queue = "simple.queue";
  4. String msg = "66666";
  5. //这里为了看到他更加明显的一个运行状况把发送次数提高到50次
  6. for (int i = 0; i < 50; i++) {
  7. rabbitTemplate.convertAndSend(queue,msg + i);
  8. Thread.sleep(20);
  9. }
  10. }
  1. @Component
  2. //消费者1
  3. public class SpringRabbitMQListener {
  4. @RabbitListener(queues = "simple.queue")
  5. void listenerSimpleQueueMessage(String msg) throws InterruptedException {
  6. System.out.println("Message为: " + msg);
  7. Thread.sleep(20);
  8. }
  9. //消费者2
  10. @RabbitListener(queues = "simple.queue")
  11. void listenerSimpleQueueMessage2(String msg) throws InterruptedException {
  12. System.err.println("Message2为: " + msg);
  13. Thread.sleep(200);
  14. }
  15. }

 可以看到message和message2两个消费者消费的消息是不一样的,说明他们之间是争夺关系。

 可以在消费端的application设置prefetch,意味着要把多少条消息消费完才能进入下一条:

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: admin
  6. password: admin
  7. virtual-host: /
  8. listener:
  9. simple:
  10. //要把多少条消息消费完
  11. prefetch: 1

 可以发现Message2变得更加规律了些(为什么还是这么少,因为延迟了200ms)

(五).PubSub模型

 PubSub(Publisher Subscribe)意为发布订阅模型。它与前面不同的是可以发布消息到多个消费者。常见的PubSub模型有:

  • Fanout-广播
  • Direct-路由
  • Topic-话题

1.Fanout广播模式

Fanout会将收到的消息转发到每一个与其绑定的Queue上。 

(1).声明Exchange,Queue和Binding:

  1. @Configuration
  2. public class FanoutConfig {
  3. //声明交换机
  4. @Bean
  5. public FanoutExchange fanoutExchange(){
  6. return new FanoutExchange("demo.fanout");
  7. }
  8. //声明队列1
  9. @Bean
  10. public Queue queue1(){
  11. return new Queue("fanout.queue1");
  12. }
  13. //声明队列2
  14. @Bean
  15. public Queue queue2(){
  16. return new Queue("fanout.queue2");
  17. }
  18. //声明队列1与交换机的绑定关系
  19. @Bean
  20. public Binding binding1(Queue queue1,FanoutExchange fanoutExchange){
  21. return BindingBuilder.bind(queue1).to(fanoutExchange);
  22. }
  23. //声明队列2与交换机的绑定关系
  24. @Bean
  25. public Binding binding2(Queue queue2,FanoutExchange fanoutExchange){
  26. return BindingBuilder.bind(queue2).to(fanoutExchange);
  27. }
  28. }

(2).在Publisher中写测试类:

  1. @Test
  2. public void AMQPTestDemo() throws InterruptedException {
  3. String exchange = "demo.fanout";
  4. String msg = "66666";
  5. for (int i = 0; i < 50; i++) {
  6. //在Fanout模式下不用指定RouteKey
  7. rabbitTemplate.convertAndSend(exchange,"",msg + i);
  8. Thread.sleep(20);
  9. }
  10. }

(3).查看消息:

交换机作用:

  • 接受publisher发送的消息
  • 按照规则路由到与之匹配的队列
  • 不能缓存消息,路由失败则消息会丢失
  • FanoutExchange会将其路由到与之绑定的队列 

交换机,队列,绑定对应的Bean为:

  • FanoutExchange
  • Queue
  • Binding 

 2.Direct路由模式

  • 每一个Queue与DirectExchange之间有一个bindingKey
  • 发布者发布消息时,指定消息的RoutingKey
  • Exchange将消息路由到与之对的RoutingKey的Queue上

 (1).使用注解的形式来声明Exchange,Queue,Binding的关系

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "direct.queue1"),
  3. exchange = @Exchange(name = "demo.direct",type = ExchangeTypes.DIRECT),
  4. key = {"admin","user"} //指定key为admin或者user时才触发
  5. ))
  6. void listenerDirectQueueMessage1(String msg) throws InterruptedException {
  7. System.err.println("MessageDirect1为: " + msg);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(name = "direct.queue2"),
  11. exchange = @Exchange(name = "demo.direct",type = ExchangeTypes.DIRECT),
  12. key = {"admin"} //指定key为admin时触发
  13. ))
  14. void listenerDirectQueueMessage2(String msg) throws InterruptedException {
  15. System.err.println("MessageDirect2为: " + msg);
  16. }

(2).publisher处声明测试类发送消息 

  1. @Test
  2. public void AMQPTestDemo() throws InterruptedException {
  3. String exchange = "demo.direct";
  4. String msg = "66666";
  5. //此处RoutingKey指定为user
  6. rabbitTemplate.convertAndSend(exchange,"user",msg);
  7. }

(3).只有MessageDirect1收到消息:

 Direct与Fanout的区别:

  • Fanout广播消息与之绑定的队列
  • Direct路由消息到与之对应RoutingKey的队列
  • 如果多个队列的RoutingKey相同,则两者无区别

3.Topic主题模式 

TopicExchange和DirectExchange有点类似,区别不同的是TopicExchange的RoutingKey必须是多个单词的列表,且用.进行分割。且Queue和Exchange指定RoutingKey时可以使用通配符*和#

  • *指代0或者多个单词
  •  #指代一个单词

  (1).使用注解的形式来声明Exchange,Queue,Binding的关系

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "topic.queue1"),
  3. exchange = @Exchange(name = "demo.topic",type = ExchangeTypes.TOPIC),
  4. key = {"admin.#"}
  5. ))
  6. void listenerTopicQueueMessage1(String msg) throws InterruptedException {
  7. System.err.println("MessageTopic1为: " + msg);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(name = "topic.queue2"),
  11. exchange = @Exchange(name = "demo.topic",type = ExchangeTypes.TOPIC),
  12. key = {"#.user"}
  13. ))
  14. void listenerTopicQueueMessage2(String msg) throws InterruptedException {
  15. System.err.println("MessageTopic2为: " + msg);
  16. }
  17. @RabbitListener(bindings = @QueueBinding(
  18. value = @Queue(name = "topic.queue3"),
  19. exchange = @Exchange(name = "demo.topic",type = ExchangeTypes.TOPIC),
  20. key = {"#.book"}
  21. ))
  22. void listenerTopicQueueMessage3(String msg) throws InterruptedException {
  23. System.err.println("MessageTopic3为: " + msg);
  24. }

(2).publisher处声明测试类发送消息 

  1. @Test
  2. public void AMQPTestDemo() throws InterruptedException {
  3. String exchange = "demo.topic";
  4. String msg = "66666";
  5. rabbitTemplate.convertAndSend(exchange,"admin.book",msg);
  6. }

(3).Message1和Message3收到了消息

Direct和Topic交换机的区别:

  • Direct是单个RoutingKey进行匹配
  • Topic可以进行多个RoutingKey匹配  

(六).消息转换器 

在SpringAMQP中,我们默认接受的消息类型是Object。这也就意味着,SpringAMQP会默认将消息序列化为字节后发送。但是,当发送的内容是一个对象的时候,转换过去的是默认的字节。

我们可以引用其他的消息转换器来替代默认使用的JDK的转换器:

1.引入依赖 

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. </dependency>

2.在costumer端注入消息转换器

  1. @Bean
  2. public MessageConverter messageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

3. 清空消息后再查看

 变成JSON格式的了。

注:发送方和接收方必须使用同样的消息转换器

四.RabbitMQ高级特性

(一).消息的可靠性投递

在使用RabbitMQ的时候,消息的发送方自然不希望出现消息遗失遗漏的问题。RabbitMQ的官方为我们提供了两种方式来确保消息的可靠性投递:

  • confirm确认模式
  • return退回模式

 RabbitMQ的整个消息流程

两种回调函数的执行位置 

  •  当消息从publisher成功传到exchange时,则会返回一个confirmCallBack回调函数
  • 当消息从exchange失败传到queue时,则会返回一个returnCallBack回调函数

 1.开启confirm

(1).在publisher处开启confirm确认机制

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. username: admin
  5. password: admin
  6. virtual-host: /
  7. //none代表不开启,默认为none
  8. //correlated代表开启消息确认机制,触发confirm回调函数
  9. //simple和correlated差不多
  10. //但是使用rabbitmqTemplate.waitForConfirms()会等待等待broker节点返回发送结果
  11. publisher-confirm-type: correlated

(2)改造publisher代码,设置confirmCallBack回调函数:

  1. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  2. @Override
  3. public void confirm(CorrelationData correlationData, boolean b, String s) {
  4. System.out.println("confirm执行");
  5. }
  6. });
  7. String queue = "demo.direct";
  8. rabbitTemplate.convertAndSend(queue,"admin","hello");
  9. //必须加上线程睡眠,否则无法测试到回调函数的执行
  10. Thread.sleep(200);

(3)成功发送

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/639829
推荐阅读
相关标签
  

闽ICP备14008679号