赞
踩
目录
1.为什么要引入MQ/RabbitMQ(中间件),直接读写数据库不行吗?
8、如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?(类似7)
1、在分布式系统下中间件具备异步处理,流量削峰等一系列高级功能;
2、中间件可以实现生产者和消费者之间的解耦。
3、拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
4、对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
5、可以使用消息队列达到异步下单的效果,后台进行逻辑下单。
RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件,核心思想是生产者不会将消息直接发送给队列,消息在发给客户端时会先发给交换机,然后再由交换机发送给对应的队列。
1、解耦
系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
2、异步
将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
3、削峰
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
1、一致性问题
A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。
2、系统的可用性降低
系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。
3、系统的复杂性提高
引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue:按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个oroker!里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,.每个channelf代表一个会话任务。
直连交换机(Direct Exchange):根据消息携带的路由键将消息投递给对应队列,它是完全匹配、单播的模式。
扇型交换机(Fanout Exchange):这个交换机没有路由键概念,就算你绑了路由键也是无视的。这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
主题交换机(Topic Exchange):发布订阅模式,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。*(星号)用来表示一个单词(必须出现的),#(井号)用来表示任意数量(零个或多个)单词
消息提供方->路由->一至多个队列
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为一下三种
1.fanout:如果交换器收到消息,将会广播到所有绑定的队列上
2.direct:如果路由键完全匹配,消息就被投递到相应的队列
3.topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。
1.引入spring-boot-starter--amqp
2.application.yml配置
3.测试RabbitMQ
保证消息不丢失,可靠抵达,可以使用事务消息,但性能下降250倍,为此引入RabbitMQ的消息确认机制
消息只要被broker接收到就会执行confirmCallback,如果是cluster模式,需要所有broker接收到才会调用confirmCallback,表示message已经到达服务器。
用到return退回模式,保证消息一定要投递到目标queue里,如果未能投递到目标queue里将调用returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
消费者获取到消息,成功处理,可以回复Ack给Broker。
1.将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。
2.一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
3.如果 RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;
1、在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;
2、在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
这个问题针对业务场景来答分以下几点:
1、 拿到这个消息做数据库的insert操作。然后给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2、 拿到这个消息做Redis的set的操作,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
3、 如果上面两种情况还不行。准备一个第三方介质,来做消费记录。以Redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入Redis。那消费者开始消费前,先去Redis中查询有没消费记录即可。
死信的概念 :queue 中的某些消息无法被消费
在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
- 插件源码地址:
- https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
-
- 插件下载地址:
- https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
-
- 安装:
-
- 进入插件安装目录
- {rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)
- 下载插件
- rabbitmq_delayed_message_exchange
-
- wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
-
- 1
-
- (如果下载的文件名称不规则就手动重命名一下如:
- rabbitmq_delayed_message_exchange-0.0.1.ez)
- 启用插件
-
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
- (关闭插件)
- rabbitmq-plugins disable rabbitmq_delayed_message_exchange
-
-
- 插件使用
-
- 通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
- x-delayed-message是插件提供的类型,并不是rabbitmq本身的
-
- // ... elided code ...
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-delayed-type", "direct");
- channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
- // ... more code ...
-
-
- 发送消息的时候通过在header添加"x-delay"参数来控制消息的延时时间
-
- // ... elided code ...
- byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put("x-delay", 5000);
- AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
- channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
- // ... more code ...
-
-
- 使用示例:
-
- 消息发送端:
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class Send {
- // 队列名称
- private final static String EXCHANGE_NAME="delay_exchange";
- private final static String ROUTING_KEY="key_delay";
-
- @SuppressWarnings("deprecation")
- public static void main(String[] argv) throws Exception {
- /**
- * 创建连接连接到MabbitMQ
- */
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.12.190");
- factory.setUsername("admin");
- factory.setPassword("admin");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-
- // 声明x-delayed-type类型的exchange
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-delayed-type", "direct");
- channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
- false, args);
-
-
- Map<String, Object> headers = new HashMap<String, Object>();
- //设置在2016/11/04,16:45:12向消费端推送本条消息
- Date now = new Date();
- Date timeToPublish = new Date("2016/11/04,16:45:12");
-
- String readyToPushContent = "publish at " + sf.format(now)
- + " \t deliver at " + sf.format(timeToPublish);
-
- headers.put("x-delay", timeToPublish.getTime() - now.getTime());
-
- AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
- .headers(headers);
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
- readyToPushContent.getBytes());
-
- // 关闭频道和连接
- channel.close();
- connection.close();
- }
- }
-
-
- 消息接收端:
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class Recv {
-
- // 队列名称
- private final static String QUEUE_NAME = "delay_queue";
- private final static String EXCHANGE_NAME="delay_exchange";
-
- public static void main(String[] argv) throws Exception,
- java.lang.InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.12.190");
- factory.setUsername("admin");
- factory.setPassword("admin");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
-
- channel.queueDeclare(QUEUE_NAME, true,false,false,null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
- channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
- SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- try {
- System.out.println("****************WAIT***************");
- while(true){
- QueueingConsumer.Delivery delivery = queueingConsumer
- .nextDelivery(); //
-
- String message = (new String(delivery.getBody()));
- System.out.println("message:"+message);
- System.out.println("now:\t"+sf.format(new Date()));
- }
-
- } catch (Exception exception) {
- exception.printStackTrace();
-
- }
-
- }
- }
-
-
- 启动接收端,启动发送端
- 运行结果:
-
- ****************WAIT***************
- message:publish at 2016-11-04 16:44:16.887 deliver at 2016-11-04 16:45:12.000
- now: 2016-11-04 16:45:12.023
-
-
- 结果显示在我们2016-11-04 16:45:12.023接收到了消息,距离我们设定的时间2016-11-04 16:45:12.023有23毫秒的延迟
-
- Note:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量在web管理界面可能不可见,不影响正常功能使用
-
- Note :使用过程中发现,当一台启用了rabbitmq-delayed-message-exchange插件的RAM节点在重启的时候会无法启动,查看日志发现了一个Timeout异常,开发者解释说这是节点在启动过程会同步集群相关数据造成启动超时,并建议不要使用Ram节点
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。