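Advantages of synchronous calls: strong timeliness, because the call returns only once the result is available.
Disadvantages: poor extensibility, degraded performance, and cascading failures.
Asynchronous calls are based on message notification and generally involve three roles:
(1) Message sender: the party that publishes the message, i.e. the original caller.
(2) Message broker: manages, temporarily stores, and forwards messages; think of it as the WeChat server.
(3) Message receiver: the party that receives and processes the message, i.e. the original service provider.
In RabbitMQ terminology, the Broker is this message broker.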
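The three roles above can be sketched in plain Java. This is only an in-memory illustration, assuming a hypothetical `Broker` class backed by a `BlockingQueue`; it is not the RabbitMQ or Spring AMQP API. It shows that the sender hands messages to the broker and returns without waiting for the receiver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory sketch of sender -> broker -> receiver; an illustration, not the RabbitMQ API.
class AsyncMessagingSketch {

    // Hypothetical broker: buffers messages until a receiver asks for them.
    static class Broker {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The sender hands the message over and returns immediately.
        void publish(String message) {
            queue.add(message);
        }

        // The receiver waits up to timeoutMillis for the next message; null means none arrived.
        String receive(long timeoutMillis) throws InterruptedException {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Sends every message through the broker and returns what the receiver thread processed.
    static List<String> sendAndReceive(List<String> messages) {
        Broker broker = new Broker();
        List<String> processed = new ArrayList<>();

        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < messages.size(); i++) {
                    String message = broker.receive(1000);
                    if (message != null) {
                        processed.add(message);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();

        for (String message : messages) {
            broker.publish(message); // does not wait for the receiver
        }

        try {
            receiver.join(); // wait only so the demo can inspect the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(List.of("order created", "order paid")));
    }
}
```

Because the sender never calls the receiver directly, a slow or failed receiver does not block the sender, which is the property the synchronous approach lacks.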
RabbitMQ is an open-source messaging middleware written in Erlang. Official site: https://www.rabbitmq.com/
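On a queue's page you can view the messages it has received.
Switch the Virtual host to /.
(1) Create a new user.
(2) Create a virtual host for that user.
(3) Test data isolation between different virtual hosts by switching the virtual host.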
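The isolation tested in step (3) can be pictured with a small in-memory model. This is a sketch under the assumption that each virtual host owns its own set of queues; the class `VirtualHostSketch` is hypothetical and does not use the RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: every virtual host has its own, separate namespace of queues.
class VirtualHostSketch {
    // virtual host name -> (queue name -> pending messages)
    private final Map<String, Map<String, Deque<String>>> vhosts = new HashMap<>();

    // Stores a message in the named queue of the given virtual host.
    void publish(String vhost, String queue, String message) {
        vhosts.computeIfAbsent(vhost, v -> new HashMap<>())
              .computeIfAbsent(queue, q -> new ArrayDeque<>())
              .addLast(message);
    }

    // Returns the next message of that queue in that virtual host, or null if there is none.
    String receive(String vhost, String queue) {
        Map<String, Deque<String>> queues = vhosts.get(vhost);
        if (queues == null) {
            return null;
        }
        Deque<String> pending = queues.get(queue);
        return pending == null ? null : pending.pollFirst();
    }

    public static void main(String[] args) {
        VirtualHostSketch broker = new VirtualHostSketch();
        broker.publish("/hmall", "simple.queue", "Hello,amqp");
        // Same queue name, different virtual host: nothing is visible here.
        System.out.println(broker.receive("/", "simple.queue"));
        System.out.println(broker.receive("/hmall", "simple.queue"));
    }
}
```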
- <!-- AMQP dependency, which includes RabbitMQ support -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
-   rabbitmq:
-     host: 192.168.92.136   # RabbitMQ server address
-     port: 5672             # AMQP port
-     virtual-host: /hmall   # virtual host created for this user
-     username: hmall
-     password: 123
- package cn.itcast.mq.helloworld;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- public class SpringAmqpTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void testSendMessageQueue(){
- String queueName = "simple.queue";
- String msg = "Hello,amqp";
- rabbitTemplate.convertAndSend(queueName,msg);
- }
-
- }
- package cn.itcast.mq.listeners;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class MqListener {
-
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue(String msg){
- System.out.println("Consumer received message: "+msg);
- }
- }
- @Test
- void testSendFanout(){
- String exchangeName = "hmall.fanout";
- String msg = "Hello,everyone!";
- // A fanout exchange ignores the routing key, so an empty string is passed
- rabbitTemplate.convertAndSend(exchangeName,"",msg);
- }
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue(String msg) throws InterruptedException {
- System.out.println("Consumer 1 received message: "+msg);
- }
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanout2Queue(String msg) throws InterruptedException {
- System.err.println("Consumer 2 received message: ....."+msg);
- }
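The fanout behaviour used above, where every bound queue gets its own copy of each message, can be modelled in a few lines. This is a minimal in-memory sketch; `FanoutSketch` and its methods are hypothetical names, not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a fanout exchange: broadcast to every bound queue.
class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to this exchange; no routing key is needed.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored, mirroring fanout semantics.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Returns the messages delivered so far to the given queue.
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.queue1");
        exchange.bind("fanout.queue2");
        exchange.publish("", "Hello,everyone!");
        System.out.println(exchange.messagesIn("fanout.queue1"));
        System.out.println(exchange.messagesIn("fanout.queue2"));
    }
}
```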
Note: when binding a queue to a Direct exchange, a Routing Key must be configured.
Bind queue1 with the Routing Keys blue and red, and bind queue2 with the Routing Keys yellow and red.
- @Test
- void testSendDirect(){
- String exchangeName = "hmall.direct";
- String msg = "Hello,every Direct!";
- rabbitTemplate.convertAndSend(exchangeName,"blue",msg);
- }
- @RabbitListener(queues = "direct.queue1")
- public void listenDirectQueue(String msg) throws InterruptedException {
- System.out.println("Consumer 1 received message: "+msg);
- }
- @RabbitListener(queues = "direct.queue2")
- public void listenDirect2Queue(String msg) throws InterruptedException {
- System.err.println("Consumer 2 received message: ....."+msg);
- }
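With the blue/red and yellow/red bindings described above, a message sent with the key blue should reach only direct.queue1, while red reaches both queues. The following in-memory sketch models that exact-match routing; `DirectExchangeSketch` is a hypothetical class for illustration, not the broker's implementation.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a direct exchange: a message goes to queues whose binding key equals its routing key.
class DirectExchangeSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Binds one queue under one or more routing keys.
    void bind(String queueName, String... routingKeys) {
        for (String key : routingKeys) {
            bindings.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(queueName);
        }
    }

    // Returns the queues that would receive a message sent with this routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Set.of());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("direct.queue1", "blue", "red");
        exchange.bind("direct.queue2", "yellow", "red");
        System.out.println(exchange.route("blue"));  // only direct.queue1
        System.out.println(exchange.route("red"));   // both queues
        System.out.println(exchange.route("green")); // no queue is bound with this key
    }
}
```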
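Note: when binding a queue to a Topic exchange, a Routing Key must also be configured. The key is a dot-separated list of words and may use the wildcards * (exactly one word) and # (zero or more words).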
- @Test
- void testSendTopic(){
- String exchangeName = "hmall.topic";
- String msg = "Hello,every Topic!";
- rabbitTemplate.convertAndSend(exchangeName,"china.hello",msg);
- }
- @RabbitListener(queues = "topic.queue1")
- public void listenTopicQueue(String msg){
- System.out.println("Consumer 1 received message: "+msg);
- }
-
- @RabbitListener(queues = "topic.queue2")
- public void listenTopicQueue2(String msg){
- System.err.println("Consumer 2 received message: ....."+msg);
- }
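Which queue receives the china.hello message above depends on the binding keys, which the notes do not list. As an illustration, the sketch below implements topic-style matching, where `*` stands for exactly one word and `#` for zero or more words. The binding keys `china.#` and `#.news` used in `main` are assumptions, and `TopicMatcher` is a hypothetical helper rather than RabbitMQ's own matcher.

```java
// Hypothetical helper that mimics topic-exchange wildcard matching on dot-separated words.
class TopicMatcher {

    // Returns true if the routing key satisfies the binding key.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Recursively matches pattern words from index i against routing-key words from index j.
    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // both must be exhausted together
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word but none are left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("china.#", "china.hello")); // true
        System.out.println(matches("#.news", "china.hello"));  // false
        System.out.println(matches("china.*", "china.news.today")); // false
    }
}
```

Under these assumed bindings, only a queue bound with `china.#` would receive the china.hello message.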
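There are two ways to create exchanges, queues, and bindings: with @Bean definitions in a configuration class, or with annotations on the listener method.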
- package cn.itcast.mq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanoutConfig {
-
- @Bean
- public FanoutExchange fanoutExchange(){
- // Equivalent builder form: ExchangeBuilder.fanoutExchange("hmall.fanout1").build();
- return new FanoutExchange("hmall.fanout1");
- }
-
- @Bean
- public Queue fanoutQueue3(){
- // Equivalent builder form: QueueBuilder.durable("fanout.queue3").build();
- return new Queue("fanout.queue3");
- }
-
- @Bean
- public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
- }
-
-
- @Bean
- public Queue fanoutQueue4(){
- return new Queue("fanout.queue4");
- }
-
- @Bean
- public Binding fanoutBinding4(){
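- // Calling @Bean methods directly is fine inside a @Configuration class: Spring proxies them and returns the managed beans
- return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());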
- }
- }
Example code (annotation-based declaration):
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1",durable = "true"),
- exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
- key = {"red","blue"}
- ))
- public void listenDirectQueue(String msg) throws InterruptedException {
- System.out.println("Consumer 1 received message: "+msg);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "direct.queue2",durable = "true"),
- exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
- key = {"red","yellow"}
- ))
- public void listenDirect2Queue(String msg) throws InterruptedException {
- System.err.println("Consumer 2 received message: ....."+msg);
- }
(8) Message converter
- @Test
- void testSendObject(){
- Map<String, Object> msg = new HashMap<>(2);
- msg.put("name","Jack");
- msg.put("age",21);
- rabbitTemplate.convertAndSend("object.queue",msg);
- }
- <!-- Jackson-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
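The reason for swapping in a JSON converter is that Spring AMQP's default SimpleMessageConverter falls back to JDK serialization for Serializable payloads such as the Map sent above, which yields a larger, non-human-readable body. The sketch below compares the two encodings using only the JDK. The `toJson` helper is a hand-written assumption that handles flat maps of strings and numbers only; it stands in for Jackson and is not how Jackson2JsonMessageConverter is implemented.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Compares JDK serialization with a JSON text encoding for the same payload.
class PayloadSizeSketch {

    // Serializes the value with standard JDK object serialization.
    static byte[] javaSerialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical minimal JSON writer: flat maps with String or Number values only.
    static String toJson(Map<String, Object> map) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Object value = entry.getValue();
            String rendered = (value instanceof Number) ? value.toString() : quote(String.valueOf(value));
            json.add(quote(entry.getKey()) + ":" + rendered);
        }
        return json.toString();
    }

    // Wraps the text in double quotes, escaping backslashes and quotes.
    private static String quote(String text) {
        return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("name", "Jack");
        msg.put("age", 21);
        String json = toJson(msg);
        System.out.println(json);
        System.out.println("JSON bytes: " + json.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("JDK-serialized bytes: " + javaSerialize(msg).length);
    }
}
```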