赞
踩
我们使用的像Feign调用这样的消费者和提供者之间通讯的方法叫做同步通讯。
同步通讯就是消费者发送请求后,要等着提供者返回数据。
但是有的时候,当我们提供者所有存在的示例全都宕机了的话,我们的消费者也会卡住。 当然,一个请求卡住了可能过会儿他就去发下一个请求了,但是,如果下一个请求也这样卡住了呢?用户的请求时间就会被无限拉长。
同步通讯的请求方式
优点:
缺点:
为了解决在业务量太大的情况下同步通讯出现的服务卡顿的情况,就产生了异步通讯这个概念。
异步通讯的请求方式
优点
缺点
MQ(Message Queue),消息队列,是用来实现异步通讯的中间件:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司 | Rabbit | Apache | 阿里(Apache) | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP STOMP | OpenWire,STOMP REST,XMPP AMQP | 自定义 | 自定义 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
本章所有的MQ都以RabbitMQ进行。
因为在CenterOs直接装的话还要装Erlang的环境,就懒的部署Erlang了。这边直接使用docker对RabbitMQ进行一个安装。
docker pull rabbitmq:latest
docker run \
-e RABBITMQ_DEFAULT_USER=username \ #后台系统登陆的账号
-e RABBITMQ_DAFAULT_PASS=password \ #后台系统登陆的密码
--name mq1 #搭建MQ集群时用来区分MQ的名字
-p15672:15672 \ #管理平台的端口
-p5672:5672 \ #使用MQ的端口
-d \
rabbitmq:latest
docker ps #查看运行着的镜像docker exec -it 镜像ID /bin/bash #进入rabbitmq的镜像内部cd /etc/rabbitmq/conf.d/ #进入目录
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf #开启channelrabbitmq-plugins enable rabbitmq_management #开启管理平台docker restart 镜像ID #重启
至此,rabbitmq的安装完成
基本消息队列:
工作消息队列:
广播(发布订阅):
路由:
主题:
在rabbitmq的官网可以下载helloworld的示例:
这里基于示例进行改造演示:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- @Test
- public void testSendMessage() throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.110.23");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("admin");
- factory.setPassword("admin);
- // 1.2.建立连接
- Connection connection = factory.newConnection();
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
- // 4.发送消息
- String message = "hello, rabbitmq!";
- channel.basicPublish("", queueName, null, message.getBytes());
- System.out.println("发送消息成功:【" + message + "】");
- // 5.关闭通道和连接
- channel.close();
- connection.close();
- }
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.110.23");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("admin");
- factory.setPassword("admin);
- // 1.2.建立连接
- Connection connection = factory.newConnection();
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
- // 4.订阅消息
- channel.basicConsume(queueName, true, new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 5.处理消息
- String message = new String(body);
- System.out.println("接收到消息:【" + message + "】");
- }
- });
- System.out.println("等待接收消息。。。。");
- }
可以在rabbitmq的管理后台看到创建的队列
点进去也可以看到发送的消息
启动后就可以接收到消息
AMQP(Advanced Message Queuing Protocol)指的是在应用程序或之间传递业务消息的开发标准。该协议与语言和平台无关。
SpringAMQP是基于AMQP协议来制定的一套API规范,提供了模板来发送和接收消息。其中SpringAMQP是抽象,Spring-rabbit是底层的默认实现。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
- rabbitmq:
- addresses: 127.0.0.1 #rabbitMQ的地址
- port: 5672 #rabbitMQ提供的客户端连接端口(非管理后台端口)
- username: admin #登陆账号
- password: admin #登陆密码
- virtual-host: / #虚拟主机路径
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class PublisherTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void AMQPTestDemo() {
- String queue = "simple.queue";
- String msg = "Hello World";
- //指定消息队列名以及消息内容发送消息
- rabbitTemplate.convertAndSend(queue,msg);
- }
- }
可以看到这里消息已经成功发送到MQ的消息队列里面了
- spring:
- rabbitmq:
- addresses: 127.0.0.1 #rabbitMQ的地址
- port: 5672 #rabbitMQ提供的客户端连接端口(非管理后台端口)
- username: admin #登陆账号
- password: admin #登陆密码
- virtual-host: / #虚拟主机路径
- @Component
- public class SpringRabbitMQListener {
-
- //指定监听的消息队列名
- @RabbitListener(queues = "simple.queue")
- void listenerSimpleQueueMessage(String msg){
- //打印消息
- System.out.println("Message为: " + msg);
- }
- }
注:消息一旦被消费就会消失
工作队列模型图
工作队列与普通的消息队列有所不同的是:
这里为了方便看到测试结果,新建一个方法用来充当消费者2。我设置了一个线程延迟时间,不设置的话其中一个会把所有消息给抢完。
- @Test
- public void AMQPTestDemo() throws InterruptedException {
- String queue = "simple.queue";
- String msg = "66666";
- //这里为了看到他更加明显的一个运行状况把发送次数提高到50次
- for (int i = 0; i < 50; i++) {
- rabbitTemplate.convertAndSend(queue,msg + i);
- Thread.sleep(20);
- }
- }
- @Component
- //消费者1
- public class SpringRabbitMQListener {
- @RabbitListener(queues = "simple.queue")
- void listenerSimpleQueueMessage(String msg) throws InterruptedException {
- System.out.println("Message为: " + msg);
- Thread.sleep(20);
-
- }
- //消费者2
- @RabbitListener(queues = "simple.queue")
- void listenerSimpleQueueMessage2(String msg) throws InterruptedException {
- System.err.println("Message2为: " + msg);
- Thread.sleep(200);
- }
- }
可以看到message和message2两个消费者消费的消息是不一样的,说明他们之间是争夺关系。
可以在消费端的application设置prefetch,意味着要把多少条消息消费完才能进入下一条:
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: admin
- password: admin
- virtual-host: /
- listener:
- simple:
- //要把多少条消息消费完
- prefetch: 1
可以发现Message2变得更加规律了些(为什么还是这么少,因为延迟了200ms)
PubSub(Publisher Subscribe)意为发布订阅模型。它与前面不同的是可以发布消息到多个消费者。常见的PubSub模型有:
Fanout会将收到的消息转发到每一个与其绑定的Queue上。
(1).声明Exchange,Queue和Binding:
-
- @Configuration
- public class FanoutConfig {
- //声明交换机
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("demo.fanout");
- }
- //声明队列1
- @Bean
- public Queue queue1(){
- return new Queue("fanout.queue1");
- }
- //声明队列2
- @Bean
- public Queue queue2(){
- return new Queue("fanout.queue2");
- }
- //声明队列1与交换机的绑定关系
- @Bean
- public Binding binding1(Queue queue1,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(queue1).to(fanoutExchange);
- }
- //声明队列2与交换机的绑定关系
- @Bean
- public Binding binding2(Queue queue2,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(queue2).to(fanoutExchange);
- }
- }
(2).在Publisher中写测试类:
- @Test
- public void AMQPTestDemo() throws InterruptedException {
- String exchange = "demo.fanout";
- String msg = "66666";
- for (int i = 0; i < 50; i++) {
- //在Fanout模式下不用指定RouteKey
- rabbitTemplate.convertAndSend(exchange,"",msg + i);
- Thread.sleep(20);
- }
- }
(3).查看消息:
交换机作用:
交换机,队列,绑定对应的Bean为:
(1).使用注解的形式来声明Exchange,Queue,Binding的关系
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "demo.direct",type = ExchangeTypes.DIRECT),
- key = {"admin","user"} //指定key为admin或者user时才触发
- ))
- void listenerDirectQueueMessage1(String msg) throws InterruptedException {
- System.err.println("MessageDirect1为: " + msg);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "demo.direct",type = ExchangeTypes.DIRECT),
- key = {"admin"} //指定key为admin时触发
- ))
- void listenerDirectQueueMessage2(String msg) throws InterruptedException {
- System.err.println("MessageDirect2为: " + msg);
- }
(2).publisher处声明测试类发送消息
- @Test
- public void AMQPTestDemo() throws InterruptedException {
- String exchange = "demo.direct";
- String msg = "66666";
- //此处RoutingKey指定为user
- rabbitTemplate.convertAndSend(exchange,"user",msg);
- }
(3).只有MessageDirect1收到消息:
Direct与Fanout的区别:
TopicExchange和DirectExchange有点类似,区别不同的是TopicExchange的RoutingKey必须是多个单词的列表,且用.进行分割。且Queue和Exchange指定RoutingKey时可以使用通配符*和#
(1).使用注解的形式来声明Exchange,Queue,Binding的关系
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue1"),
- exchange = @Exchange(name = "demo.topic",type = ExchangeTypes.TOPIC),
- key = {"admin.#"}
- ))
- void listenerTopicQueueMessage1(String msg) throws InterruptedException {
- System.err.println("MessageTopic1为: " + msg);
- }
-
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue2"),
- exchange = @Exchange(name = "demo.topic",type = ExchangeTypes.TOPIC),
- key = {"#.user"}
- ))
- void listenerTopicQueueMessage2(String msg) throws InterruptedException {
- System.err.println("MessageTopic2为: " + msg);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue3"),
- exchange = @Exchange(name = "demo.topic",type = ExchangeTypes.TOPIC),
- key = {"#.book"}
- ))
- void listenerTopicQueueMessage3(String msg) throws InterruptedException {
- System.err.println("MessageTopic3为: " + msg);
- }
(2).publisher处声明测试类发送消息
- @Test
- public void AMQPTestDemo() throws InterruptedException {
- String exchange = "demo.topic";
- String msg = "66666";
- rabbitTemplate.convertAndSend(exchange,"admin.book",msg);
- }
(3).Message1和Message3收到了消息
Direct和Topic交换机的区别:
在SpringAMQP中,我们默认接受的消息类型是Object。这也就意味着,SpringAMQP会默认将消息序列化为字节后发送。但是,当发送的内容是一个对象的时候,转换过去的是默认的字节。
我们可以引用其他的消息转换器来替代默认使用的JDK的转换器:
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
变成JSON格式的了。
注:发送方和接收方必须使用同样的消息转换器
在使用RabbitMQ的时候,消息的发送方自然不希望出现消息遗失遗漏的问题。RabbitMQ的官方为我们提供了两种方式来确保消息的可靠性投递:
RabbitMQ的整个消息流程
两种回调函数的执行位置
(1).在publisher处开启confirm确认机制
- spring:
- rabbitmq:
- host: 127.0.0.1
- username: admin
- password: admin
- virtual-host: /
- //none代表不开启,默认为none
- //correlated代表开启消息确认机制,触发confirm回调函数
- //simple和correlated差不多
- //但是使用rabbitmqTemplate.waitForConfirms()会等待等待broker节点返回发送结果
- publisher-confirm-type: correlated
(2)改造publisher代码,设置confirmCallBack回调函数:
-
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- System.out.println("confirm执行");
- }
- });
- String queue = "demo.direct";
-
- rabbitTemplate.convertAndSend(queue,"admin","hello");
- //必须加上线程睡眠,否则无法测试到回调函数的执行
- Thread.sleep(200);
(3)成功发送
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。