赞
踩
同步:A向B发送消息,A在接受到B的反馈之前都停止活动,接受到B的反馈后进行下一步操作
异步:A向B发送消息,A不用等B的反馈,在给B发送完成消息之后就可以进行下一步操作
同步调用的缺点:
耦合度高:增加新功能都要重新修改代码 性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源 级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障
异步调用的优点:解耦合(事件驱动模式)、性能提升(不用等待响应)、 没有级联、流量消峰(应对高并发)
1.拉取镜像 docker pull rabbitmq:3-management 3:是rabbitmq的版本 management:是指RabbitMQ 的 management 镜像,management 镜像提供了许多管理工具和功能,如消息队列管理、用户管理、权限管理等,这些功能在普 通的 Docker 镜像中可能没有或者不完整。
2.执行容器镜像
docker run -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=123456 --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-mannagement
-e RABBITMQ_DEFAULT_USER=user:运行的用户名为user -e RABBITMQ_DEFAULT_PASS=123456:运行的密码为123456 --name mq:运行的容器名称 --hostname mq1:容器的主机名 -p 15672:15672:控制台的端口 -p 5672:5672:消息通信的端口 -d:后台运行 rabbitmq:3-management:镜像
publisher:生产者 exchange:交换机 queue:消息队列 consumer:消费者
virustualhost:虚拟主机(各个虚拟主机互不干扰) channel:操作MQ的工具
基本消息队列(BasicQueue)
工作消息队列(WorkQueue)
发布订阅(Publish、Subscribe):Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
1.基本消息队列(BasicQueue)
导入依赖和配置
- <!--依赖放在pom文件中-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
-
-
-
- //yml配置文件,配置主机,端口,用户名,密码,虚拟主机
- spring:
- rabbitmq:
- host: 192.168.35.150 #自己运行rabbitmq的ip地址
- port: 5672
- virtual-host: /
- username: user #之前运行rabbitmq镜像的用户名
- password: 123456 #之前运行rabbitmq镜像的密码
- listener:
- simple:
- prefetch: 1 #有多个消费者时,消费者会默认预取消息,设置取消息的上限为1个
发送消息
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class PublisherTest {
-
- @Autowired
- private RabbitTemplate template;
-
- @Test
- public void sendSimpleQueue(){
- String message="hello RabbitAMQP"; //消息
- String queueName="simple.queue"; //队列
- template.convertAndSend(message,queueName); //发送消息
- }
- }
接受消息
- @Component
- public class RabbitMQListener {
-
- @RabbitListener(queues = "simple.queue") //RabbitListener监听
- public void listenerMessage(String msg){ //用msg来接受监听到的消息
- System.out.println("接受到消息:"+msg);
- }
- }
2.工作消息队列(WorkQueue)
多个消费者,提高消息处理速度,避免消息堆积
发送消息
- @Test
- public void sendWorkQueue() throws InterruptedException {
- String message="hello RabbitAMQP";
- String queueName="simple.queue";
- for(int i=0;i<50;i++){
- template.convertAndSend(queueName,message+i);
- Thread.sleep(20); //休眠让消费者有时间处理消息
- }
-
- }
接受消息
- //消费者1
- @RabbitListener(queues = "simple.queue")
- public void listenerWork1(String msg) throws InterruptedException {
- System.out.println("接受到消息1:"+msg+" "+ LocalDateTime.now());
- Thread.sleep(20); //休眠时间代表消息的处理能力 1是2的10倍
- }
-
- //消费者2
- @RabbitListener(queues = "simple.queue")
- public void listenerWork2(String msg) throws InterruptedException {
- System.err.println("接受到消息2:"+msg+" "+LocalDateTime.now());
- Thread.sleep(200);
- }
3.发布订阅(Publish、Subscribe)
fanout(广播):默认自动发送到多个消费者
Direct (路由):根据key,如图中的(orange,black,green)路由到对应的消费者
Topic (主题):根据key (key必须是多个单词,并以.号区分),可以用通配符来代替
发送消息
- //发送消息
- @Test
- public void sendFanoutExchange(){ //fanout
- String exchangeName="fanout";
- String message="hello fanoutExchange";
- template.convertAndSend(exchangeName,"",message);
- }
- @Test
- public void sendDirectExchange(){ //direct
- String exchangeName="direct";
- String message="hello blue";
- template.convertAndSend(exchangeName,"blue",message);
- }
- @Test
- public void sendTopicExchange(){ //topic
- String exchangeName="topic";
- String message="hello topic";
- template.convertAndSend(exchangeName,"china.news",message);
- }
接受消息
- //接受消息1
- @RabbitListener(bindings=@QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "direct",type = "direct"),
- key = {"red","blue"} //这个为direct,fanout没有key
- //key={"china.#"} //这个是topic
- ))
- public void listenerDirectExchange1(String message){
- System.err.println("接受到消息direct1:"+message+" "+LocalDateTime.now());
- //Thread.sleep(20) //这个fanout休眠对于每个消息的处理
- }
-
- //接受消息2
- @RabbitListener(bindings=@QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "direct",type = "direct"),
- key = {"yellow"} //这个为direct,fanout没有key
- //key={"#.news"} //这个是topic
- ))
- public void listenerDirectExchange2(String message){
- System.err.println("接受到消息direct2:"+message+" "+LocalDateTime.now());
- //Thread.sleep(20) //这个fanout休眠对于每个消息的处理
- }
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
发送消息
- @Test
- public void testSendMap() throws InterruptedException {
- // 准备消息
- Map<String,Object> msg = new HashMap<>();
- msg.put("name", "张三");
- msg.put("age", 30);
- // 发送消息
- rabbitTemplate.convertAndSend("simple.queue", msg);
- }
接受消息
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
- System.out.println("spring 消费者接收到消息:" + msg);
- }
因为默认发送的数据是jdk序列化之后的,难以阅读。所以要进行反序列化。
导入依赖
- <!--jackson依赖-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- <version>2.13.3</version>
- </dependency>
在config配置类中配置bean
- @Bean
- public MessageConverter jsonMessageConverter(){
- return new Jackson2JsonMessageConverter();
- }
重启服务
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。