当前位置:   article > 正文

ElasticSearch-Kafka-RabbitMQ_kafka 自写程序读取、解析、写入更新elasticsearch索引库

kafka 自写程序读取、解析、写入更新elasticsearch索引库

目录

ElasticSearch

什么是Elasticsearch

Elasticsearch 的基本概念

倒排索引和正排索引

text和keyword类型的区别

query和filter 的区别?

es写数据的过程(ES的写入流程:)

(ES的更新和删除流程)写数据的底层原理:

ES在高并发下如何保证读写一致性?

Elasticsearch的分布式原理(ES如何选举Master节点)

Elasticsearch是如何避免脑裂现象:

ES的深度分页与滚动搜索scroll

Kafka

什么是Kafka?Kafka中有哪几个组件?

Kafka为什么那么快?

Kafka系统工具有哪些类型?

Kafka的message格式是什么?

Kafka的优点有那些?

为什么要使用Kafka?为什么要使用消息队列?

Kafka存在那些局限性?

Kafka为什么不支持读写分离?

Kafka如何实现延迟队列?

Kafka如何保证消息可靠性

Kafka中是怎么体现消息顺序性的?

创建topic时如何选择合适的分区数?

消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

有哪些情形会造成重复消费?

哪些情景下会造成消息丢失

RabbitMQ

MQ如何保证消息幂等性?

消息中间件的应用场景

什么是消息队列?什么是RabbitMQ?

为什么要使用消息队列? 消息队列的优点

消息队列的缺点,消息队列带来的问题

消息过期时间,队列过期时间

死信队列(DLX),变成死信的情况

RabbitMQ延时队列,优先级队列

RabbitMQ的持久化

消息的有序性?

惰性队列

RabbitMQ 组件

RabbitMQ工作模型

RabbitMQ消费模式包括哪些?

多个消费者监听一个队列,消息如何分发?

RabbitMQ如何保证消息的可靠性?

如何避免消息重复投递或重复消费?

RabbitMQ消息的状态

死信完成延迟队列

RabbitMQ集群

RabbitMQ如何保证消息的顺序性?

消息积压在消息队列中会导致什么结果?产生的原因是?如何解决?

Kafka和RabbitMQ的对比

RabbitMQ的优缺点,Kafka的优缺点


ElasticSearch

什么是Elasticsearch

Elasticsearch 是基于 Lucene 的 Restful 的分布式实时全文搜索引擎, 每个字段都被索引并可被搜索,可以快速存储、搜索、分析海量的数据。

        Elasticsearch是一个开源的分布式搜索和分析引擎,最初由Elasticsearch BV开发。它被设计用于快速、灵活地搜索大量的数据,也可以用于分析数据和可视化数据。 Elasticsearch是一个基于Lucene库的分布式搜索引擎,它提供了一个简单易用的RESTful API,可以快速地进行全文搜索、结构化搜索、地理空间搜索、过滤搜索和分析,同时还支持实时搜索、数据聚合和分析。

        Elasticsearch可以存储和搜索各种类型的数据,包括文本、数字、日期、地理位置等等。它具有可扩展性、高可用性、可靠性和安全性。

        Elasticsearch的使用范围非常广泛,可以在全文搜索、商业智能、日志分析、安全分析、知识图谱、推荐系统、电子商务等领域得到广泛应用。

Elasticsearch 的基本概念

(1)index 索引:索引类似于mysql 中的数据库,Elasticesearch 中的索引是存在数据的地方,包含了一堆有相似结构的文档数据。

(2)type 类型:类型是用来定义数据结构,可以认为是 mysql 中的一张表,type 是 index 中的一个逻辑数据分类

(3)document 文档:类似于 MySQL 中的一行,不同之处在于 ES 中的每个文档可以有不同的字段,但是对于通用字段应该具有相同的数据类型,文档是es中的最小数据单元,可以认为一个文档就是一条记录。

(4)Field 字段:Field是Elasticsearch的最小单位,一个document里面有多个field

(5)shard 分片:单台机器无法存储大量数据,es可以将一个索引中的数据切分为多个shard,分布在多台服务器上存储。有了shard就可以横向扩展,存储更多数据,让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。

(6)replica 副本:任何一个服务器随时可能故障或宕机,此时 shard 可能会丢失,因此可以为每个 shard 创建多个 replica 副本。replica可以在shard故障时提供备用服务,保证数据不丢失,多个replica还可以提升搜索操作的吞吐量和性能。primary shard(建立索引时一次设置,不能修改,默认5个),replica shard(随时修改数量,默认1个),默认每个索引10个 shard,5个primary shard,5个replica shard,最小的高可用配置,是2台服务器。

        以下是Elasticsearch的一些基本概念:

        节点(node):Elasticsearch集群中的一个服务器称为节点。节点可以存储数据和执行指令。

        集群(cluster):一个或多个节点组成的逻辑集合称为集群。集群用于管理整个数据库,跨节点协调、管理数据。

        索引(index):类似于数据库中的表,索引是一个包含文档的逻辑空间。每个索引都包含一组文档,它们具有一组公共的特征,如相同的数据类型或目的。

        文档(document):文档是存储在索引中的最小数据单元。它使用JSON格式表示。文档可以包含各种类型的数据,如字符串、数字、日期、嵌套结构等。

        类型(type):Elasicsearch在7.0版本后已经不再支持类型,之前的版本中称为类型的概念现在被称为mapping 。

        映射(mapping):mapping定义了文档的结构和字段的数据类型。它告诉Elasticsearch如何处理文档中的数据。

        分片(shard):索引可以分成多个分片,每个分片都是一个独立的Lucene索引,处理数据时可以在多个分片中并行执行,以提高查询性能。

        复制(replica):每个分片都可以有零个或多个复制分片,复制分片是分片的副本,用于提高集群的可用性和容错性。

        查询(query):Elasticsearch提供了丰富的查询DSL (domain-specific language)语言,可以根据各种条件和参数进行高级查询。

        聚合(aggregation):聚合是一种数据分析方法,用于计算和处理文档中的数据,Elasticsearch提供了多种聚合功能,如统计、分组、排序等。

        以上是Elasticsearch的基本概念,这些概念在使用和管理Elasticsearch集群时都会涉及。

倒排索引和正排索引

正向索引是最传统的,根据id索引的方式。 但根据词条查询时,必须先逐条获取每个文档, 然后判断文档中是否包含所需要的词条,是根据文档找词条的过程。

  • 而倒排索引则相反,是先找到用户要搜索的词条, 根据词条得到保护词条的文档的id,然后根据id获取文档。是根据词条找文档的过程。

正向索引

  • 优点:

    • 可以给多个字段创建索引

    • 根据索引字段搜索、排序速度非常快

  • 缺点:

    • 根据非索引字段,或者索引字段中的部分词条查找时,只能全表扫描。

倒排索引

  • 优点:

    • 根据词条搜索、模糊搜索时,速度非常快

  • 缺点:

    • 只能给词条创建索引,而不是字段

    • 无法根据字段做排序

        在全文检索中,倒排索引 (Inverted Index) 和正排索引 (Forward Index) 是两个重要的概念。它们用于在文本数据中进行快速和准确的搜索和匹配。

        1.正排索引 (Forward Index)

        正排索引是一种按文档编号或者文档的生成时间等确定文档排序的索引结构。在正排索引中,文档中的每个单词都与文档的编号相关联。因此,如果您要按照文档编号查找文档内容,则必须使用正排索引来执行此操作。

        2.倒排索引 (Inverted Index)

        倒排索引是一种关键词到文档的映射,其中每个单词都与包含该单词的所有文档ID列表相关联。因此,如果要查找包含特定单词或词组的文档,则可以使用倒排索引来执行此操作。

        在倒排索引中,通过倒排的方式记录每个单词在哪些文档出现过,即此单词可能出现在哪些文档中;而在正排索引中,记录的是每个文档的信息,也就是每个文档包含了哪些单词。倒排索引直接把相同单词的文档引用在一起,方便快速建立检索引用。

        总的来说,正排索引记录文档与文档之间的关系,而倒排索引记录词语与文档之间的关系。在实际的搜索引擎中,一般会采用倒排索引来实现搜索引擎的基本功能,以提高搜索的效率和精度。

text和keyword类型的区别

        两个的区别主要分词的区别:keyword 类型是不会分词的,

        直接根据字符串内容建立倒排索引,keyword类型的字段只能通过精确值搜索到;

        Text 类型在存入 Elasticsearch 的时候,会先分词,然后根据分词后的内容建立倒排索引

        在Elasticsearch中,text和keyword是两种最常用的字段类型。它们在搜索和索引及存储方式上有明显的差异。

        text类型

        text类型用于存储长文本内容,通常是被分词的、可以被搜索到的数据。当一个文档被存储时,多个text字段的值会被合并到一个被称为全文字段的大字段中。全文字段中包含所有被索引的文本内容,同时采用了倒排索引,方便进行全文检索。

        text类型适用于全文搜索场景,例如对文本进行模糊匹配、关键字搜索、短语搜索等。由于text类型的值和被存储在索引中的文档的内容有所不同,因此需要进行分析和建立倒排索引,这会占用一定的存储空间和索引构建时间。

        keyword类型

        keyword类型用于存储短文本内容,例如ID、状态、颜色等。在keyword类型中,每个文档中的字段仅包含一个关键字特定的值,无需分析和建立倒排索引,从而占用更少的存储空间。

        keyword类型适用于过滤、排序或聚合等场景。单值字段,如标识符、状态或类别,通常使用keyword类型存储。

        需要注意的是,text类型和keyword类型是不可互换的。如果将一个text类型的字段用于过滤、排序或聚合等操作,会导致查询性能下降或产生错误的结果。同样的,将一个keyword类型的字段用于全文搜索,也会出现问题。

        因此,在创建和设计Elasticsearch索引时,需要根据实际需求和场景选择适合的字段类型,以充分利用Elasticsearch的功能和性能优势。

query和filter 的区别?

(1)query:查询操作不仅仅会进行查询,还会计算分值,用于确定相关度;

(2)filter:查询操作仅判断是否满足查询条件,不会计算任何分值, 也不会关心返回的排序问题,同时,filter 查询的结果可以被缓存,提高性能。

        在Elasticsearch中,query和filter都用于在搜索请求中定义查询条件,但它们在查询数据时的方式和用途有所不同。

        Query查询

        Query查询是用于评分搜索的查询,评分搜索指的是根据相关度对文档进行排序的搜索,以便将最匹配搜索条件的文档排在最前面。查询语句被用于匹配文档的部分或全部内容,它根据匹配的相关度计算每个文档的得分,然后将每个文档按得分从高到低排列。常见的查询类型有match、matchphrase、multimatch、fuzzy等。 Query查询是在查询中指定的,具有查询语句和相应的评分,通过计算文档与搜索语句的相关度来确定文档在结果集中的排序。

        Filter过滤

        Filter过滤是一种用于筛选文档的高效查询。和查询不同的是,过滤条件不会影响文档得分,它只会根据查询条件来筛选文档,因此可以更快地返回结果。常用的筛选器有term、terms、range、exists、missing等。

        Filter过滤器只对查询结果进行过滤,它不参与文档得分的计算。与Query查询不同的是,Filter过滤器不需要计算相关度得分,因此在查询速度上要快于Query查询。

        需要注意的是,query查询和filter过滤在用途上是不同的,它们可以组合在一起形成较为复杂的查询语句。如果想要实现纯粹只筛选文档而不做评分计算,应该使用filter语句,以提高查询的性能。如果需要对每个文档进行评分并根据评分排序,应该使用query语句。正确地使用query和filter将显著提高Elasticsearch的查询性能和准确性。

es写数据的过程(ES的写入流程:)

(1)客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node (协调节点)

(2)coordinating node 对 document 进行路由,将请求转发给对应的 node(有 primary shard)

(3)实际的 node 上的 primary shard 处理请求,然后将数据同步到 replica node

(4)coordinating node 等到 primary node 和所有 replica node 都执行成功之后,就返回响应结果给客户端。

        Elasticsearch的写入流程经常是和索引切分和数据分发相关的。

        下面是ES写数据的大概流程:

        ●客户端向Elasticsearch发送一个写入请求,请求包含了需要写入的文档等信息。

        ●Elasticsearch集群接收到写入请求,将其路由到特定的节点。

        ●Elasticsearch节点将请求转换为内部文档表示,并写入节点本地的倒排索引数据结构。

        ●数据写入内存缓存区,并将缓存区的数据定期刷新到磁盘上的低级别段。

        ●当低级别段大小到达指定的阈值时,低级别段将被合并为更高级别段。

        其中,Elasticsearch的文档写入本身是顺序执行的操作,Elasticsearch会将文档写入事务日志,并等待日志存储成功,之后再返回成功响应。

        在Elasticsearch集群中,数据分布和负载均衡是通过分片和副本实现的。当索引被创建时,每个索引都会被分成多个分片,每个分片可以分布到不同的节点上,这有助于提高数据的处理效率。同时,每个分片都可以拥有多个副本,使数据能够在集群中高可用。

        总之,Elasticsearch的写入流程是一个高度优化的过程,它利用了多种索引结构和数据副本来提高数据写入的并行度和可靠性,从而获得更好的写入性能和可靠性。

(ES的更新和删除流程)写数据的底层原理:

1)数据 删除和更新都是写操作,但是由于 Elasticsearch 中的文档是不可变的,因此不能被删除或者改动以展示其变更;所以 ES 利用 .del 文件 标记文档是否被删除,磁盘上的每个段都有一个相应的.del 文件

(1)如果是删除操作,文档其实并没有真的被删除,而是在 .del 文件中被标记为 deleted 状态。该文档依然能匹配查询,但是会在结果中被过滤掉。

(2)如果是更新操作,就是将旧的 doc 标识为 deleted 状态,然后创建一个新的 doc。

在Elasticsearch中,更新和删除操作也是建立在写操作之上的。它们都需要先读取数据,并将更新或删除的信息写入事务日志中,最后再根据具体的操作更新或删除原有数据。

下面是Elasticsearch的更新和删除流程:

  1. 客户端向Elasticsearch发送更新或删除请求,请求包含目标文档的ID以及需要更新或删除的字段内容。

  2. Elasticsearch集群接收到请求后,将其路由到特定的节点上。

  3. Elasticsearch节点将请求转换为内部表示形式,并读取原有数据。

  4. 节点先将原有数据标记为已删除状态,然后再将新数据写入内存中的索引结构中。

  5. 更新或删除操作的修改先被写入本地事务日志,以确保数据持久化保存。

  6. 在下一次提交操作的时候,Elasticsearch会将所有操作同步到所有副本,并将数据刷新到磁盘上。

在这个过程中,Elasticsearch通过在内存中的数据结构中维护多个版本的文档信息来支持文档的更新和删除。同时,重复利用已删除的空间和使用分段和副本来进行数据的分发和复制,也是实现更新和删除操作的关键技术。

需要注意的是,在Elasticsearch中,因为查询和写入的性能相对较高,所以更新和删除操作往往会被转换为先删除原有数据,再写入更新后的数据的操作。这通过可以减少数据结构的复杂度,从而提高数据的查询和索引性能。

综上所述,Elasticsearch的更新和删除操作都是基于文档的写入操作而实现的,通过在内存中缓存数据并利用多个索引结构和数据副本来提高写入性能和可靠性,同时利用事务日志来保证数据的持久性。

ES在高并发下如何保证读写一致性?

(1)对于更新操作:可以通过版本号使用乐观并发控制,以确保新版本不会被旧版本覆盖

(2)对于写操作,一致性级别支持 quorum/one/all,默认为 quorum,即只有当大多数分片可用时才允许写操作。

(3)对于读操作,可以设置 replication 为 sync(默认),这使得操作在主分片和副本分片都完成后才会返回

        在Elasticsearch集群中,多个节点同时处理读写请求时,可能会发生数据不一致的情况。为了避免此类问题,Elasticsearch采用以下几种方式保证读写一致性:

  1. 写入操作的全局视图

        在Elasticsearch中,每个写入操作会生成一个全局的版本号,该版本号会被分配给写入的文档或数据记录。在节点之间进行通信时,版本号会用于确定写入的顺序,以确保多个节点之间写入数据的一致性。当一个写入请求被接收时,Elasticsearch会检查当前版本号和要写入的文档或数据记录的版本号是否匹配,如果不匹配,则拒绝写入请求。

  1. 使用分布式锁

        Elasticsearch使用分布式锁来避免并发写入操作产生的问题。在Elasticsearch中,每个分片都有自己的分布式锁,用于保护该分片上的写入操作。当一个写入请求到达节点时,它会先尝试获得分片上的锁,以确保并发的写入操作不会相互冲突。

  1. 数据复制和副本同步

        在Elasticsearch集群中,每个数据分片都会有多个副本,这些副本会被同步到不同的节点上。在写入操作发生时,操作会先写入本地节点的内存中,在确认写入操作成功后,节点会将数据同步到其他副本所在的节点上。这种方式可以确保数据在集群中的各个节点之间同步,从而保证读写一致性。

        需要注意的是,Elasticsearch并不能完全避免并发写入操作带来的问题,但上述机制可以最大限度地减少并发操作带来的影响。同时,为了确保数据的一致性,Elasticsearch还提供了一些手动操作,如刷新和同步操作,可以在高并发读写场景中使用,以确保数据的一致性。

Elasticsearch的分布式原理(ES如何选举Master节点)

Elasticsearch 会对存储的数据进行切分,将数据划分到不同的分片上,

同时每一个分片会保存多个副本,主要是为了保证分布式环境的高可用。

在 Elasticsearch 中,节点是对等的,节点间会选取集群的 Master,

由 Master 会负责集群状态信息的改变,并同步给其他节点。

        Elasticsearch采用分布式架构,将索引数据分散存储在集群的多个节点中。在这种架构下,节点之间需要协调工作以实现高可用性、数据分发和复制等功能。

在Elasticsearch集群中,每个节点有可能成为Master节点、Data节点或者Master-eligible节点。

        Master节点是集群中的管理节点,它负责管理集群状态,包括索引和分片的分配、路由和复制等。一般情况下,Elasticsearch只会选取一个节点作为Master节点。

        Data节点是集群中存储数据的节点,负责数据的存储、查询和分析等操作。

        Master-eligible节点是既可以成为Master节点,也可以成为Data节点的节点。它们具有Master节点和Data节点的所有功能,并可以参与Master节点的选举过程。

        在Elasticsearch中,Master节点是通过选举产生的。Master节点负责监控集群状态,并做出相应的决策来保持集群状态的稳定性。在Master节点出现故障或不可用的情况下,集群会自动重新选举一个新的Master节点。Master节点的选举过程如下:

  1. 当集群中的Master节点出现故障或不可用时,其他节点(即Master-eligible节点)会开始一个选举流程。选举开始时,每个节点会投出一张选票,表示它自己可以成为一个新的Master节点。

  2. 每个节点都会将自己投出的选票广播给集群中的其他节点。

  3. 如果一个节点收到了超过半数的选票,它将成为新的Master节点。这个节点会通知其他节点,告诉它们它已经成为了Master节点。

  4. 如果没有一个节点获得了超过半数的选票,选举将会失败。此时,节点会等待一段时间后重新开始选举过程。

        需要注意的是,Elasticsearch的Master节点选举过程是自动的,并且是通过所有Master-eligible节点之间的协作和投票来实现的。因此,如果一个节点被选举为Master节点后出现故障或不可用,集群将重新进行Master节点的选举过程,并从可用的Master-eligible节点中选择一个新的Master节点。

        总之,Elasticsearch的分布式架构和Master节点选举机制保证了集群的高可用性和稳定性,同时具有良好的扩展性和负载均衡能力。

Elasticsearch是如何避免脑裂现象:

(1)当集群中 master 候选节点数量不小于3个时(node.master: true), 可以通过设置最少投票通过数量(discovery.zen.minimum_master_nodes), 设置超过所有候选节点一半以上来解决脑裂问题,即设置为 (N/2)+1

(2)当集群 master 候选节点只有两个时,这种情况是不合理的, 最好把另外一个node.master改成false。 如果我们不改节点设置,还是套上面的(N/2)+1公式,此时discovery.zen.minimum_master_nodes应该设置为2。 这就出现一个问题,两个master备选节点,只要有一个挂,就选不出master了

        “脑裂”又称为“脑分裂”,是指分布式系统中的节点发生网络隔离或宕机之后,不同子集的节点认为它们被选举为主节点(Master节点),导致系统出现不一致或数据丢失的现象。为了避免这种情况,在Elasticsearch中,采取了以下几种方法:

  1. 节点互通性检测

        Elasticsearch会在各个节点之间定期发送心跳包,以检测各个节点的可用性,从而防止正在运行的节点被隔离。如果节点不再收到来自主节点的响应,它将暂时认为它们已经被隔离,并将自己标记为不可用。

  1. 脑裂检测和处理机制

        为了避免脑裂现象,Elasticsearch引入了脑裂检测和处理机制。在Elasticsearch集群中,每个节点都有一个唯一的ID。当一个Master节点被选举时,它会将自己的ID发送到其他节点,并要求它们承认自己是次要Master节点。一旦Master节点检测到自己失去了与大部分节点的联系,它就会检查是否还有其他节点认为它们是Master节点。如果有,则Master节点会发送消息给其他节点,要求它们放弃成为Master节点的自认。如果没有,则Master节点会认为集群已发生脑裂并进行相应处理。

  1. 可靠的网络和硬件设备

        为了最大限度地减少脑裂现象的发生,Elasticsearch强烈建议使用可靠的网络和硬件设备,如高可用网络、冗余电源和存储以及集群容错软件等。

        通过上述措施和机制,Elasticsearch可以最大程度地避免脑裂现象的发生,保持集群的一致性和可用性。同时,如果脑裂现象发生,Elasticsearch的数据、复制和分片机制也可以在此基础上支持切分数据、复制数据以及高效分发数据,从而恢复集群的一致性和可用性。

ES的深度分页与滚动搜索scroll

(1)深度分页:** 深度分页其实就是搜索的深浅度,(比如第1页,第2页,第10页,第20页,是比较浅的;第10000页,第20000页就是很深了。) 搜索得太深,就会造成性能问题,会耗费内存和占用cpu。 而且es为了性能,他不支持超过一万条数据以上的分页查询。

(2)滚动搜索:** 一次性查询1万+数据,往往会造成性能影响,因为数据量太多了。 这个时候可以使用滚动搜索,也就是 scroll。 滚动搜索可以先查询出一些数据,然后再紧接着依次往下查询。 在第一次查询的时候会有一个滚动id,相当于一个锚标记 ,随后再次滚动搜索会需要上一次搜索滚动id 每次搜索都是基于一个历史的数据快照,查询数据的期间,如果有数据变更,那么和搜索是没有关系的。

        在Elasticsearch中,查询时涉及到分页问题,如果需要分页查看较多的结果,而这些结果又恰好分布在整个查询结果集中,这时就需要进行较多的查询请求,并且在每个查询请求中需要找到从哪个结果开始分页,自而影响查询性能。为了解决这个问题,Elasticsearch提供了两种解决方式:深度分页(Deep Pagination)和滚动搜索(Scroll)。

        深度分页:

        深度分页是指在查询结果集中跳过一定数量的结果,从而实现下一页(或者前一页)的操作。例如,当我们需要使用SQL查询语句 LIMIT 1000, 100 来实现查询第1001个结果时,Elasticsearch索引中的深度分页可以通过添加size和from参数来完成类似的操作,即查询size数量的结果,跳过from数量的结果,从而获得目标结果。

        深度分页可以非常好地解决数据分页的问题,但是在处理大规模数据时会变得耗时且不稳定。原因在于分页操作需要每次从头开始搜索,因此需要对查询进行多次操作,并在每次查询之间存储数据。对于太大的查询结果,这种方式可能会导致性能问题或者服务器内存不够。

        滚动搜索:

        滚动搜索是一种基于深度分页的改进,它将查询结果分成若干个批次,每个批次类似于深度分页的查询结果集,用户使用最初查询的ID获取下一个批次的数据。滚动搜索中,数据被存储在Elasticsearch中,不需要存储在客户端中。这种方法还使用了一种优化方式,即不删除已检索的文档,而只是在每个批次之间缓存查询结果,因此效率更高,且支持查询大型结果集。

        和深度分页相比,滚动搜索提供了更高效的遍历查询结果的方式。可以减少Elasticsearch的查询次数,提高检索效率和性能,并支持更细节的结果导航操作。但是,与深度分页相比,滚动搜索占用的资源更多,并且当时间太长时,缓存过期的问题需要进行调整。

        总之,Elasticsearch提供了多种方法来存储和检索大型数据集,包括深度分页和滚动搜索等。开发人员必须选择适合其数据分页需求的正确分页策略。它将大大提高Elasticsearch浏览和分页数据集的可伸缩性和性能。

Kafka

什么是Kafka?Kafka中有哪几个组件?

Kafka是分布式发布-订阅消息系统是一个可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。

组件:

主题(Topic):Kafka主题是一堆或一组消息。

生产者(Producer):在Kafka,生产者发布通信以及向Kafka主题发布消息。

消费者(Consumer):Kafka消费者订阅了一个主题,并且还从主题中读取和处理消息。

经纪人(Brokers):在管理主题中的消息存储时,我们使用Kafka Broker

        Kafka 是一种分布式流处理平台,通常被用于构建实时数据管道和流应用程序。它是由 Apache 软件基金会开发并开源的,最初由 LinkedIn 公司开发。Kafka 的目的是提供一种高吞吐量、低延迟的平台,用于处理实时数据流。它可以处理海量数据,并且能够在多个客户端之间进行数据共享。

        在 Kafka 中,数据的流向是通过 Kafka Topic 实现的。一个 Topic 可以被划分为多个 Partition,每个 Partition 可以在不同的 Broker 上,实现数据在集群中的分布式存储。而每个 Partition 的 offset 能够保证多个消费者消费同一个 Topic 的数据时,消费进度的一致性。同时,Kafka 提供了一套完备的生产者和消费者 API,实现生产者向 Topic 写入数据,消费者从         Topic 读取数据的功能。

        Kafka 的主要组件包括:

  1. Broker: Kafka 中的 Broker是 Kafka 服务器节点。它负责接收来自生产者的消息,存储这些消息并能将消息转发给消费者。

  2. Topic: 在Kafka中,所有的消息都必须进行归类。Topic是指用于分类消息的类别,即发布和订阅的消息类型。

  3. Partition: Partition 是Topic的分区,用于水平扩展Topic,分散数据存储和分配负载。一个Topic可以有多个分区,不同分区的数据被存储在不同的Broker上。

  4. Producer: 生产者是指向Kafka Broker发送消息的客户端应用。它将消息写入Kafka Topic中的一个分区中。

  5. Consumer: 消费者是指从Kafka Broker读取消息的客户端应用程序。它从Kafka Topic中读取消息,并将其传递给相应的应用程序。

  6. Consumer Group: 消费者分组是一组消费者的集合,它们共享相同的消费者组ID,并共同读取一个或多个分区。一旦分配到一个分区,一个分组中的消费者就可以互相协作处理该分区中的消息。

  7. Offset: Offset是指用来表示消费者在某个分区中已经处理的消息的位置。Kafka中的每个消息都有一个唯一的Offset,消费者消费过的消息将被记录在消费者端,对应的offset信息用来跟踪消费者的消费进度。

        总之,Kafka 提供了一种快速、可扩展、高性能的数据处理,消息处理和流处理架构,通过简化大数据应用的开发实现,使得企业更好地处理和利用大数据。

Kafka为什么那么快?

  1. Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的, 但是实际上,Kafka的特性之一就是高吞吐率。

  2. Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件, 这种特性也使得Kafka在日志处理等海量数据场景广泛应用。

  3. 写入数据 顺序写入,由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。

  4. 读取数据 基于sendfile实现Zero Copy零拷技术减少拷贝次数,Batching of Messages 批量处理。 合并小的请求,然后以流的方式进行交互,直顶网络上限。

        Kafka 之所以能够处理大量的消息数据以及具有非常高的吞吐量,是由于其内部采取了一些优化手段,具有以下几个方面的原因:

  1. 基于分布式架构: Kafka 的设计文章采用了分布式架构,利用多个节点增加了集群的处理能力和容错能力,有效利用了大量的服务器资源。

  2. 内存和磁盘结合的架构: Kafka 消息存储的方式采用了“预读、预写”的方式,即数据首先在内存中被写入,然后再批量写入磁盘,这样可以避免频繁地访问磁盘而导致的性能瓶颈,并提高了数据写入和读取的效率。

  3. 批量处理技术:Kafka 会对生产者的数据进行异步批量处理,把多个小数据包批量提交到服务器,这样就可以大大减少网络通信的次数,从而显著提高了生产者的吞吐量。

  4. 零副本技术:Kafka 的每个分区都有多个副本,这些副本统称为 ISR(In-Sync Replicas)。当主副本故障后,Kafka 会从 ISR 中自动选择一个副本作为新的主副本,从而不需要等待人工介入,提高了系统的可用性和响应速度。

  5. 提供基于索引机制:Kafka 以任意顺序接收和发送消息,并为消息存储位置维护一个离散的索引结构,这种索引结构可以帮助 Kafka 快速地定位和访问指定的消息。

        综上所述,Kafka通过采用分布式架构、内存和磁盘结合的方式、批量处理技术、零副本技术以及提供基于索引机制等方式,从架构

Kafka系统工具有哪些类型?

  1. Kafka迁移工具:它有助于将代理从一个版本迁移到另一个版本。

  2. Mirror Maker:Mirror Maker工具有助于将一个Kafka集群的镜像提供给另一个。

  3. 消费者检查:对于指定的主题集和消费者组,它显示主题,分区,所有者。

        Kafka 提供了一些系统工具,用于监视、管理和诊断 Kafka 系统的运行情况和问题。这些系统工具可以帮助管理员和开发人员更好地理解和控制 Kafka 系统。根据不同的用途和功能,Kafka 的系统工具主要可以分为以下几种类型:

        1.生产和消费者工具:Kafka 提供了一些命令行工具,如 kafka-topics.sh、kafka-console-producer.sh、kafka-console-consumer.sh 等,用于生产和消费消息,并帮助你测试 Kafka 集群是否正常工作。

        2.管理工具:Kafka 提供了一些命令行工具,如 kafka-server-start.sh、kafka-configs.sh、kafka-acls.sh 等,用于管理 Kafka 集群的配置、访问控制和安全性等问题。

        3.监控工具:Kafka 提供了 Kafka Metrics Reporter API,可以将 Kafka 集群的指标报告给外部监控工具,如 Graphite、Ganglia 等。此外,还可以使用 Kafka Connect、Kafka Streams 等工具,对 Kafka 集群的运行状态和消息流进行监控和分析。

        4.诊断工具:Kafka 提供了一些工具,如 kafka-logs-dump.sh、kafka-producer-perf-test.sh、kafka-consumer-perf-test.sh 等,用于分析和优化 Kafka 集群的性能和吞吐量,在确定和解决问题时非常有用。

        总之,Kafka 的系统工具可以帮助管理员和开发人员更好地理解、配置、管理和监控 Kafka 集群,从而缩短故障排除时间、提高系统的可用性和稳定性。

Kafka的message格式是什么?

一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成

header部分

  • 由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。

body部分

  • 由N个字节构成的一个消息体,包含了具体的key/value消息

        Kafka 的 Message 是指生产者向 Kafka Broker 发送的数据单元,它由固定格式的 Message Header 和 Message Body 两部分组成。

        1. Message Header

        Message Header 包含了以下的信息:

        ●标志位(bit):1 bit,用于标识当前 Message 是否压缩。

        ●版本号(byte):1 字节,用于标识当前 Message 的版本号。

        ●代码(byte):1 字节,用于表示当前 Message 的压缩类型,如 GZIP、Snappy 等。

        ●消息计数(varint):1 ~ 5 字节的变长整型,用于表示 message body 的长度。

        ●关键字(byte array):可变长度的字节数组,用于作为 Kafka 的查询标识。

        2. Message Body

        Message Body 是实际的消息内容,包含要传输的数据。在 Kafka 中,Message Body 需要被处理为字节数组类型,可以是任何可序列化为字节数组的数据类型。

        总之,Kafka 的 Message 格式非常简单,由固定格式的 Message Header 和 Message Body 两部分组成,而且支持多种压缩算法和字节序列化方式,可以通过配置文件自由选择。深入了解 Message 的格式,有助于我们更好地理解和掌握 Kafka 的工作原理和开发技能。

Kafka的优点有那些?

1 高吞吐量:我们在Kafka中不需要任何大型硬件,因为它能够处理高速和大容量数据。 此外,它还可以支持每秒数千条消息的消息吞吐量。

2 低延迟:Kafka可以轻松处理这些消息,具有毫秒级的极低延迟,这是大多数新用例所要求的。

3 容错:Kafka能够抵抗集群中的节点/机器故障。

4 耐久性:由于Kafka支持消息复制,因此消息永远不会丢失。这是耐久性背后的原因之一。

5 可扩展性:Kafka可以扩展,而不需要通过添加额外的节点而在运行中造成任何停机。

        Kafka 作为一个分布式流消息系统,拥有很多优点,包括:

  1. 高吞吐量:Kafka 致力于高性能消息传输。它可以轻松承受大规模的消息流量,并且具有非常高的吞吐量,每秒可以处理数百万个消息。

  2. 可靠性:Kafka 具有高可靠性,采用副本机制来防止单点故障。即使系统中的一个节点宕机,其他节点也可以继续正常工作,确保数据的完整性和可用性。

  3. 可伸缩性:Kafka 可以水平扩展,通过添加更多的节点来实现更高的存储容量和处理能力,并支持无缝快速的扩容和缩容。

  4. 多消费者支持:Kafka 的多消费者系统支持多个消费者共享订阅的 Topic 消息,实现了高实时、低延迟的消息传输。

  5. 灵活的消息持久性:Kafka 通过以日志文件的形式记录每个消息的事实,从而实现了持久性。Kafka 还提供了许多可配置的数据保留策略来支持各种应用程序。

  6. 多语言支持:Kafka 提供了多种语言的客户端API,如 Java、C、Python、.NET等,便于不同的开发者和应用程序与 Kafka 交互以及本地化应用程序的开发。

  7. 开源和社区支持:Kafka 以开源方式发布,使用和开发者都可以享受到丰富的社区支持,包括更好的文档、库和例子等。

        总之,Kafka 具有高可靠性、高扩展性、高吞吐量和灵活性等多种优势,已经成为数据高速管道的流行选择。同时,Kafka 社区不断推出新的功能和改进,使其变得更强大、更易于使用、更适合多种应用场景。

为什么要使用Kafka?为什么要使用消息队列?

  1. 缓冲和削峰: 上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余, kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。

  2. 解耦和扩展性: 项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。 只需要遵守约定,针对数据编程即可获取扩展能力。

  3. 冗余: 可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。

  4. 健壮性: 消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。

  5. 异步通信: 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制, 允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

        为什么要使用 Kafka?

  1. 大规模数据处理:Kafka 适用于大规模数据流处理,能够高效地传输和处理海量的数据。

  2. 高性能:Kafka 具有高性能的特点,可以快速的传输和处理大量的信息,每秒能够处理数百万条消息,支持互联网级的实时数据流处理。

  3. 可靠性:Kafka 具有高可靠性,支持数据即时持久化,数据安全存储在分布式系统中。

  4. 可伸缩性:Kafka 是一种可伸缩的系统,能够快速适应不同的负载,支持水平扩展,适用于不断增长的需求。

  5. 多语言支持:Kafka 支持多种编程语言,便于不同团队和开发者同时使用。

  6. 灵活性:Kafka 提供了灵活的部署方式和数据持久化策略,适用于许多不同的应用场景。

        为什么要使用消息队列?

  1. 解耦:消息队列可以将不同的组件或服务完全解耦,降低了系统间的依赖性,从而提高系统的可维护性和扩展性。

  2. 异步:消息队列可以实现异步通信,系统间不需要等待对方响应,能够更快地响应请求和处理任务。

  3. 缓冲和削峰:消息队列能够异步存储和缓存客户端发送的请求和响应,降低系统的负载压力,避免流量峰值对系统性能的影响。

  4. 可恢复性:消息队列将数据保存在持久化存储上,即使某个组件或服务发生故障也可以保证数据的安全。

  5. 顺序性:消息队列在排队过程中可以保证消息的有序性。

  6. 多处理方式:消息队列提供了不同的消息处理方式,支持多个消费者相互协作来进行线程任务调度。

        总之,消息队列和 Kafka 是非常有用的工具,适用于需要处理大量数据和解决高并发、时间敏感性等问题的应用场景。这两种工具不仅可以提高系统的可靠性和性能,而且能够降低系统间的耦合度,提高系统的可维护性和扩展性。

Kafka存在那些局限性?

  1. 没有完整的监控工具集

  2. 消息调整的问题

  3. 不支持通配符主题选择

  4. 速度问题

        尽管 Kafka 是一种优秀的分布式消息系统,但它仍然存在一些局限性:

  1. 存储问题:Kafka 为了保证高吞吐量和低延迟,采用了基于磁盘的存储方式,但是这样会占用较高的磁盘空间,且不太适合存储大量的小型消息。

  2. 无法实时查询:虽然 Kafka 可以通过索引机制很快地访问消息,但无法支持实时查询。

  3. 复杂性:Kafka 的架构和使用相对复杂,需要配置和管理多个组件和参数,需要较高的技术水平才能使用。

  4. 不提供消息顺序保证:在同一分区中,Kafka 能保证消息的顺序性,但对于多个分区或不同主题的情况,Kafka 并不能提供消息顺序的保证。

  5. 可靠性问题:尽管 Kafka 通过副本机制实现了高可靠性,但在某些特殊情况下仍然可能出现数据丢失或数据不一致的问题。

  6. 负载均衡问题:Kafka 需要进行负载均衡,但如果负载均衡不当,则可能会导致某些节点的负载过高,而另一些节点无法充分利用。

        总体来说,Kafka 作为一个分布式消息系统,具有很多的优点,但也有一些缺点和局限性。在使用 Kafka 时需要综合考虑实际需求和系统瓶颈,做出合理的配置和决策,从而最大程度地发挥 Kafka 的优势。

Kafka为什么不支持读写分离?

在 Kafka中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的, 从而实现的是一种主写主读的生产消费模型。

Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:

延时问题

数据一致性问题

        Kafka 目前不支持读写分离,这是由于其设计之初就并不考虑这种场景。以下是一些可能的原因:

  1. 写入操作不频繁:Kafka 的设计初衷是为了支持高吞吐量的写入操作,因此它主要优化了写入操作的性能和效率。相比之下,读取操作不太频繁,因此在设计时并未优化读取性能,也就不需要支持读写分离。

  2. 一致性要求高:Kafka 中的 Topic 消息默认的副本因子为1,即每个分区只有一个副本。这是为了最小化延迟和数据不一致性的风险。如果支持读写分离,则会增加数据不一致的可能性,并且可能会影响系统稳定性。

  3. 分区机制:Kafka 使用分区机制管理消息,每个分区都有一定数量的副本。如果采用读写分离,需要将写操作的消息复制到所有副本中,这可能会带来额外的开销和复杂性,从而影响系统的性能和可靠性。

        总之,Kafka 并不支持读写分离这一特性,这是由其设计初衷和架构特点决定的。对于需要读写分离场景的业务需求,可以考虑使用其他同类产品,或通过自行拓展来实现。

Kafka如何实现延迟队列?

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,

而是基于时间轮自定义了一个用于实现延迟功能的定时器

底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。

TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.

        Kafka 并不是一个专门的延迟队列,但是可以通过特定的处理方法实现延迟队列的功能。下面介绍两种实现方式:

        1. 通过时间戳来控制消费时间

        在生产者创建消息时,可以通过在消息的 Header 中增加一个时间戳字段来指定消息的过期时间。消费者在消费消息之前可以通过对消息头的时间戳进行判断,如果消息已经过期,则不进行消费。这样就可以将 Kafka 用作延迟队列,让消费者在指定的延迟时间之后才消费消息。

        2. 通过分区和偏移量控制消费时间

        在消费者消费消息时,可以控制消息的消费时间,即消费者不会立刻消费该消息,而是暂时先不消费,将偏移量保存在内存中。然后在指定的延迟时间之后,再去消费消息。这样就可以实现延迟消费的功能。

        需要注意的是,这种方式也有风险,如果某个消费者在指定的延迟时间内宕机了,那么这个消息可能会失效。因此需要在选择延迟队列的实现方式时,根据实际业务需求选择最适合的方案。

        总之,Kafka 通过在消息头中添加时间戳字段、控制消费时间等方式,可以实现延迟队列的功能。但需要注意的是,使用 Kafka 作为延迟队列的情况属于非正常用法,需要注意可靠性和实现方式的安全性。

Kafka如何保证消息可靠性

导致消费者弄丢数据的情况就是

拉取了这个消息,然后消费者那边自动提交了,Kafka 以为你已经消费好了这个消息,

但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢了。

解决:

关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢

broker弄丢数据

解决:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。

  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1, 这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。

  • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。

  • 在 producer 端设置 retries=10000000(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

生产者弄丢数据

如果设置了 acks=all,一定不会丢,要求是,你的 leader 接收到消息,

所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次

        Kafka 的可靠性保证是它最重要的特性之一,以下是 Kafka 如何保证消息可靠性的方式:

  1. 消息持久化:Kafka 通过将所有消息持久化到磁盘中,确保即使在出现硬件故障、程序崩溃或其他异常情况下,平台仍然可以保持数据完整性。

  2. 复制机制:Kafka 使用了副本机制,即每个分区都有多个副本,每个副本都能一次性接收和处理分配到该分区的全部数据。Kafka 会把分布在不同 Broker 上的分区副本保存在不同的机架或节点上,以保证分区复制极具弹性和扩展能力。同时,Kafka 也能够自动地检测和处理副本损坏或者丢失的情况,保证数据完整性和可用性。

  3. 同步写入机制:Kafka 有两种发送消息的方式,异步和同步。异步写入是指在发送消息时不等待 Broker 的确认,即异步发送消息,在该操作下,出现错误后也不会有异常处理。

        而同步写入机制是指客户端发送一条消息到集群前,先等待 Broker 的确认,直到 Broker 确认并将消息写入磁盘后,才会响应请求发送成功。这种同步写入机制会消耗额外的开销,但可以保证消息不丢失、不重复处理。

  1. 批量处理机制:Kafka 采用了批量处理机制,将多个消息一起批量发送,提高 Kafka 处理消息的性能和效率。

  2. 故障转移机制:在 Kafka 集群中,当 Leader 发生故障时,ZooKeeper 会自动将这个 Partition 的副本切换为 New Leader,在这个过程中,Kafka 自动重平衡,确保消息不丢失并且高可用。

        总之,Kafka 通过消息持久化、复制机制、同步写入机制、批量处理机制和故障转移机制等措施,保证了消息的可靠性。即使在一些不可避免的异常情况下,Kafka 都能够稳定可靠地运行,处理海量的消息。

Kafka中是怎么体现消息顺序性的?

  1. 可以设置topic 有且只有一个partition

  2. 根据业务需要,需要顺序的 指定为同一个partition

  3. 根据业务需要,需要顺序的指定为同一个partition (比如同一个订单,使用同一个key,可以保证分配到同一个partition上)

        在 Kafka 中,消息顺序性是由 Partition 分区来体现的。具体地说,同一个 Partition 中的消息是有序的,而不同 Partition 中的消息可能是无序的。

        每个 Partition 有一个 Leader 节点和多个 Follower 节点。Producer 发送消息时,指定消息发送到指定 Partition 的 Leader 节点,Leader 负责将消息追加到该 Partition 的本地日志中,并将该消息同步到 Follower 节点中。由于消息追加是单线程进行的,Leader 节点可以保证同一 Partition 中的消息的顺序性。

        另外,每个消息在 Kafka 中都有一个唯一的 Offset,表示该消息在 Partition 中的相对位置。消费者消费消息时,可以通过指定 Offset 精确定位消息的位置,从而保证消息的顺序性。消费者可以通过对消息的 Offset 进行排序或者根据条件筛选需要消费的消息,再进行后续的处理。

        总之,Kafka 中的消息顺序性是由 Partition 来保证的,同一 Partition 中的消息是有序的,而不同 Partition 中的消息可能是无序的。消费者可以通过 Offset 精确定位消息的位置,从而保证消息的顺序性。

创建topic时如何选择合适的分区数?

kafka集群中,单Topic的partition也并不是越多越好,但通常对于业务方来说

需要根据具体的场景进行分析以确定partition的数量。

通常Kafka集群中的分区越多,吞吐量就越高。但是,必须意识到总分区或每个Broker拥有太多分区 对可用性和延迟等方面的潜在影响,通常这部分需要业务方和基础服务方进行合理规划和调整。

一般情况下,分区数可以配置为Broker节点数的整数倍,

比如:Broker节点是3,那么可以设置分区数为3、6、9。

但是不能无限扩展, 有临界值, 临界值多少, 需要通过压测工具进行测试

        在创建 Kafka Topic 时选择合适的分区数可以影响 Kafka 的性能和吞吐量。以下是一些选择分区数的建议:

  1. 考虑 Topic 的用途:分区数的大小应该基于要处理的数据的类型和用途。例如,如果需要处理实时的高并发数据,则需要选择较多的分区数来支持更多的并发处理。而如果仅用于备份数据或仅处理低频数据,可以选择较少的分区数。

  2. 考虑数据处理能力:分区数的大小应该与处理数据的实际能力相匹配。如果 Kafka 集群的处理能力不足,过多的分区数可能会导致延迟和性能下降。

  3. 考虑数据流量大小:分区数的大小应该基于数据流量的大小来确定。如果数据流量很小,过多的分区数可能会导致资源浪费。而如果需要处理高流量数据,则需要选择更多的分区数。

  4. 考虑负载均衡:分区数的大小应该基于负载均衡的需要。例如,如果希望 Kafka 集群各节点负载均衡,并且同时希望保证每个节点上的分区数量尽量均衡,可以考虑选择分区数为节点数量的整数倍。

  5. 考虑数据冗余:选择合适的分区数还需要考虑数据冗余的需求。如果希望数据在多个节点之间进行冗余备份,需选择更多的分区数。

        总之,在选择分区数时需要综合考虑数据用途、处理能力、数据流量、负载均衡和数据冗余等因素。正确选择分区数可以提高 Kafka 的性能和吞吐量,提高系统稳定性。

消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

offset+1

        在 Kafka 消费者提交消费位移时,应该提交的是消费者已经消费的最后一条消息的 offset + 1,表示消费者已经成功消费了该消息,准备消费下一条消息。

        具体来说,Kafka 的每个分区都有一个对应的消费位移(Consumer Offset),表示消费者最后一次成功消费该分区消息时的 offset。在消费者消费消息时,应该记录当前消费到的最后一条消息的 offset,即准备消费下一条消息的 offset。而在提交消费位移时,应该将上一条已经成功消费的消息的 offset + 1 提交给 Kafka。

        只有当消费者成功消费了一条消息并提交该消息的 offset + 1,才能保证已经处理了该消息,避免消息被重复消费或漏掉消息,从而保证每条消息只被消费一次。

        总之,消费者提交消费位移时提交的是消费者已经成功消费的最后一条消息的 offset + 1,表示消费者已经成功消费了该消息,准备消费下一条消息。

有哪些情形会造成重复消费?

消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)

        在 Kafka 中,可能会出现一些情况导致重复消费。以下是几个可能导致重复消费的情况:

  1. 消费者提交消费位移不准确:如果消费者提交的消费位移不准确,即消费者提交了未消费的分区,该分区可能会被重新读取,导致重复消费。

  2. 网络故障或进程崩溃:如果由于网络故障或进程崩溃导致消费者重启,他可能会从头开始读取分区的消息,因此这些消息可能会被重复消费。

  3. 设置不当的消息保留策略:如果设置消息保留的策略过短,Kafka 会自动清理旧消息,如果消费者在消息被清理之前没有正确地处理消息,则可能会导致消息的重复消费。

  4. 消费者组编号未设置或设置不当:如果消费者组编号未正确设置,或者同一个消费者组的多个消费者在处理同一分区的消息时,并没有正确地协调管理消费位移,这可能会导致消息的重复消费。

        要防止消息的重复消费,可以采取以下方法:

  1. 消费者进行消费位移管理:需要确保消费者提交的消费位移是正确的,即已经成功消费该消息并将消费位移提交给了 Kafka。

  2. 设置适当的消息保留策略:需要注意保留消息的时间,确保消费者在消费期间能够正常地读取到消息,避免消息被过早清理。

  3. 设置消费者组编号:确保同一消费者组的多个消费者能够协调管理消费位移,并在消费期间正确地处理消息。

  4. 配置合适的消息过滤策略:消费者可以根据业务需求设置过滤规则,只选择需要的消息进行消费,减少重复消费的可能性。

        总之,要防止消息的重复消费,需要对消费位移进行管理并设置适当的消息保留策略、正确地设置消费者组编号并配置合适的消息过滤策略。这样可以最大程度地避免重复消费带来的影响。

哪些情景下会造成消息丢失

消费者没有处理完消息 提交offset(自动提交偏移 未处理情况下程序异常结束)

        Kafka 设计时考虑了数据可靠性,但一些特殊情况下,仍有可能导致 Kafka 中的消息丢失。以下是一些可能导致 Kafka 消息丢失的情况:

  1. 生产者消息未完全持久化问题:如果 Kafka 在消息写入内存后,却在消息写入磁盘之前发生了错误,例如 Kafka 节点宕机、网络异常等,那么这些写入内存但未持久化的数据将会丢失。

  2. 异步发送消息且未等待确认:如果生产者发送消息使用的是异步方式,而且未等待确认,如果在消息发送和写入磁盘之前出现了故障,例如网络异常、Broker 宕机等原因,那么这些未被确认的消息将会丢失。

  3. 分区数量设置不合理:如果某个主题的分区数量设置不合理,可能会出现某个分区过热、过多的消息无法平均分配到各个分区等现象,导致消息丢失。

  4. 消费者提交位移不正确:如果消费者提交了错误的消费位移,例如提交过大的消费位移,此时 Kafka 无法找到要消费的消息,从而导致消息丢失。

  5. Broker 出现故障:如果 Kafka 集群中的某个 Broker 出现了硬件故障或者崩溃,当消息被写入该 Broker 且还未同步到其他 Broker 上时,这些未同步的消息可能会丢失。

        要避免 Kafka 消息丢失,可以采取以下措施:

  1. 使用可靠性级别(Replication):Kafka 应该通过设置可靠性级别(Replication)为 1 或更多来保证数据可靠性。这样可以保证即使 Kafka 集群中的某些节点出现故障,仍然能够保持数据的完整性。

  2. 生产者使用同步方式:建议将消息生产者的发送方式设为同步写入,以确保生产者成功写入 Kafka 并得到确认后,才会继续发送下一条消息,降低消息丢失的风险。

  3. 控制分区数量:设定合理的分区数量可以分散消息的负载,避免过热的分区和过多的消息无法平均分配到各个分区等问题,从而降低消息丢失的风险。

  4. 消费者使用自动提交时定期提交偏移量:消费者使用自动提交模式时,应该定期提交位移,以避免由于异常中断等原因,消费位移没有提交导致消息丢失的情况。

        总之,要避免 Kafka 中消息的丢失,需要使用可靠性级别、生产者采用同步方式、控制分区数量、消费者定期提交消费位移等方法来保证数据的可靠性。

RabbitMQ

MQ如何保证消息幂等性?

  1. 乐观锁机制

  2. 基于Redis的原子操作

  3. 增加消息状态表。通俗来说就是一个账本,用来记录消息的处理状态, 每次处理消息之前, 都去状态表中查询一次,如果已经有相同的消息存在,那么不处理,可以防止重复发送

        保证消息的幂等性是消息中间件(MQ)在面对网络分区、请求重发、系统错误等情况下保证业务数据一致性的重要手段。下面介绍几个常见的消息队列(MQ)保证消息幂等性的方法:

  1. 消息体去重: 在消息体中添加唯一 ID 标识,并在消费者端判断该 ID 是否已经处理过,若已经处理,则直接忽略该消息,确保该条消息仅被处理一次。

  2. 利用数据库的唯一性约束:可以利用数据库的唯一性约束特性,将消息唯一标识作为数据表的主键,确保多次重复写入时只保留一条记录,忽略其他的消息。在确认处理成功之前,需要确保唯一性约束不会被绕过,否则会导致数据丢失。

  3. 分布式锁:可以利用分布式锁来保证同一个消费者在相同的时刻只能处理一条相同的消息。例如,利用 ZooKeeper 或 Redis 等工具实现分布式锁,锁住消息体中的唯一 ID,确保同一消息只能被一个消费者处理。

  4. 利用 Redis 的 setnx 命令:在 Redis 中利用 setnx 命令,将消息体中的唯一 ID 作为参数,将其作为 key 存储并设置过期时间,即可防止同一消息被重复处理。多个线程或多个消费者在处理消息时可以通过 setnx 命令竞争 key,只有竞争成功的线程或消费者才能成功处理消息。

        总之,在实际应用中可以采用以上介绍的方法来保证消息的幂等性。需要根据具体的业务场景和功能需求,综合考虑使用哪种方式来达到最好的效果,并需要进行针对性的测试,保障系统的可靠性和正确性。

消息中间件的应用场景

开发中消息队列通常有如下应用场景:

1、任务异步处理。

将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

2、应用程序解耦合

MQ 相当于一个中介,生产方通过 MQ 与消费方交互,它将应用程序进行解耦合。

        MQ(Message Queue),即消息队列,是一种异步通信的工具,可以在分布式系统中实现应用解耦、提高可靠性、降低延迟和消峰限流等如下场景。

  1. 异步处理:通过消息队列实现异步处理,将请求发送到消息队列中,再让其他进程或服务在后台异步处理实际请求,能够提高系统的响应速度和并发能力。

  2. 应对突发流量:在特定时间出现大量请求时,消息中间件可以缓解压力,通过存储消息并限制消费速率等方式实现请求的限流或丢弃,避免系统崩溃或缓慢。

  3. 解耦合并:通过消息队列实现应用程序或微服务的解耦,降低不同部分之间的耦合性,提高系统的可扩展性和可维护性。比如实现应用系统的拆分和协作,数据平台的数据分发,微服务架构的自治。

  4. 高可靠性消息传递:消息队列采用多副本备份的方式来保存消息和元数据。当某个节点出现故障时,可以从备份节点重新进行数据恢复,确保数据的可靠性。

  5. 事件驱动:事件驱动架构(Event-Driven Architecture)中,消息队列作为事件生产者和事件消费者之间的交互媒介,分离出不同事件,并提供可靠的消息传递,从而实现松耦合架构,使系统更具可伸缩性。

  6. 数据缓冲层:利用消息队列作为数据缓冲层,可以平滑处理负载峰值,使后端处理程序可以以更快速的速度处理数据。

        总之,消息队列在分布式系统中可以应对各种场景,例如解耦合,并发控制,可靠的消息传递等场景,能够有效提高系统的可靠性和性能,并且具有广泛的适用性。

什么是消息队列?什么是RabbitMQ?

消息队列

使用队列来通信的组件,把要传输的消息放在队列中。

RabbitMQ

它是实现了高级消息队列协议(AMQP)的消息中间件。

        消息队列(Message Queue)是一种在分布式系统中应用广泛的解耦工具,它将应用程序之间传递的消息往往存储在一个中间件(如消息服务提供商)中。通过将消息的所有者(发送者)与使用者(接收者)分离开来,消息队列可以实现异步处理、提高系统可靠性、降低延迟和消峰限流等功能。

        RabbitMQ 是一种开源的消息队列中间件,它是采用 Erlang 语言开发的 AMQP(Advanced Message Queuing Protocol)实现,具有可靠性高、兼容性好、扩展性强等优点。RabbitMQ 支持多种消息传递模式,如点对点模式(Point-to-Point)和发布/订阅模式(Publish/Subscribe),以及多种交换器和队列类型,使用灵活、易于定制和维护。

        RabbitMQ 与其它消息队列相比,具有如下优点:

  1. 可靠性高: RabbitMQ 使用写入磁盘的方式保存消息,即使在出现系统错误的情况下,也可以保证消息被保存。

  2. 代理传递:RabbitMQ 采用基于代理的传递方式,可以减轻客户端的负担,降低通信开销。

  3. 支持多种编程语言:RabbitMQ 提供了多种语言(例如 Java、Python、Ruby、JavaScript 等)的客户端库,使得开发人员可以在不同的编程语言中使用 RabbitMQ。

  4. 可扩展性强:RabbitMQ 支持集群部署,具有良好的扩展性和可靠性,可以满足高吞吐量、高并发量的场景。

        总之,RabbitMQ 是一个功能强大、简单易用的开源消息队列中间件,它为分布式系统提供了传递消息、分离解耦和实现异步处理等重要功能,被广泛地应用于许多企业级应用场景中。

为什么要使用消息队列? 消息队列的优点

1 - 系统解耦:解耦消息生产者和消费者之间的关系

2 - 异步调用:用户调用接口时,由于接口之间调用导致用时时间比较久,用户体验不好。 调用接口后将消息放入到MQ后就返回,用户体验好,最终一致性由MQ来保证

3 - 流量削峰:减少高峰时期对服务器压力,先把请求放到MQ中,系统根据实际能处理的并发量来消费请求

        使用消息队列的好处有很多,下面列举几个常见的优点:

  1. 解耦:通过将消息队列作为中间件,可以将不同部分之间的耦合性降至最低。生产者只需要将消息发送到消息队列中,而不需要知道谁会处理消息,由消费者来主动获取消息并处理,从而实现不同组件之间的解耦。

  2. 异步处理:消息队列能够实现异步处理,将请求发送到消息队列中,再让其他进程或服务在后台异步处理实际请求,能够提高系统的响应速度和并发能力。

  3. 提高可靠性:消息队列采用多副本备份的方式来保存消息和元数据,当消息队列服务节点出现故障时,可以从备份节点重新进行数据恢复,大大提高了系统的可靠性。

  4. 缓存:利用消息队列的缓存功能,可以缓存常用的数据或资源,使用消息队列作为缓存层,能够减轻后端的负担,提高系统的性能。

  5. 可伸缩性:消息队列支持负载均衡和水平扩展等功能,可以动态地添加或删除节点,从而实现可伸缩性的需求。

  6. 自动重试:当消费者在消费消息时出现故障时,消息队列能够支持自动重试,直到消息被成功消费为止,避免消息丢失和应用程序异常中断。

        总之,消息队列在分布式系统中应用广泛,主要通过解耦、异步处理、提高可靠性、缓存、自动重试和可伸缩性等功能,提高系统性能和可靠性,同时降低系统实现成本和维护难度。

消息队列的缺点,消息队列带来的问题

1 - 系统可用性降低。如果MQ挂了,整个系统就不能服务了

2 - 系统复杂性提高。消息丢失、消息重复消费、消息重复发送、消息顺序错乱等问题

3 - 一致性问题。将消息放到MQ后就返回给用户成功的信息,但是其他系统消费消息时, 若某个系统失败了,导致数据不一致

        使用消息队列也存在一些缺点和问题,下面列举一些常见的:

  1. 消息丢失问题:由于消息队列本质上是异步的,生产者将消息发送到队列中之后,只能保证消息已经被成功发送到队列中,但不能保证消息是否被消费者成功消费。如果在消息被消费之前,消费者或队列节点由于某种原因宕机或崩溃,消息便会丢失,造成数据的不一致。

  2. 系统复杂性问题:使用消息队列过程中,需要考虑消息的生产、传输和消费过程中可能出现的错误和故障,并进行相应的处理。此外,由于消息队列需要引入第三方中间件作为消息媒介,会增加系统的复杂性和维护难度。

  3. 同步性问题:对于一些需要有数据同步需求的业务场景,如银行交易等,若使用消息队列进行异步处理则可能会造成数据同步失效,其实时性和可靠性不能得到保证。

  4. 性能问题:尽管消息队列通过缓解请求压力和实现异步消息处理等方式来提高系统整体的并发能力,但是在消息发送、解析、存储等过程中会带来一定的性能损耗,并增加系统的延迟。

  5. 数据一致性问题:在进行分布式事务或者分布式锁等场景时,使用消息队列可能会导致数据一致性问题。异步消息和分布式锁的组合使用需要谨慎,在具体实现时需要考虑保证数据一致性的机制。

        总之,尽管消息队列在分布式系统中应用广泛,但是在使用时也需要注意这些缺点和问题,并在具体的业务场景下进行加以处理和优化。

消息过期时间,队列过期时间

消息过期时间

目前有两种方式设置消息的过期时间,一是通过队列属性设置,二是通过消息单独设置。 如果两种方式同时使用,以最小值为准。

队列过期时间

通过 channel queueDeclare 方法中的 expires 参数可以控制队列被自动删除前处于未使用状态的时间 。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic Get命令

        消息过期时间和队列过期时间是两个不同的概念,分别用于生产者将消息发送到队列中设置过期时间和消息队列清理消息的时间。

  1. 消息过期时间: 当生产者发送消息到消息队列时,可以设置消息过期时间。如果消息在该时间内没有被消费,则会在消息队列中自动被清除。可以通过配置消息队列的 ttl(Time-To-Live,生存时间)属性来设置消息过期时间。

  2. 队列过期时间:队列过期时间是指当一个队列在一段时间内没有被使用时(如没有新的消息和客户端订阅),将自动被删除。队列的过期时间一般不是在消息的元数据中设置,而是由消息队列服务提供商设置。消息队列服务提供商提供相应的系统参数,系统管理员可以调整这些参数来控制队列的生命周期。

        通过设置消息过期时间和队列过期时间,可以有效地避免过期/无用的消息和队列占用系统资源的问题。在实际应用中,需要根据具体业务场景和性能要求设置合理的过期时间来保证系统的可靠性和高效性。同时,也需要了解所使用的消息队列服务提供商的具体实现和规则,以便合理属性消息过期时间和队列过期时间。

死信队列(DLX),变成死信的情况

DLX ,全称为 Dead-Letter-Exchange ,可以称之为死信交换器。

当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中, 这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。

消息变成死信一般由于以下几种情况:

  • 消息被拒绝

  • 消息过期

  • 队列达到最大长度

        死信队列(Dead Letter Queue,DLQ)是指当消息不能够被正常消费时,将消息存储到另外一个队列中,即死信队列。死信队列一般是一个专门用来存储无法被处理的消息的中间件对象。

        当消息传递到消费者时,会有一些错误出现,例如消息超时、消息体不符合消息格式、消费者消费失败等。这些错误被称为“死亡”,消息将被标记为“死信”,并被发送到预先定义的死信队列中,以便进一步处理。

以下是一些消息被标记为“死信”的常见情况:

  1. 消息过期:当消息在消息队列中排队等待时间超出其生存时间时,消息会被标记为“死信”。

  2. 消息被拒绝:当消费者拒绝消费某个消息时,消息可能被标记为“死信”。一种常见的情况是,当消费者无法对消息进行处理并将其退回到消息队列中时,消息将被标记为“死信”。

  3. 队列满:如果消息队列的容量已经达到限制,无法再存储更多的消息,此时新的消息将被标记为“死信”。

  4. 消费失败:如果消费者处理消息失败,可能会将消息标记为“死信”。

  5. 其他异常情况:例如消息的元数据缺失、消息体格式错误、没有适合的消费者等情况。

        总之,死信队列是一种处理那些传递失败的消息的方式,可以提高消息的可靠性和灵活性。在实际应用中,合理规划和使用死信队列,可以帮助我们有效应对各种消息处理异常情况,并实现更加高效可靠地消息通信。

RabbitMQ延时队列,优先级队列

延时队列

在rabbitmq中可通过过期时间(TTL)和死信队列(DLX)实现延时队列

优先级队列

可设置队列支持的最大优先级,然后发送消息时设置消息的优先级。 但是要注意这种情况只在消费者能力小于生产者,有消息堆积时有效。

        RabbitMQ 支持延时队列和优先级队列,下面分别介绍。

  1. 延时队列:

        RabbitMQ 支持延时队列通过插件 rabbitmq_delayed_message_exchange 来实现。消息在发送到延时交换机上时,会根据消息中所定义的延时时间被放置在不同的延时队列中。在指定的延迟时间结束后,延时队列中的消息将被自动投递到消费者。延时队列常用于订单超时未支付、验证码过期等具有时间敏感性的业务场景。

  1. 优先级队列:

        RabbitMQ 支持优先级队列,其可根据队列中消息的指定优先级进行消费,使得高优先级的消息能够更快地得到处理,提高系统效率。在 RabbitMQ 中,可以通过在创建队列的时候指定队列最大优先级实现。然后在发送消息时给消息设置优先级,以便 RabbitMQ 能够将高优先级的消息尽快交给消费者进行处理,而不是让其在队列中等待。

        总之,延时队列和优先级队列是 RabbitMQ 的两种常见队列类型,主要用于实现具有时间敏感性、优先级别高的业务场景。在进行具体实现时,需要根据业务需求和场景特点选择合理的队列类型,并合理设计队列结构和消息优先级,以提高系统效率和满足具体业务要求。

RabbitMQ的持久化

RabbitMQ的持久化分为3个部分:交换器的持久化、队列的持久化和消息的持久化。

但是如果将所有消息持久化,将会严重影响rabbitmq的性能。

即使将交换器、队列和消息都设置了持久化,也不能保证消息100%不丢失。

  1. 从消费者端,防止消费者收到消息还没来得及处理就宕机的情况,需要将autoAck设置为false

  2. 从生产者断,防止发送到rabbitmq后还没来的及落盘rabbitmq就宕机的情况, 可以在生产者端引入事务机制或者发送方确认机制来保证消息己经正确地发送并存储RabbitMQ 中 (前提还要保证在调用 channel.basicPublish 方法的时候交换器能够将消息正确路由到相应的队列之中)或者引入镜像队列来保证高可用

        RabbitMQ 是一种可靠的消息中间件,支持消息的持久化。持久化是指 RabbitMQ 可以将消息持久化到磁盘中,即使 RabbitMQ 服务停止或者发生故障,也能够保证消息不会丢失。RabbitMQ 消息的持久化包含以下两个方面:

        1. 持久化交换器(Exchange)和持久化队列(Queue):

        在 RabbitMQ 中,通过将交换器和队列都设置为持久化来保证在 RabbitMQ 服务停止或者发生故障时不会丢失。这样,在 RabbitMQ 服务重新启动时,持久化的交换器和队列会被重新创建,可保留之前的数据。在创建交换器和队列时,需要设置 durable 属性为 true,例如以下代码示例:

  1. channel.exchangeDeclare("exchange_name", "direct", true); // 创建持久化交换器
  2. channel.queueDeclare("queue_name", true, false, false, null); // 创建持久化队列

        2. 持久化消息(Message):

        当使用持久化交换器和持久化队列时,消息发送到队列里的时候,也需要将消息标记为持久化。在发送消息时,需要设置 MessageProperties.PERSISTENT_BASIC 属性为 true,例如以下代码示例:

  1. byte[] messageBodyBytes = "Hello, RabbitMQ!".getBytes("UTF-8");
  2. channel.basicPublish("exchange_name", "routing_key", MessageProperties.PERSISTENT_BASIC, messageBodyBytes);

        需要注意的是,持久化操作会对系统性能产生一些影响。因为将消息持久化到磁盘上需要较多的 I/O 操作。需要根据实际情况,权衡性能和数据可靠性的需求。

总之,RabbitMQ 的持久化功能能够保证消息的可靠性,但需要在建立队列时和发送消息时显式指定消息的持久化状态。在实际应用中,需要根据具体的业务特点和性能需求合理使用消息持久化功能。

消息的有序性?

资料上说rabbitmq可以保证消息的有序,其实是不严谨的。

在不使用任何 RabbitM 高级特性 ,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下

如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。 需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似SequenceID) 来实现。

        实际上,RabbitMQ 默认是不能保证消息的顺序性的,因为 RabbitMQ 的消息是以并发的方式处理的。但是,以下几种方法可以实现消息的有序消费。

        1. 消息发送到同一个队列:将同一类型的消息发送到同一个队列中,并限制同时只有一个消费者才能从该队列中消费消息。这种方式可以保证按照消息发送的顺序来消费消息。

        2. 消息发送到同一个 Exchange,使用 Routing Key 分发消息到不同 Queue:在这种情况下,每个 Queue 只有一个消费者,消费者之间不会造成竞争消费,这将保证按照消息的发送顺序消费。

        3. 消息发送到不同 Exchange,每个 Exchange 对应一个 Consumer:这种方式类似于第二种,但是每个消费者订阅的是不同的 Exchange,所有的 Exchange 都绑定到同一个 Queue 上,这将保证顺序。

        需要注意的是,当消费者在处理一个任务时,并发,以及需要严格按序处理的情况下,需要将相关的任务(可能是一个消息或者多个有序的消息)封装到一个队列或者一个消息组中,让消费者一次性地消费这些有序的任务。

        总结起来,RabbitMQ 并不能直接保证消息的顺序,但是可以通过将消息发送到同一队列、同一 Exchange 的不同 Queue 或者绑定不同 Exchange 的相同 Queue 等方式来保证消息的有序性。同时,为了实现需要严格按序处理的任务,需要将相关任务封装到一个队列或者一个消息组中。

惰性队列

RabbitMQ 3.6.0 版本开始引入了惰性队列 lazy Queue 的概念。

惰性队列会尽可能地将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中, 它的一个重要的设计目标是能够支持更长的队列即支持更多的消息存储 。当消费者由于各种各样的原因(比如消费者下线、宕机或者由于维护而关闭等) 致使长时间内不能消费消息而造成堆积时惰性队列就很有必要了。

        惰性队列(Lazy Queue)是 RabbitMQ 3.8.x 版本引入的一种新的队列类型,可以在需要时才实际分配内存空间,可以有效的提高队列的创建效率,减少内存占用。

在传统的队列中,消息在进入队列时会立即分配内存,这样会在队列被创建时就消耗一定的内存。而对于惰性队列,仅在队列中开始接收消息后才实际分配内存。

惰性队列适用于以下场景:

  1. 队列中存储的消息数量较大,但这些消息并不会立即被消费。

  2. 队列的创建速度需要更快,例如大量队列同时创建时,使用惰性队列可以减少队列的创建时间。

使用惰性队列需要注意以下几点:

        1. 惰性队列需要在队列创建时指定 x-queue-mode 参数,值为 lazy,例如以下代码示例:

channel.queueDeclare("queue_name", true, false, false, Collections.singletonMap("x-queue-mode", "lazy"));

        2. 惰性队列会在队列中的消息达到一定数量或者一定内存占用量后,才会实际分配内存空间,因此可能会出现一定的延迟。

        3.惰性队列虽然能够减少内存占用,但是可能会对队列的吞吐量和性能产生一定的影响。在实际应用中需要根据具体情况进行评估和调优。

        总之,惰性队列是 RabbitMQ 3.8.x 版本引入的一种新的队列类型,能够在需要时才实际分配内存空间,可以有效的提高队列的创建效率,减少内存占用。在实际应用中,需要根据具体业务场景和需求,权衡惰性队列的使用。

RabbitMQ 组件

Broker:一个Broker可以看做一个RabbitMQ服务器

  • Connection:实际的连接(TCP连接)

  • Channel:虚拟连接。一个Connection上可以有多个Channel(信道),信道没有数量限制,消息在信道上传输。

  • Message

  • Virtual host:虚拟broker。其内部含有独立的queue、exchange、binding,以及独立的权限系统

  • Exchange:交换机。生产者将消息发送到交换机,由交换机将消息路由到一个或多个队列中。路由不到时,返回给生产者或直接丢弃(mandatory 为 true,返回消息给生产者;为 false 丢弃消息)

  • Queue:存储消息的数据结构。可以设置长度,处于ready状态的消息会被计数,不统计处于unack的消息。消息重新入队会处于队头。多个消费者可以订阅同一个队列,队列中的消息会平摊给各个消费者处理

        RabbitMQ 是一个开源的消息队列系统,由多个组件构成。以下是 RabbitMQ 的一些核心组件:

  1. Producer:消息生产者,用于向 RabbitMQ 服务器发送消息。Producer 将消息发送到交换器(Exchange)上,并指定消息的 routing key。

  2. Exchange:消息交换器,用于接收来自 Producer 发送的消息,并根据路由键(routing key)将消息路由到一个或多个与之绑定的队列。Exchange 有常用的四种类型:direct、topic、fanout 和 headers。

  3. Queue:消息队列,用于存储消息。Queue 可以由消费者(Consumer)或者自动队列(Auto-delete Queue)等对其进行消费。

  4. Binding:交换器和队列之间的绑定关系。绑定可以由通配符(*和#)或精确的路由键(routing key)来决定消息从交换器路由到队列的规则。

  5. Consumer:消息消费者,用于从 RabbitMQ 服务器获取消息并进行处理。一个消费者可以消费一个或多个队列中的消息。

  6. Virtual Host:虚拟主机,用于将 RabbitMQ 服务器分隔成多个逻辑部分。每个 Virtual Host 都是相互隔离的,拥有自己的队列、交换器和一组权限规则等。

  7. Connection:连接,用于在客户端和 RabbitMQ 服务器之间建立 TCP 连接并进行会话交互。

  8. Channel:通道,用于在连接上创建交换器、队列等,以及发送和接收消息等操作。一个连接可以打开多个通道。

        总之,RabbitMQ 的核心组件包括消息生产者、消息交换器、消息队列、绑定关系、消息消费者、虚拟主机、连接和通道等。每个组件都扮演着不同的角色,共同构成一个完整的消息系统,为分布式系统中的消息传递提供了可靠的解决方案。

RabbitMQ工作模型

  1. 简单队列(Simple Queue):生产者将消息投递到队列里,消息者从队列里取消息

  2. 工作队列(Worker Queue):一个生产者,一个队列,多个消费者,用于消息消费耗时的场景

  3. 发布订阅(fanout)

  4. Direct:消息根据路由键投递相应的队列中

  5. Topic:消息根据路由键和绑定键的匹配,投递到相应的队列中。如果多个键匹配成功,且目标队列是同一个队列,队列只会收到一条消息

  6. Headers:用键值对做匹配,匹配方式有all和any,不常使用

其中3-6是交换机的四种工作模式(四种交换机类型)

        RabbitMQ 是一种 AMQP(Advanced Message Queuing Protocol)客户端,采用的是生产-消费者模型。

在 RabbitMQ 中,消息的传递流程大致如下:

  1. Producer 将消息发送到 Exchange 上,Exchange 根据 Binding(绑定关系)将消息路由到对应的 Queue 上。

  2. Consumer 从指定的 Queue 中获取消息并进行处理。

        RabbitMQ 的工作模型主要包括以下两种:

  • 简单模型:

        在简单模型中,Producer 将消息发送到 Queue 上,而消息消费者(Consumer)则从该 Queue 上获取消息并进行处理。

  • 工作队列模型:

        在工作队列模型中,多个 Consumer 可以同时消费来自同一个队列的消息,Queue 中的消息将在多个 Consumer 之间(轮询)分摊处理。例如,可以使用 RabbitMQ 在实现的消息队列解决任务分发和负载均衡问题。

        需要注意的是,在 RabbitMQ 中,Exchange 和 Binding 决定了消息的路由策略,不同的策略会导致不同的工作模型。例如,使用 Direct Exchange 和 Binding 可以实现简单模型,而使用 Fanout Exchange 和 Binding 则可以实现广播模型。

        总之,RabbitMQ 是一种 AMQP 客户端,采用生产-消费者模型。在 RabbitMQ 中,Exchange 和 Binding 决定了消息的路由策略,不同的策略会导致不同的工作模型,例如简单模型、工作队列模型和广播模型等。

        RabbitMQ 支持多种工作模式(Work Modes),常见的工作模式包括以下几种:

  1. 简单模式(Simplest Mode):一条发送者发送一条消息给一个接收者。

  2. 工作队列模式(Work Queue Mode):一条发送者发送多条消息给多个接收者。

  3. 发布/订阅模式(Publish-Subscribe Mode):一条发送者发送一条消息到多个队列中,多个接收者从不同的队列中接收消息。

  4. 路由模式(Routing Mode):一条发送者发送一条消息到多个队列中,接收者根据不同的路由关键字(或者绑定键)接收不同的消息。

  5. 主题模式(Topic Mode):一条发送者发送一条消息到多个队列中,接收者根据不同的主题(或者绑定键)接收不同的消息。

        需要注意的是,不同的工作模式在实际应用中有不同的场景和用途。同时,不同的工作模式在设计和实现时需要考虑消息的有序性、消息重复、消息丢失等问题。因此,在选择工作模式时需要结合具体的业务需求、系统架构和可靠性要求等因素进行综合评估。

RabbitMQ消费模式包括哪些?

  1. 使用 channel.basicGet 拉消息

  2. 轮询模型,消费者发送 get 请求获取消息,如果队列中没有消息,则获得空的回复

  3. 需要时才去拉取消息,实时性差,耗费资源(短连接)

Qos(质量服务)

消息发送给消费者后,默认是自动确认,如果消费者未能消费成功,则消息丢失。

通过显式确认可以保证只有当消息处理完成并收到Ack后才从队列中删除。

但是存在的问题是:

1)消息太多全部传给消费者,可能造成消费者内存爆满;

2)消息处理慢时,想让别的消费者一起处理,但是这些消息都被原来的消费者接收了,这些消息不会再发送给新添加的消费者

Qos可以解决上述问题,需要开启消息的显式确认,设置每次传输给消费者的消息条数为n, 消费者处理完n条消息后再获取n条消息进行处理;而新增消费者时,消息可以立即发送给新的消费者。

        在 RabbitMQ 中,常见的消费模式包括以下几种:

  1. 消费者确认模式(Consumer Acknowledge Mode):在 Consumer 从队列中取出消息进行处理后,向 RabbitMQ 服务器发送 ACK(确认)消息,告诉服务器该消息已经成功处理。如果消费者没有确认消息,则 RabbitMQ 会认为该消息未被成功处理,会将它重新发送给消费者或其他消费者再次尝试处理该消息。消费者确认模式是 RabbitMQ 中默认的消费模式。

  2. 批量确认模式(Batch Acknowledge Mode):在处理完多个消息后,一次性向 RabbitMQ 服务器发送 ACK(确认)消息。这种方式可以降低网络负载和通信开销,提高系统的吞吐量。

  3. 自动确认模式(Automatic Acknowledge Mode):在 Consumer 从队列中取出消息进行处理后,RabbitMQ 会自动将该消息标记为已处理。这种方式简化了消息处理的步骤,但是可能会导致消息处理不可靠。

  4. 懒人模式(Lazy Acknowledge Mode):在消费者开始处理消息时,先将该消息标记为未确认状态。当 Consumer 完成消息处理后,再发送确认消息。这种方式将确认操作推迟到了消息处理完成后,可以提高消息的可靠性,但也会增加一定的延迟。

        需要注意的是,不同的消费模式在实际应用中有不同的场景和用途。同时,不同的消费模式在设计和实现时需要考虑消息的可靠性、吞吐量、延迟等问题。因此,在选择消费模式时需要结合具体的业务需求、系统架构和可靠性要求等因素进行综合评估。

多个消费者监听一个队列,消息如何分发?

默认轮训策略

公平分发(QoS):给空闲的消费者发送更多的消息(当消费者有x条消息没有响应时,不再给该消费者发消息)

        在 RabbitMQ 中,当多个消费者同时监听一个队列时,消息的分发方式取决于 RabbitMQ 的消费模式和队列属性设置。

        RabbitMQ 的默认消费模式是 Consumer Acknowledge Mode,也就是当一个消费者从队列中拉取出一条消息进行处理时,这条消息会从队列中被移除并发送给该消费者进行处理。当该消费者确认处理消息后,RabbitMQ 会将已确认的消息从队列中删除。因此,同一个消息只会被同时发送给一个消费者进行处理,避免了重复处理和消息丢失的问题。

        另外,在 RabbitMQ 的队列属性中,有两个非常重要的属性:

  1. Durability(持久化):默认情况下,RabbitMQ 中的队列是非持久化的。如果需要使队列持久化,需要在声明队列时设置 durable 属性为 true。持久化的队列存储在磁盘上,当 RabbitMQ 服务重启时也不会丢失。

  2. Message Acknowledgment(消息确认机制):消息确认机制指的是 Consumer 处理消息后向 RabbitMQ 服务器发送 ACK(确认)消息的方式。如果使用自动确认模式(Automatic Acknowledge Mode),则消息会自动确认;如果使用默认的确认模式(Consumer Acknowledge Mode),则需要在处理消息结束后显式确认消息;如果使用 Lazy Acknowledge Mode,则等到 Consumer 完成消息处理后再发送确认消息。

        综上,当多个消费者监听同一个队列时,RabbitMQ 会控制消息的分发流程,避免了消息的重复处理和丢失。不同的消息确认机制和队列属性设置也会对消息的分发方式产生影响。因此,在实际应用中,需要根据具体场景和需求,设置合适的消费模式和队列属性,实现高效稳定的消息处理。

RabbitMQ如何保证消息的可靠性?

生产者

confirm机制:

  1. 生产者生产的消息带有唯一 ID

  2. 消息被投递到目标队列后,发送Ack消息(包含消息的唯一ID)给生产者

  3. 有可能因为网络问题导致Ack消息无法发送到生产者,那么生产者在等待超时后, 会重传消息;或者RabbitMQ内部错误导致消息丢失,则发送nack消息

  4. 生产者收到Ack消息后,认为消息已经投递成功

队列自身不弄丢消息

队列开启持久化,消息的diliveryMode = 2

队列如何将消息可靠投递到消费者?

手动确认:

  1. 队列将消息push给消费者(或消费者来pull消息)

  2. 消费者得到消息并做完业务逻辑

  3. 消费者发送Ack消息给队列 ,通知队列删除该消息(队列会一直等待直到得到ack消息,队列通过消费者的连接是否中断来确认是否需要重新发送消息,只要连接不中断,消费者有足够长的时间来处理消息,保证数据的最终一致性)

  4. 队列将已消费的消息删除

        RabbitMQ 采用多种机制来保证消息的可靠性,其中包括:

  1. 消息持久化:默认情况下,RabbitMQ 中的消息是非持久化的。如果 Producer 希望发送的消息可以在 RabbitMQ 服务器重启后不会丢失,需要在消息发送时将消息的 Delivery Mode 设置为 2,同时也需要将队列的 durable 属性设置为 true。

  2. 消息确认机制:RabbitMQ 的 Consumer Acknowledge Mode(默认确认模式)和其他几种确认模式可以保证消息的可靠性。当一个 Consumer 从队列里取出一条消息进行处理时,必须发送一个 ACK 通知 RabbitMQ 服务器消息已经成功处理。如果 Consumer 没有发送确认消息就断开了与 RabbitMQ 服务器的连接,那么 RabbitMQ 会将未确认的消息重新发送给其他 Consumer 进行处理。

  3. 消息复制:RabbitMQ 采用了多种复制机制(Replication)来保证消息的可靠性。例如,可以在 RabbitMQ 集群中的多个节点之间进行消息复制,或者将消息复制到备份队列中,以防止数据丢失。

  4. 消息过期:RabbitMQ 可以设置消息的过期时间(TTL),使得消息在一定时间内必须被消费者处理。如果消息在过期时间内没有被处理,RabbitMQ 会将其从队列中删除。

  5. 消息备份:RabbitMQ 可以将消息备份到不同的节点、队列和交换器中,以防止数据丢失和消息处理失败。

        综上所述,RabbitMQ 采用多种机制来保证消息的可靠性,包括消息持久化、消息确认机制、消息复制、消息过期和消息备份等。这些机制可以保障 RabbitMQ 在消息传递过程中的可靠性和稳定性,为分布式系统中的消息通信提供了可靠、高效的解决方案。

如何避免消息重复投递或重复消费?

重复投递的原因:等待超时后,需要重试。

避免重复投递:消息生产时,生产者发送的消息携带一个Message ID(全局唯一ID), 作为去重和幂等的依据,避免重复的消息进入队列

重复消费的原因:消费者接收消息后,在确认之前断开了连接或者取消订阅, 消息会被重新分发给下一个订阅的消费者。

避免重复消费:消息消费时,要求消息体中必须要有一个全局唯一ID, 作为去重和幂等的依据,避免同一条消息被重复消费

        在 RabbitMQ 中,避免消息重复投递或重复消费通常需要从以下两个方面来考虑:

  1. Producer 端控制

        (1)消息幂等性:可以通过为每条消息分配唯一的 ID,Producer 在发送消息前,检查该 ID 是否已经存在,避免重复发送相同的消息。

        (2)消息去重:可以使用缓存或者数据库对消息进行去重处理,确保每条消息仅发送一次。

  1. 消费端控制

        (1)Acknowledgement(确认机制):Consumer 在处理完一条消息后,必须发送 ACK(确认)通知 RabbitMQ 服务器,否则 RabbitMQ 会将该消息重新投递给其他 Consumer 进行处理。在确认消息之前,Consumer 应该保证成功处理了消息,避免消息丢失和重复处理。

        (2)Message Deduplication(消息去重):Consumer 在处理消息时,可以通过将已经处理过的消息 ID 缓存到内存中,避免重复处理相同的消息。

        (3)Transaction(事务机制):如果 Consumer 需要对消息进行复杂的处理,可以使用 RabbitMQ 的 Transaction 支持,将多个操作封装在一个事务中,以保证消息的可靠性和一致性。

        需要注意的是,以上方法并不是绝对的,无法 100% 避免消息的重复处理或投递。例如,当 Consumer 处理消息失败时,可能会出现消息重复处理的情况。因此,在实际开发中,需要根据具体的业务需求和系统情况,选择合适的措施,实现高效稳定的消息处理。

RabbitMQ消息的状态

   alpha:消息内容(包括消息体、属性和headers) 和消息索引都存储在内存中

  • beta:消息内容保存在磁盘中,消息索引保存在内存中

  • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有

  • delta:消息内容和索引都在磁盘中

        在 RabbitMQ 中,每条消息的状态包括以下几个方面:

  1. Unacked(未确认)状态:Consumer 从队列中取出一条消息进行处理时,该消息会进入 Unacked 状态,表示该消息已经被接收但尚未被确认。

  2. Ready(可接收)状态:当队列中还有消息且 Consumer 处于就绪状态时,这些消息会进入 Ready 状态,表示这些消息可以被 Consumer 接收并进行处理。

  3. Acked(已确认)状态:当 Consumer 成功处理一条消息并发送 ACK(确认)消息给 RabbitMQ 服务器后,该消息会进入 Acked 状态,表示该消息已经成功被处理并确认。

  4. Nack(未确认)状态:如果 Consumer 处理消息失败或需要将消息退回队列,可以发送 Nack(否认)通知 RabbitMQ 服务器,该消息会回到 Ready 状态重新进入队列,等待下一个 Consumer 进行处理。

  5. Expired(过期)状态:RabbitMQ 中的消息可以设置 TTL(过期时间),当消息在过期时间内未被处理时,该消息会进入 Expired 状态,并从队列中被删除。

  6. Deleted(已删除)状态:当消息被 Acked 状态或 Expired 状态后,该消息会从队列中删除,进入 Deleted 状态。

        需要注意的是,在 RabbitMQ 中,消息状态是针对 Consumer 而言的,不同的 Consumer 可能会看到不同的消息状态。同时,消息状态也会受到消息确认机制、队列属性等因素的影响,因此在实际操作中需要根据具体需求和系统架构进行综合考虑和处理。

死信完成延迟队列

存储延迟消息,消息被发送后不想让消费者立即拿到消息,要等待特定时间后才能拿到消息。

实现方式

3.6.x之前,死信队列+TTL过期时间可以实现延迟队列。

3.6.x开始,延迟队列插件

应用场景

x天自动确认收货、自动默认评价。

12306购票支付确认页面,如果30分钟没有付款将自动取消订单。

        死信队列是 RabbitMQ 中的一个功能,用来处理无法处理的消息。在 RabbitMQ 中,当一条消息被消费者拒绝(NACK)并重新加入队列时,这条消息可能会被无限次地重新发送给消费者,这将导致消息被反复处理,因此需要一种机制来处理这种“死循环”问题。为此,RabbitMQ 提供了“死信队列”功能来解决这个问题。

        Dead Letter Queue(DLQ,死信队列)是 RabbitMQ 中的一个特殊队列,用于存储不能被消费者正确处理的消息。当消息被重新加入队列并被多次拒绝后,该消息将被“删除”(也就是发送到 DLQ 中),以确保消息顺利地从队列中排除。

        延迟队列是一种特殊的队列,可以在消息被发送到队列时设置消息的过期时间,当消息过期后才会被消费者接收到。结合死信队列和延迟队列,可以实现“消息延迟”和“死信重发”的功能,即将一条消息放入延迟队列中,在指定的时间过后,若消息没有被正常处理,则可将该消息发送到 DLQ 中,避免消息的“死循环”。

        具体实现步骤如下:

  1. 创建一个普通队列,并为该队列设置 TTL。

  2. 将该队列设置为死信队列,即设置 DLX(Dead Letter Exchange)和 DLK(Dead Letter Routing Key),当消息被拒绝或过期时,将该消息发送到 DLX,并指定 DLK。

  3. 创建一个专门用于处理死信消息的 Consumer,该 Consumer 从 DLQ 中消费消息,并进行相应的处理。

        通过将延迟队列和死信队列结合起来使用,可以消除重复处理和消息丢失的问题,同时也可以实现消息的延迟投递和重试,提高消息的可靠性和稳定性。

RabbitMQ集群

普通集群

每台机器上启动一个RabbitMQ实例,而创建的queue只放在一个实例上,其他实例同步queue的元数据。 消费时如果连接到了另一个实例上,则实例会从queue所在的实例上拉取数据 多个实例服务一个queue

镜像集群

RabbitMQ的元数据和queue里的消息都会存在于多个实例上, 每次消息进入队列,会自动把消息同步到多个实例的队列中

分为一个master和多个slave。所有的操作最终都会到master上操作

  1. 生产者可任意选择一个节点连接,如果该节点不是master,则转发给master。 master向slave发送消息,收到半数以上回复后本地提交,再让slave提交

  2. 消费者可任意选择一个节点连接,如果该节点不是master,则转发给master。 消费者消费后进行 ack 确认,master收到ack后删除,并让slave删除。

  3. 如果master掉线,自动选出一个节点(slave中消息队列最长的节点)作为新的master

        在 RabbitMQ 中,有两种类型的集群:普通集群和镜像集群。

  • 普通集群(Regular Cluster)

        普通集群中,多个 RabbitMQ 节点可以共享队列和交换机等信息,这些信息会被同步到所有节点,所有节点具有相同的数据副本。这样可以提高集群的可用性和可靠性,因为如果某个节点宕机,其他节点可以接管该节点上的队列和交换机,从而确保数据不丢失。

普通集群在 RabbitMQ 中已经被广泛使用,可以通过将多个节点组成一个集群来处理大规模的实时消息,提高消息处理的吞吐量和性能。

  • 镜像集群(Mirror Cluster)

        镜像集群是 RabbitMQ 为了进一步提高高可用性而推出的一种解决方案。镜像集群通过将队列和交换机等信息复制到多个节点上,在所有节点中拥有完全一致的数据副本,以达到故障转移和负载均衡等目的。

        镜像集群中,数据被复制到另一个节点,因此可以进行双重备份。当主节点宕机时,可以立即将备份节点提升为主节点,从而保证消息的可用性和可靠性。

        镜像集群具有更高的高可用性和更好的容错性,可以在处理实时消息时提供更强的保障。然而,在使用镜像集群时,需要注意多个节点之间的数据同步和性能消耗等问题,特别是在高负载和高并发情况下,需要适当调整参数和配置,以充分发挥集群的优势。

RabbitMQ如何保证消息的顺序性?

只有一个消费者,可以保证顺序

  • 多个队列,每个队列对应一个消费者,同一个用户的操作hash到同一个队列上

  • 每个消息有一个全局ID,同时去关联一个parentMsgId,在前一条消息未消费时不处理下一条消息

        在 RabbitMQ 中,为了保证消息在消费者端的有序性,需要在 Producer 和 Consumer 两端同时进行一些控制。

  • Producer 控制

        在 Producer 端,为了保证消息的顺序性,可以通过以下两种方式来控制:

        (1)使用同步发送模式:在消息发送时,将 channel.confirmSelect(),开启 Publisher Confirms,然后在发送完一条消息后,调用 channel.waitForConfirms()。该方法将阻塞等待 RabbitMQ 服务器应答,确认消息已被成功处理。如果出现异常情况,则可以根据异常信息进行相应的处理。

        (2)使用 Routing Key 进行分区:可以将相同“分区”的消息发送到同一个队列中,并使得这些消息按照相同的顺序被消费者处理。例如,可以将相同用户的消息发往同一个 Queue,这样消费者在消费该 Queue 时就可以保证消息的顺序性。

  • Consumer 控制

        在 Consumer 端,为了保证消息的顺序性,可以采用以下两种方式进行控制:

        (1)单线程顺序消费:业务处理器采用单线程处理消息的方式,保证消息在业务处理时按照顺序被处理。

        (2)流水线(Pipeline)方式消费:可以将分区的消息分发给多个 Consumer,每个 Consumer 采用独立的线程进行处理,最后再将处理后的结果合并成一条消息。这种方式需要使用线程池对 Consumer 进行管理,同时需要考虑线程切换和性能消耗的问题,以充分发挥多线程的优势。

        需要注意的是,虽然 RabbitMQ 提供了多种方式来保证消息的顺序性,但是在实际操作中,还需要根据业务逻辑和系统架构进行合理的参数调整和配置,避免消息重复消费或超时丢失等问题。

消息积压在消息队列中会导致什么结果?产生的原因是?如何解决?

原因:消费者消费速度慢,或者出现了问题

导致的结果:1)磁盘空间满了;2)海量消息堆积,消费者需要很长时间消费

解决办法:

  • 磁盘空间满的情况:在其他机器上建立临时的消息队列, 再写一个临时的消费者,把积压的消息放到临时队列里去

  • 海量消息堆积的情况:修复消费者问题,停掉现有的消费者, 临时建立10倍的消息队列,再用一个临时的消费者将消息分发到临时消息队列中, 临时征用10倍的机器部署消费者。等积压消息消费完成后,再恢复成之前的架构

        消息队列是应用程序中常用的一种异步消息传递方式,可以解耦应用程序之间的依赖关系,提高应用程序的性能和可伸缩性。然而,在消息队列中常常会出现消息积压(Backlog)的情况,当消息数量过多时会引发一系列问题,下面是其可能的结果和原因,以及对应的解决方法:

        1. 导致消息延迟和处理效率降低

        如果消息队列中的消息积压过多,消费者可能无法及时处理这些消息,导致消息的延迟处理和队列的阻塞等问题。

        原因: 可能是由于生产者消息发送速度过快,或者消费者无法处理这么多的消息。

        解决办法:

  • 提高消费者端的处理速度,增加消费者数量或者调优消费者端代码;
  • 发送者限流,控制消息的发送速率,避免大量消息发送到队列中。

        2. 容易导致消息丢失

        当队列中的消息数量过多时,消息队列可能会因为内存不足或磁盘空间不足而导致消息丢失。

        原因: 可能是因为 RabbitMQ 对磁盘和内存资源的限制导致存储空间不足。

        解决办法:

  • 合理设置队列容量大小,并使用 TTL(Time To Live)控制消息的有效期,避免过期的消息堆积在队列中;
  • 使用镜像队列(Mirror Queue)或数据备份等方式进行持久化存储,保证消息的持久性和可靠性。

        3. 增加网络流量

        当队列中的消息数量增加,会增加流量,可能会导致网络拥塞,从而影响系统性能。

        原因: 由于消息数量增加,网络通信量增加,可能导致网络拥塞,甚至造成网络故障等问题。

        解决办法:

  • 使用流量控制或限速等方式,避免网络拥塞,提高消息传输的效率;
  • 使用 RabbitMQ 集群或分布式部署,将压力分散到多个节点上,提高系统的可扩展性和负载能力。

        综上所述,在实际使用过程中,我们需要根据业务需求、系统负载、硬件条件等因素,合理配置消息队列参数,确保消息的流畅传递,避免消息积压导致的问题。

Kafka和RabbitMQ的对比

保证消息可靠性

  • Kafka 1)使用acks=all,Leader收到消息后,等待所有ISR列表的Follower返回后再发送ack给生产者(分区副本数和ISR最少副本数配置成大于1) 2)生产者使用同步发送消息(默认是异步,多个请求先放在缓冲区,再合并发送) 3)消费者使用手动提交offset,从而保证消息至少消费一次

  • RabbitMQ:

        Kafka 和 RabbitMQ 都是目前流行的消息队列技术,它们在消息可靠性方面有以下对比:

  • Kafka

        Kafka 是一种分布式发布/订阅消息系统,设计初衷是为了处理大量的实时数据,并保证数据的高可靠性和高吞吐量。Kafka 通过副本复制机制来保证数据可靠性,即将数据复制到多个节点上,以确保数据不会丢失。Kafka 保证每个消息被传递一次且仅一次(At most once),也可以通过配置来进行Exactly-Once保证。Kafka 可以处理大量的消息,可以在分布式系统中实现高性能和扩展性。

  • RabbitMQ

        RabbitMQ 是一种开源的分布式消息队列系统,采用 AMQP(Advanced Message Queuing Protocol)协议,支持多种消息投递模式,包括 点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)和发布/确认(Publish/Confirm)等。RabbitMQ 支持持久化消息、流量控制和限流等功能,并通过 ACK 确认机制来保证消息可靠性。消费者可以自行决定消息的确认 Ack 模式,且保证消息有序处理,可以充分满足业务的可靠性需求。

        综合来看,Kafka 和 RabbitMQ 在保证消息可靠性方面各有优劣。

        在消息流量大的情况下,Kafka 的高吞吐量和分布式集群优势更加明显,能够更好地解决高并发、流量瞬间增长等问题。同时 Kafka 也是典型的日志消息系统,适用于需要大量的数据持久化和离线处理的场景,比如日志处理等。

        RabbitMQ 在多个订阅者和容错方面执行得更好,支持保证消息的Exactly-Once语义操作和自定义确认机制等功能,并支持各种订阅模式灵活使用,适用于企业应用的中间件场景,比如电信、金融等行业的消息传递需求。

        总之,Kafka 和 RabbitMQ 在可靠性方面都有各自独特的优势,可以根据业务场景和需求进行选择。

RabbitMQ的优缺点,Kafka的优缺点

RabbitMQ的优缺点

优点

  • 延迟很低(微秒级)

  • 可靠性:使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。

  • 灵活的路由:对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。 针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。

  • 管理界面 : RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。

  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。

  • 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息中间件协议。

  • 支持多种语言:如Java、Python、Ruby、PHP、C#、JavaScript等。

  • 插件机制 : RabbitMQ提供了许多插件,以实现从多方面进行扩展,也可以自定义插件。

缺点

  • erlang开发,难以维护

  • 相比较于其他消息中间件,吞吐量较低(万级)

Kafka的优点和缺点

优点:

  • 吞吐量十万级

  • 可用性非常高(一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用)

  • 消息可以做到0丢失

缺点:

  • topic增多会导致吞吐量大幅下降(几百个topic),如果要支持大规模topic,需要更多的机器资源

  • 依赖ZooKeeper进行元数据管理(额外的复杂性)

        RabbitMQ 和 Kafka 都是当前非常流行的消息队列系统,提供了高效的消息传递方式和分布式架构,下面分别介绍它们的优缺点:

        1. RabbitMQ

        优点:

  • RabbitMQ 支持多种消息通信模式,包括点对点模式、发布/订阅模式、RPC模式等,具有很高的灵活性;
  • RabbitMQ 提供了各种功能强大而稳定的插件,包括管理管理界面、消息日志、数据备份等;
  • RabbitMQ 提供AMQP协议支持,保证了良好的跨平台性和兼容性,支持多种编程语言,对开发人员友好;
  • RabbitMQ 能够保证消息的可靠性、消息的持久性和完整性,支持流量控制和限流等功能,用于实时数据处理和高并发系统具有优势;

        缺点:

  • RabbitMQ 的性能和吞吐量略低于 Kafka,尤其是在分布式大规模和高负载的场景下;
  • RabbitMQ 在并发性、高可用性和监控管理等方面还有一定的改进空间;
  • 在消息的Exactly-Once语义操作方面会带来不小的性能消耗。

        2. Kafka

        优点:

  • Kafka 的设计理念是高吞吐量、低延迟,比 RabbitMQ 更快速、更可靠;
  • Kafka 对大数据处理有很好的支持,并具有非常好的扩展性和水平扩展性;
  • Kafka 支持动态扩缩容,可以在不停机的情况下增加或减少节点;
  • Kafka 可以实现Exactly-Once语义操作,即Kafka Producer向Broker提交消息时,确保消息不会重复且不会丢失。

        缺点:

  • Kafka 的部署复杂度比 RabbitMQ 高,因为 Kafka 需要 Zookeeper 来运行;
  • Kafka 不适合实时数据处理,适用于批处理和访问数据存储场景;
  • Kafka 对应用程序开发人员不友好,需要了解复杂的生产者和消费者组的配置;
  • 消费者组的rebalance等操作较为复杂。

        综上所述,RabbitMQ 和 Kafka 都具有牛逼的功能特色,但它们的适用场景和性能特点有所不同,在选择时应根据实际需求进行选择。 RabbitMQ 适合于为分布式系统提供简单而可靠的消息通信服务,同时需要保证数据可靠性和消息可靠性;而 Kafka 更适合用于高吞吐量、低延迟数据处理和批量处理。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/569132
推荐阅读
相关标签
  

闽ICP备14008679号