当前位置:   article > 正文

RabbitMQ(消息中间件技术)_rabbittemplate.convertandsend

rabbittemplate.convertandsend

消息中间件解决了什么问题?

        1. 同步变异步

        2. 解耦

        3. 流量削峰(类似大坝)

https://www.csdn.net/tags/NtTacg0sMTAyNjMtYmxvZwO0O0OO0O0O.htmlhttps://www.csdn.net/tags/NtTacg0sMTAyNjMtYmxvZwO0O0OO0O0O.html

如:支付服务流程,支付服务流程本身不需要关心订单服务、仓储服务、短信服务是否完成,只需关心用户是否支付,但因为整个流程的后面三项服务需要支付服务提供的数据,所以没有消息中间件以前需要支付完成之后,再接着同步调用后续流程,耗时长。有了消息中间件之后,后续流程就无需同步执行了,只需让支付服务发送消息到中间件,后续服务订阅中间件消息即可,同时也解决了级联失败问题,后续流程失败,不影响支付服务的返回。下图解:

使用消息中间件前:

同步调用:支付服务——>订单服务——>仓储服务——>短信服务——>完成,总时长500ms

使用消息中间件后:

异步调用:支付服务——>Broker——>完成,总时长60ms,之后的订单服务、仓储服务、短信服务只需要等待订阅的消息中间件通知消费即可。

常用的消息中间件

在这里插入图片描述

 

ActiveMQ现在已经很少使用,社区不太活跃,放弃(但是使用很简单- -)。

RabbitMQ并发能力强、消息延时低、高可用、管理界面丰富,并且最重要的是:社区非常活跃,出现BUG都能及时解决。

Kafka和RocketMQ的特点都是高吞吐量,但是kafka消息可靠性比较一般,而且消息不保证有序性。RocketMQ弥补了Kafka的缺点,不过是阿里开源,社区不太活跃,文档也不够丰富。

安装RabbitMQ

# 通过Docker安装RabbitMQ

docker run \
    -e RABBITMQ_DEFAULT_USER=root \  # 修改默认用户名
    -e RABBITMQ_DEFAULT_PASS=root \ # 修改默认密码
    --name mq1 \
    --hostname my-rabbit \ # 集区部署需要
    -p 15672:15672 \  # 图形化界面端口号(通过该端口访问网页,查看数据情况)
    -p 5672:5672 \  # 订阅消息端口号
    -d \
    rabbitmq:latest

注意!注意!注意!最新版RabbitMQ默认是禁用web界面管理插件的,需要手动开启插件,开启方式如下:

rabbitmq启用和禁用web界面管理插件 - jacek - 博客园rabbitmq默认安装启动以后,是没有开启web管理界面的,通过rabbitmq-plugins list命令可列出插件的启用和禁用状态。 使用rabbitmq-plugins enable xxxhttps://www.cnblogs.com/liyuchuan/p/11148805.html

步骤一:进入RabbitMQ容器内部

        docker exec -it mq1 /bin/bash

步骤二:开启web界面管理插件

        rabbitmq-plugins enable rabbitmq_management:开启web界面管理插件

PS:rabbitmq-plugins list:命令可列出插件的启用和禁用状态

RabbitMQ结构

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

常见消息模型

-- 消息的六种模型(使用方式)

RabbitMQ快速入门(详细)_kavito的博客-CSDN博客_rabbitmq在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景:商品的原始数据保存在数据库中,增删改查都在数据库中完成。搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。商品详情做了页面静态化处理,静态页面数据也不会随着数据库商品更新而变化。如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的...https://blog.csdn.net/kavito/article/details/91403659?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165108319216781667853028%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=165108319216781667853028&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-91403659.142%5Ev9%5Epc_search_result_cache,157%5Ev4%5Enew_style&utm_term=rabbitmq&spm=1018.2226.3001.4187

RabbitMQ Tutorials — RabbitMQhttps://www.rabbitmq.com/getstarted.html

SpringAMQP

什么是AMQP?

AMQP:一种高级消息队列协议。AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

SpringAMQP整合RabbitMQ使用

1)基本消息模型

生产者(消息提供者)

1. 导入依赖

  1. <!-- AMQP依赖,包含RabbitMQ -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2. 配置文件

  1. spring:
  2. application:
  3. name: spirng-boot-rabbitmq
  4. rabbitmq:
  5. host: 192.168.0.86 # RabbitMQ所在主机地址
  6. port: 5672 # 端口
  7. virtual-host: / # 虚拟机地址,对应root用户
  8. username: guest # 用户名
  9. password: guest # 密码,一定不要是数字,容易报错Socket ***

3. 注入RabbitTemplate实例

  1. @Autowired
  2. RabbitTemplate rabbitTemplate;

4. 发送消息,调用RabbitTemplate.convertAndSend方法

  1. @Autowired
  2. RabbitTemplate rabbitTemplate;
  3. public void testSendMessageWork(){
  4. String queue = "simple.queue"; //队列名
  5. String message = "Hello RabbitMQ!!!"; //发送的消息
  6. rabbitTemplate.convertAndSend(queue,message); //解析并发送消息
  7. }

ps:发送消息需要通过通道发送,使用SpringAMQP后无需关心通道创建,Spring会帮助创建通道,只需关心队列即可。

消费者(消息消费者)

 1. 导入依赖

  1. <!-- AMQP依赖,包含RabbitMQ -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2. 配置文件

  1. spring:
  2. application:
  3. name: spirng-boot-rabbitmq
  4. rabbitmq:
  5. host: 192.168.0.86 # RabbitMQ所在主机地址
  6. port: 5672 # 端口
  7. virtual-host: / # 虚拟主机
  8. username: guest # 用户名
  9. password: guest # 密码,一定不要是数字,容易报错Socket ***

3. 方法通过@RabbitListener注解监听队列,方法参数就是消息

  1. /**
  2. * @RabbitListener(queues = "simple.queue")
  3. * queues:监听的队列
  4. * @param message:监听到的消息
  5. */
  6. @RabbitListener(queues = "simple.queue")
  7. public void getMessage(String message){
  8. System.out.println("监听消息中间件消息为:" + message);
  9. }

 2)Work消息模型

【多个消费者绑定一个队列,多消费者抢夺队列消息】

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

能者多劳:RabbitMQ的消息预取机制导致两个消费者拿到的消息是一样的,不管有多少条消息,都会平均分摊,没有去考虑消费者性能分发消息。所以为了解决该问题,需要在配置文件中配置以下配置:

  1. spring:
  2. application:
  3. name: spirng-boot-rabbitmq
  4. rabbitmq:
  5. host: 192.168.0.86 # RabbitMQ所在主机地址
  6. port: 5672 # 端口
  7. virtual-host: / # 虚拟主机
  8. username: root # 用户名
  9. password: 0722 # 密码
  10. listener:
  11. simple:
  12. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
  13. #prefetch:修改默认RabbitMQ消息预取机制

该模式相较于基础版区别:提高队列消息处理速度,避免消息堆积。

3)发布订阅消息模型之广播模式(Fanout)

exchange:交换机;

作用:转发消息到队列【将原本的一个消息(根据类型)分发给多个队列】

发布订阅模式中生产者只需要知道交换机是谁即可。

Fanout交换机类型(广播)

特点:Fanout类型交换机会将交换机接受到的消息分发给每一个跟其绑定的队列中去。

Fanout Exchange使用

消费者

1. 前面与基本消息模型一样,后续编码有所变化

2. 创建配置类,配置队列绑定交换机。

Queue(队列)、FanoutExchange(广播交换机)、Binding(队列绑定交换机)

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Autor 白春恒
  10. *
  11. * FanoutExchange:广播类型交换机消息模型
  12. * 该类型会将交换机接受到的消息分发给每一个跟其绑定的队列中去。
  13. *
  14. * PS:这种声明交换机与队列方式比较繁琐,简洁方式看路由模式代码
  15. */
  16. @Configuration
  17. public class FanoutConfiguration {
  18. /**
  19. * 声明交换机:fanout1
  20. * @return 注入到SpringIOC容器,交给Spring进行管理
  21. */
  22. @Bean
  23. public FanoutExchange fanoutExchange(){
  24. return new FanoutExchange("fanout1");
  25. }
  26. /**
  27. * 声明队列:fanout.queue1
  28. * @return
  29. */
  30. @Bean
  31. public Queue queue1(){
  32. return new Queue("fanout.queue1");
  33. }
  34. /**
  35. * 绑定queue1到fanoutExchange交换机
  36. * @param queue1 队列1
  37. * @param fanoutExchange 广播类型交换机
  38. * @return
  39. */
  40. @Bean
  41. public Binding bindingQueue1(Queue queue1,FanoutExchange fanoutExchange){
  42. return BindingBuilder.bind(queue1).to(fanoutExchange);
  43. }
  44. /**
  45. * 声明队列:fanout.queue2
  46. * @return
  47. */
  48. @Bean
  49. public Queue queue2(){
  50. return new Queue("fanout.queue2");
  51. }
  52. /**
  53. * 绑定queue2到fanoutExchange交换机
  54. * @param queue2 队列2
  55. * @param fanoutExchange 广播类型交换机
  56. * @return
  57. */
  58. @Bean
  59. public Binding bindingQueue2(Queue queue2,FanoutExchange fanoutExchange){
  60. return BindingBuilder.bind(queue2).to(fanoutExchange);
  61. }
  62. }

3. 监听消息

  1. @RabbitListener(queues = "fanout.queue1")
  2. public void listenFanoutQueue1(String message) {
  3. System.out.println("监听fanout.queue1得到消息为:" + message);
  4. }
  5. @RabbitListener(queues = "fanout.queue2")
  6. public void listenFanoutQueue2(String message) {
  7. System.out.println("监听fanout.queue2得到消息为:" + message);
  8. }

生产者

1. 前面与基本消息模型一样,后续编码有所变化

2. 发送消息到交换机(FanoutExchange)

  1. @Autowired
  2. RabbitTemplate rabbitTemplate;
  3. /**
  4. * FanoutExchange方式发送消息
  5. * 先将消息返送到FanoutExchange,再通过交换机将“消息”路由到与其绑定的队列
  6. */
  7. @Test
  8. public void testSendMessageToFanout() {
  9. String exchange = "fanout1"; //Fanout交换机
  10. String message = "Hello every one"; //发送的消息
  11. rabbitTemplate.convertAndSend(exchange,"",message);
  12. }

4)发布订阅模型之路由模式(DirectExchange)

路由模式与广播模式区别:

        广播模式:会将Exchange交换机的消息分发给所有与之绑定的Queue队列;

        路由模式:Exchange与Queue之间绑定多了BindingKey,Exchange分发消息时会带上RoutingKey,Queue的BindingKey与RoutingKey保持一致才会收到消息。

DirectExhcange路由模式使用

消费者

1. 前面与基本消息模型一样,后续编码有所变化

2. 声明队列、交换机与RoutingKey,并监听队列

  1. /**
  2. * @RabbitListener注解参数:
  3. * bindings:绑定队列与交换机
  4. * BindingKey:队列与交换机之间的绑定值,交换机通过该值分发消息到指定队列
  5. * @param message 监听到的消息
  6. */
  7. @RabbitListener(bindings = @QueueBinding(
  8. value = @Queue(name = "direct.queue1"),//声明队列
  9. exchange = @Exchange(name = "direct1",type = ExchangeTypes.DIRECT),//声明交换机
  10. key = {"blue","yellow"}//BindingKey
  11. ))
  12. public void listenDirectQueue1(String message){
  13. System.out.println("路由模式DirectQueue1监听到的消息为:" + message);
  14. }

生产者

1. 前面与基本消息模型一样,后续编码有所变化

2. 发送消息到DirectExchange(路由模式)

  1. @Autowired
  2. RabbitTemplate rabbitTemplate;
  3. /**
  4. * DirectExchange方式发送消息
  5. * 先将消息返送到FanoutExchange,再通过交换机和RoutingKey两个值确认将“消息”分发到与其绑定的队列
  6. *
  7. * RoutingKey:对应队列绑定的BindingKey,两值匹配则会发消息到匹配的队列中。
  8. */
  9. @Test
  10. public void testSendMessageToDirect() {
  11. String exchange = "direct1"; //Fanout交换机
  12. String message = "Hello yellow"; //发送的消息
  13. //RoutingKey有值表示发送到指定RoutingKey绑定的队列
  14. rabbitTemplate.convertAndSend(exchange,"yellow",message);
  15. }

5)发布订阅模型之通配符模式(TopicExchange)

通配符模式与路由模式区别

        DirectExchange:路由模式只能监听到指定的RoutingKey

        TopicExchange:通配符模式可以监听到被通配到的所有RoutingKey,简化了监听多个RoutingKey的繁琐

TopicExchange通配符模式使用

消费者

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "topic1.queue1"),
  3. exchange = @Exchange(name = "topic1",type = ExchangeTypes.TOPIC),
  4. key = {"china.#"}
  5. ))
  6. public void listenTopicQueue1(String message){
  7. System.out.println("通配符模式TopicQueue1监听到的消息为:" + message);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(name = "topic1.queue2"),
  11. exchange = @Exchange(name = "topic1",type = ExchangeTypes.TOPIC),
  12. key = {"#.news"}
  13. ))
  14. public void listenTopicQueue2(String message){
  15. System.out.println("通配符模式TopicQueue2监听到的消息为:" + message);
  16. }

生产者

  1. @Autowired
  2. RabbitTemplate rabbitTemplate;
  3. @Test
  4. public void testSendMessageToTopic() {
  5. String exchange = "topic1"; //Fanout交换机
  6. String message = "新闻:恭喜发财,身体健康"; //发送的消息
  7. rabbitTemplate.convertAndSend(exchange,"china.news",message);
  8. }

消息转换(消息序列化方式)

如何变成Jackson方式序列化数据?

提供者

1. 添加jackson依赖

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. </dependency>

2. 注入Jackson2JsonMessageConverter到Spring容器中

  1. @Bean
  2. public MessageConverter messageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

3. 发送消息即可 

  1. @Test
  2. public void sendMessageToObjectQueue(){
  3. Map<String,Object> map = new HashMap<String,Object>();
  4. map.put("name","高圆圆");
  5. map.put("age",18);
  6. rabbitTemplate.convertAndSend("object.queue",map);
  7. }

消费者(同提供者)

1. 添加jackson依赖

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. </dependency>

2. 注入Jackson2JsonMessageConverter到Spring容器中

  1. @Bean
  2. public MessageConverter messageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

3. 接受消息即可

  1. @RabbitListener(queues = "object.queue")
  2. public void getObjectQueueMessage(Map<String,Object> map){
  3. System.out.println("object.queue队列接收到的消息为:" + map);
  4. }

报错问题

PS:

1. RabbitMQ的web管理界面进入交换机报错500解决

解决RabbitMQ Management API returned status code 500 问题_ZNineSun的博客-CSDN博客_rabbitmq报500我们在使用rabbitMq控制台进行操作的时候会发现提示以下错误:解决办法也很简单,我是用docker配置的,解决方案如下:进入容器:docker exec -it rabbitmq的镜像id /bin/bashcd到目录/etc/rabbitmq/conf.d/,然后执行以下命令:echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conhttps://blog.csdn.net/zhiyikeji/article/details/124178809

2. RabbitMQ用户名密码建议设置为字符串,不要全是数字,否则报错连接不上

3. 最新版RabbitMQ默认是禁用web界面管理插件的,需要手动开启插件

rabbitmq启用和禁用web界面管理插件 - jacek - 博客园rabbitmq默认安装启动以后,是没有开启web管理界面的,通过rabbitmq-plugins list命令可列出插件的启用和禁用状态。 使用rabbitmq-plugins enable xxxhttps://www.cnblogs.com/liyuchuan/p/11148805.html

步骤一:进入RabbitMQ容器内部

        docker exec -it mq1 /bin/bash

步骤二:开启web界面管理插件

        rabbitmq-plugins enable rabbitmq_management:开启web界面管理插件

PS:rabbitmq-plugins list:命令可列出插件的启用和禁用状态

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/399240?site
推荐阅读
相关标签
  

闽ICP备14008679号