赞
踩
理解如下问题:
1、消息队列为什么会出现?
2、消息队列能用来做什么?
3、使用消息队列存在的问题?
4、如何解决重复消费消息的问题?
5、如何解决消息的顺序消费问题?
6、如何解决分布式事务问题?
7、如何解决消息堆积问题?
我们可以把消息队列看作是一个 存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。(注意:消费消息是按照顺序来消费的。)
消息队列是分布式系统中重要的组件之一。使用消息队列主要是为了通过 异步处理 提高系统性能和削峰、降低系统耦合性 。
① 通过异步处理提高系统性能(减少响应所需时间)。
② 削峰/限流。
③ 降低系统耦合性。
① 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!(需要考虑消息是否丢失或挂掉)
② 系统复杂性提高: 加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!(需要保证消息不被重复消费、丢失、传递的顺序前后一致等问题)
③ 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!(数据保存到MQ中,消息可能未被消费者正确消费,导致数据不一致)
JMS(Java Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(Java Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
ActiveMQ就是基于JMS规范实现的。
①点对点(P2P)模型
使用 队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
②发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub)使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
①StreamMessage — Java原始值的数据流
②MapMessage — 一套名称-值对
③TextMessage — 一个字符串对象
④ObjectMessage — 一个序列化的Java对象
⑤BytesMessage — 一个字节的数据流
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ就是基于AMQP协议实现的。
总结:
①AMQP为消息定义了 应用层(wire-level protocol)的协议 ,而JMS所定义的是 API规范 。在Java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
②JMS 支持TextMessage、MapMessage等复杂的消息类型 ;而AMQP仅 支持byte[]消息类型(复杂的类型可序列化后发送)。
③由于Exchange提供的路由算法,AMQP可以 提供多样化的路由方式 来传递消息到消息队列,而JMS 仅支持 队列 和 主题/订阅 方式 两种。
总结:
①ActiveMQ的 性能比较差 ,版本迭代很慢,不推荐使用。
②RabbitMQ在吞吐量方面虽然稍逊于Kafka和RocketMQ,但是由于它基于 erlang开发,所以 并发能力很强,性能极其好,延时很低,达到微秒级 。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ一定是你的首选。如果是大数据领域的 实时计算、日志采集等场景,用Kafka 。
③RocketMQ阿里出品,Java系开源项目,但接口这块不是按照标准JMS规范走的,有些系统要迁移需要修改大量代码。
④Kafka仅仅提供较少的核心功能,却可以实现 超高的吞吐量,ms级的延迟 ,极高的可用性以及可靠性,而且 分布式 可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性,天然适合大数据实时计算以及日志收集。
生产者: 消息生产者,就是 投递消息的一方 。消息一般包含两个部分:消息体(payload)和标签(Label)。
消费者: 消费消息,也就是 接收消息的一方 。消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
Broker:可以看做RabbitMQ的 服务节点 。
Queue:RabbitMQ的内部对象,用于 存储消息 。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
Exchange:交换器,生产者将消息发送到交换器,由交换器 将消息路由到一个或者多个队列中 。当路由不到时,或返回给生产者或直接丢弃。
DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列 。
导致死信的几种原因:
①消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
②消息TTL过期。
③队列满了,无法再添加。
延迟队列 指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。(发送到队列后,消费者不能立马消费,得等待一定时间才能消费,该队列即为延迟队列)
RabbitMQ 自身没有延迟队列的,那么如何实现延迟队列?
①通过RabbitMQ本身队列的特性来实现,需要使用RabbitMQ的 死信交换机(Exchange)和消息的存活时间TTL(Time To Live) 。
②在RabbitMQ3.5.7及以上的版本提供了一个 插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能 。同时,插件依赖Erlang/OPT 18.0及以上。
也就是说,AMQP协议以及RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。
RabbitMQ自V3.5.0有优先级队列实现, 优先级高的队列会先被消费 。
可以通过x-max-priority参数来实现优先级队列。不过,当消费速度大于生产速度且Broker没有堆积的情况下,优先级显得没有意义。
①简单模式;②work工作模式;③pub/sub发布订阅模式;④Routing路由模式;⑤Topic主题模式。
由于TCP链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以RabbitMQ使用 信道的方式 来传输数据。信道(Channel)是生产者、消费者与RabbitMQ通信的渠道,信道是建立在TCP链接上的虚拟链接,且每条TCP链接上的信道数量没有限制。就是说RabbitMQ在一条TCP链接上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个信道在RabbitMQ都有唯一的ID,保证了信道私有性,每个信道对应一个线程使用 。
消息可能丢失的情况: ①消息到MQ的过程中搞丢;②MQ自己搞丢;③MQ到消费过程中搞丢。
①生产者到RabbitMQ:事务机制或Confirm机制,注意:事务机制和Confirm机制是互斥的,两者不能共存,会导致RabbitMQ报错。
②RabbitMQ自身:持久化、集群、普通模式、镜像模式 。
③RabbitMQ到消费者:basicAck机制、死信队列、消息补偿机制 。
①拆分多个queue(消息队列),每个queue(消息队列)一个consumer(消费者),就是多一些queue(消息队列)而已,确实是麻烦点;(一个消费者一个队列)
②一个queue(消息队列)但是对应一个consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的worker来处理。(就一个队列一个消费者,内部分任务)
RabbitMQ是比较有代表性的,因为是 基于主从(非分布式) 做高可用性的,我们就以RabbitMQ为例子讲解第一种MQ的高可用性怎么实现。RabbitMQ 有 三种模式:单机模式、普通集群模式、镜像集群模式 。
临时紧急扩容 。这种做法相当于是 临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer机器来消费消息。
RabbtiMQ是可以设置过期时间的,也就是TTL。如果 消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉 ,这个数据就没了。那怎么将有用的数据找回呢?批量重导,写程序查出丢失程序,再写回mq中。
RocketMQ 是一个 队列模型 的消息中间件,具有 高性能、高可靠、高实时、分布式 的特点。它是一个采用Java语言开发的分布式的消息系统,由阿里巴巴团队开发。
发布者(Publisher):消息的生产者
订阅者(Subscriber):消息的消费者
主题(Topic):存放消息的容器
Broker(消息存储服务器)
注意:
①一个主题包含多个队列,分布在不同的Broker(消息存储服务器)上。
②一个消费者集群共同消费一个topic(主题)内的多个队列。
③一个队列只能被一个消费者消费。
消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。
默认情况下就是集群消费,这种模式下 一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费 ,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
而 广播消费消息会发给消费者组中的每一个消费者进行消费 。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等和消息去重 。
业务幂等: 是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。
消息去重: 是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个唯一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。具体做法是可以建立一个消费记录表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
NameServer因为是无状态,且不相互通信的,所以只要 集群部署 就可以保证高可用。
RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过 集群 和 主从 实现的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。