赞
踩
同步调用的优点在于时效性高,可以立即得到结果
微服务之间基于Feign的调用属于同步方式,存在一些问题
异步调用常见的实现就是 事件驱动模式
异步通信的缺点:
MQ(MessageQueue),中文含义为消息队列,用来存放消息,也就是事件驱动模式中的 Broker
常见的MQ技术包含一下四种:
RabbitMQ基于Erlang语言开发的开源消息中间件。
RabbitMQ官方地址:Messaging that just works — RabbitMQ
在DockerHub上拉取RabbitMQ的镜像,然后运行。
#1.拉取RabbitMQ
docker pull rabbitmq:3
#2.运行RabbitMQ镜像,(15672 为管理UI界面的端口,5672为后期通信接口)
docker run \
-e RABBITMQ_DEFAULT_USER=shawn \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rabbitmq3 \
--hostname myrabbit \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3
安装完成之后不能直接进行访问,需要先进入容器内部开启插件才可以访问
#1.进入 rabbitmq 容器内 docker exec -it myrabbitmq bash #2.开启插件 rabbitmq-plugins enable rabbitmq_management #3.如果直接拉取 management 版本的镜像,则无需以上步骤 docker pull rabbitmq:3-management #4.运行 management 版本的rabbit docker run \ -e RABBITMQ_DEFAULT_USER=shawn \ -e RABBITMQ_DEFAULT_PASS=123456 \ --name rabbitmq3 \ --hostname myrabbit \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
完成以后即可在浏览器中输入服务器地址+端口号访问了。
踩坑点:Rabbit的Web UI 中Channel模块无法打开并且提示 Stats in management UI are disabled on this node
是因为默认情况下Rabbit是禁止的。
The reason is that the default image disables metrics collector in the management_agent plugin
原因是默认图像禁用management_agent插件中的度量收集器
# 1.进入容器内部
docker exec -it myrabbitmq bash
# 2.切换至配置文件目录下
cd /etc/rabbitmq/conf.d/
# 3.将 management_agent.disable_metrics_collector.conf 文件中的 management_agent.disable_metrics_collector 的值修改为 false
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
# 4.重启 rabbit 容器
docker restart rabbitmq3
RabbitMQ通过 VirtualHost 进行隔离,相互不可见。
RabbitMQ中的相关概念
创建两个项目分别为发送者和订阅者,引入RabbitMQ的maven坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
或者在初始化项目时直接勾选RabbitMQ
消息发送者示例代码:publisher
@Test void publisher() throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.119.101"); factory.setUsername("shawn"); factory.setPassword("123456"); factory.setPort(5672); factory.setVirtualHost("/"); //建立连接 Connection connection=factory.newConnection(); //创建通道 Channel channel=connection.createChannel(); //创建队列 String queueName="simple.queue"; String message="贾君鹏,你妈喊你回家吃饭!"; channel.basicPublish("",queueName,null,message.getBytes()); System.out.println("发送消息:"+message); channel.close(); connection.close(); }
消息接受者示例代码:consumer
@Test void consumer() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.119.101"); factory.setUsername("shawn"); factory.setPassword("123456"); factory.setPort(5672); factory.setVirtualHost("/"); //创建连接和通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); //订阅消息 channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 处理消息 String message=new String(body,"UTF-8"); System.out.println("接受消息:"+message); } }); System.out.println("等待接受消息,退出请按 CTRL+C"); }
官方文档:https://spring.io/projects/spring-amqp
AMQP是消息接受与发送的协议或者标准,与语言和平台无关
RabbitTemplate是一个Spring封装的用来发送消息的工具类,类似 RedisCache,RestTemplate,可以简单,高效,优雅的实现消息的发送和接收
引入 spring-boot-starter-amqp 依赖,然后在相关项目(publisher 和 consumer )的配置文件中配置相关参数。
spring:
application:
name: publisher
rabbitmq:
host: 192.168.119.101
# 默认端口为5672,使用默认端口时可不写
port: 5672
username: shawn
password: 123456
virtual-host: /
在 publisher 项目中编写测试代码,发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void publisher() {
String queueName="hello.world";
String message="Hello,RabbitTemplate";
# 这里使用 convertAndSend 进行消息的处理和发送
rabbitTemplate.convertAndSend(queueName,message);
System.out.println("消息发送完成");
}
在 consumer 项目中编写代码,订阅并且接受消息
/**
* 使用 Component 将当前类申明为一个 bean
* 定义一个类,使用 RabbitListener 注解订阅要接受消息的队列
*/
@Component
public class SimpleMessageRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleMessage(String message) {
System.out.println("接受消息:" + message);
}
}
注意:消息一旦消费,就会从消息队列中删除,RabbitMQ没有消息回溯
工作队列模型,多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
工作队列,可以提高消息处理速度,避免消息堆积
@RabbitListener(queues = "simple.queue")
public void listenWorkMessage1(String message) throws InterruptedException {
System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkMessage2(String message) throws InterruptedException {
System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(200);
}
定义两个执行效率不同的消费者,模拟一个简单的工作队列模型,发现消费者并不会因为执行效率的高低自动的增加或处理消息的处理量,而是平均分配。
可以通过消息的欲取机制来控制消息的处理量
欲取机制,即默认情况下,有多少消息就拿多少消息,并不会考虑有多少个消费者。
通过调整 prefetch 属性的值,来控制消费者的消息预取能力
spring:
application:
name: consumer
rabbitmq:
host: 192.168.119.101
port: 5672
username: shawn
password: 123456
virtual-host: /
listener:
simple:
# 通过调整 prefetch 属性的值,来控制消费者的消息预取能力
prefetch: 1
发布订阅模式(Publish-Subscribe)的核心是,允许将一个消息发给多个消费者。具体的实现方式是加入了exchange(交换机)。
注意:exchange只负责消息的转发,而不是存储。路由失败则消息丢失
交换机的作用:
需要使用到的 Bean:FanoutExchange Queue Binding
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue
在 Consumer 项目中定义队列(queue),交换机(exchange),并且将队列绑定到交换机上。
/** 示例: 声明一个交换机和两个队列,并且将队列与交换机进行绑定 */ @Configuration public class FanoutExchangeConfig { /** *声明一个FanoutExchange对象,并且添加到bean */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("shawn.exchange"); } /** *声明一个Queue对象,并且添加到bean */ @Bean public Queue fanoutQueue1() { return new Queue("shawn.queue1"); } /** * 将队列和Exchange进行绑定 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { // 使用 BindingBuilder 提供的方法进行绑定,最后返回Binding对象 return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Queue fanoutQueue2() { return new Queue("shawn.queue2"); } @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
接受消息的监听器示例代码:
@Component
public class SimpleMessageRabbitListener {
@RabbitListener(queues = "shawn.queue1")
public void listenWorkMessage1(String message) throws InterruptedException {
System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "shawn.queue2")
public void listenWorkMessage2(String message) throws InterruptedException {
System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(200);
}
}
在 Publisher 项目中编写消息发送的测试代码:
@Test
void testSendExchange() throws InterruptedException {
String exChangeName = "shawn.exchange";
String message = "贾君鹏,你妈喊你回家吃饭";
// 向指定名称的Exchage(交换机)发送消息
rabbitTemplate.convertAndSend(exChangeName, "", message);
}
测试结果:两个队列均能收到消息
DirectExchange会将接受到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)。
当多个队列使用一个BindingKey时,DirectExchange会将消息发送给所有使用了这个BindingKey的队列
这种情况下,DirectExchang与FanoutExchange相同,也属于广播模式
因此可以认为 DirectExchange 可以模拟 FanoutExchang, 且比 FanoutExchange 灵活
示例:基于RabbitListener 实现 DirectExchange
编写 Consumer 项目的代码:
/** * 使用 RabbitListener 声明要绑定的队列、BindingKey、交换机和交换机类型,BindingKey可以设置多个 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "shawn.queue1"), exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenWorkMessage1(String message) throws InterruptedException { System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now()); Thread.sleep(20); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "shawn.queue2"), exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenWorkMessage2(String message) throws InterruptedException { System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now()); Thread.sleep(200); }
在 Publuisher 项目中编写测试代码:
@Test
void testSendDirectExchange() throws InterruptedException {
String exChangeName = "direct.exchange";
String message = "贾君鹏,你妈喊你回家吃饭";
String routingKey="blue";
rabbitTemplate.convertAndSend(exChangeName,routingKey, message);
}
此时,只有包含指定的 RoutingKey 的队列,才能收到消息。
TopicExchange 与 DirectExchange 的区别在于,routingKey必须是多个单词的列表,并且使用英文句号( . )分割
Queue 与 Exchange 进行绑定时支持通配符:
示例:使用 TopicExchange 实现消息发送和接受
在 Consumer 项目中编写示例代码
@Component public class TopicExchangeListener { /** * listenWorkMessage1 接收和 china.# 有关的消息 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "shawn.queue1"), exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenWorkMessage1(String message) throws InterruptedException { System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now()); Thread.sleep(20); } /** * listenWorkMessage2 接收和 #.news 有关的消息 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "shawn.queue2"), exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenWorkMessage2(String message) throws InterruptedException { System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now()); Thread.sleep(200); } }
在 Publisher 项目中编写测试方法
@Test
void testSendTopicExchange() throws InterruptedException {
String exChangeName = "topic.exchange";
String message = "贾君鹏,你妈喊你回家吃饭";
// 此处的 routingKey 需要按照TopicExchange的规则编写
String routingKey = "china.food";
rabbitTemplate.convertAndSend(exChangeName, routingKey, message);
}
定义的交换机和队列信息,均可以在 RabbitMQ Web UI 中看到
在SpringAMQP中,接受消息的类型时Object,也就是说,我们可以发送任何类型的对象给消费者,SpringAMQP会帮助我们进行序列化成字节后发送。
Spring中对消息对象的处理是由 SpringAMQP中的一个名为MessageConvert来处理的。默认实现是SimpleMessageConvert,基于JDK的ObjectOutputStream来完成。
通过重新定义一个 MessageConverter 的 Bean 来修改序列化方式。
推荐使用 JSON 方式序列化,消息体将会更加短小精悍,传输速度更快。
引入 jackson-dataformat-xml 依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.14.1</version>
</dependency>
定义 Bean
@Configuration
public class MessageConverterConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
编写一个传输对象的简单队列测试示例
@Test
void testSendObjectMessage() {
String queueName = "object.queue";
SystemLog log = new SystemLog();
log.setAddress("陕西省西安市");
log.setAge(29);
log.setName("shawn");
log.setId(1L);
log.setPassword("123456");
rabbitTemplate.convertAndSend("", queueName, log);
}
此时消费者接受到的消息类型将会是Json格式,将Json信息转换对应的对象就可以拿到对象消息了。
注意:发送消息和接收消息时注意使用相同的 MessageConverter。可直接将消息转换为发送时的对象(自定义类型需手动转换)
完结撒花。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。