赞
踩
Raft 协议是一种分布式一致性算法,它用于在分布式系统中的多个节点之间达成一致性。Raft 协议的目标是提供一种相对简单、易于理解和实现的方法,以确保在网络分区、节点故障等情况下,系统仍然能够保持一致性和可用性。
应用服务对于请求的处理流程图:
以下是 Raft 协议的核心架构组件和流程:
1、节点角色:
当一个节点启动的时候,需要将自身节点信息注册到集群中Leader节点
2、领导者选举(Leader Election):
3、日志复制(Log Replication):
4、日志压缩(Log Compaction):
5、安全性和一致性:
6、成员变更(Membership Changes):
7、心跳和超时:
8、日志一致性:
Raft 协议的架构设计强调了简单性和易于理解,同时提供了强大的一致性和容错能力。这种设计使得 Raft 成为了许多分布式系统和数据库的首选一致性算法。
角色转换
这幅图是领袖、候选人和群众的角色切换图,我先简单总结一下:
Raft 协议通过这些机制解决了分布式系统中的一致性问题,特别是在领导者选举和日志复制方面。它被广泛应用于各种分布式系统和服务中,例如 etcd(一个分布式键值存储系统),它被用作 Kubernetes 的后端存储。Raft 协议的设计使得它在实际应用中既高效又可靠。
Raft 协议作为一种分布式一致性算法,被广泛应用于需要在多个节点间保持数据一致性的分布式系统场景中。以下是一些典型的 Raft 协议应用场景:
1、分布式存储系统:
Raft 协议被用于分布式存储系统中,以确保数据在多个节点间的一致性和可用性。例如,分布式键值存储(如 etcd、Consul)和分布式数据库(如 TiKV)都采用了 Raft 协议。
2、配置管理服务:
在配置管理服务中,Raft 用于确保集群中的所有节点都能访问到最新的配置信息。例如,Consul 提供了一个服务发现和配置的工具,它使用 Raft 来保证配置的一致性。
3、服务发现和注册:
服务发现和注册系统(如 etcd)使用 Raft 来维护服务实例的注册信息,确保客户端能够发现和连接到正确的服务实例。
4、分布式锁服务:
分布式锁服务需要在多个节点间协调资源的访问,Raft 协议可以帮助实现一个高可用和一致性的分布式锁。
5、分布式任务调度:
在分布式任务调度系统中,Raft 可以用来选举任务调度器的领导者,确保任务分配的一致性和顺序执行。
6、分布式状态机:
Raft 协议可以用来构建分布式状态机,其中每个节点都维护一个状态机的副本,Raft 保证这些状态机的状态一致。
7、分布式日志系统:
分布式日志系统(如 Apache Kafka)可以使用 Raft 来保证日志数据在多个副本之间的一致性。
8、集群管理:
在集群管理工具中,Raft 可以用于选举集群领导者,管理集群状态,以及处理集群成员的加入和退出。
9、分布式事务:
虽然 Raft 本身不直接处理分布式事务,但它可以作为分布式事务协议的一部分,用于保证事务日志的一致性。
Raft 协议因其易于理解和实现,以及在实践中的高效性和可靠性,成为了构建分布式系统时的首选一致性算法之一。在这些应用场景中,Raft 协议帮助系统在面对网络分区、节点故障等分布式系统常见问题时,仍然能够保持数据的一致性和系统的可用性。
Kafka Raft(KRaft)与 Apache ZooKeeper 是两种不同的分布式协调服务,它们在 Kafka 集群中扮演着不同的角色。以下是 KRaft 与 ZooKeeper 的对比:
1、依赖性:
2、一致性协议:
3、性能和可伸缩性:
4、部署和管理:
5、可靠性和可用性:
6、未来发展:
KRaft 模式的主要优势包括:
去中心化:Kafka 集群不再依赖于外部的 ZooKeeper 集群,简化了部署和运维。
性能提升:由于不再需要与 ZooKeeper 进行通信,Kafka 集群的性能得到了提升。
扩展性:KRaft 模式允许 Kafka 集群更灵活地扩展,不再受到 ZooKeeper 集群规模的限制。
一致性和可用性:Raft 协议确保了即使在部分控制器节点失败的情况下,集群的元数据仍然能够保持一致性和可用性。
简化的故障恢复:在 KRaft 模式下,Kafka 集群的故障恢复过程更加简单和直接。
KRaft 模式在 Kafka 3.3.1 版本中被标记为可以在生产环境中使用。这意味着 Kafka 用户现在可以选择 KRaft 模式来部署他们的 Kafka 集群,以获得更好的性能和更简单的运维体验。然而,需要注意的是,KRaft 模式目前仍然是一个相对较新的功能,因此在生产环境中使用时,建议密切关注 Kafka 社区的更新和最佳实践。
关于更多为啥会抛弃Zookeeper的原因可以参考我这篇文章:为何Kafka在2.8版本开始会“抛弃”Zookeeper?
首先来看一下KRaft在系统架构层面和之前的版本有什么区别。KRaft模式提出来去zookeeper后的kafka整体架构入下图是前后架构图对比:
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
修改kafka目录下的config/kraft/server.properties文件。三个服务器都需要修改。 特别注意:每个服务器(broker)上的配置里的node.id必须是数字,并且不能重复。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
# 节点角色(修改)
process.roles=broker,controller
# The node id associated with this instance's roles
# 节点ID,和节点所承担的角色想关联(修改)
node.id=1
# The connect string for the controller quorum
# 配置标识有哪些节点是 **Quorum** 的投票者节点
controller.quorum.voters=1@192.168.182.110:9093,2@192.168.182.111:9093,3@192.168.182.112:9093
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://:9092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# 这里我修改了日志文件的路径,默认是在/tmp目录下的
log.dirs=/data/kraft-combined-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
三个broker的配置基本都和上面的配置一样,不同的地方就是node.id:
kraft1:node.id=1
kraft2:node.id=2
kraft3:node.id=3
另外还有两处需要修改。
Process.Roles:
每个Kafka服务器现在都有一个新的配置项,叫做Process.Roles, 这个参数可以有以下值:
如前所述,目前不能在不重新格式化目录的情况下在ZooKeeper模式和KRaft模式之间来回转换。同时充当Broker和Controller的节点称为“组合”节点。
对于简单的场景,组合节点更容易运行和部署,可以避免多进程运行时,JVM带来的相关的固定内存开销。关键的缺点是,控制器将较少地与系统的其余部分隔离。例如,如果代理上的活动导致内存不足,则服务器的控制器部分不会与该OOM条件隔离。
Quorum Voters
系统中的所有节点都必须设置 controller.quorum.voters
配置。这个配置标识有哪些节点是 Quorum 的投票者节点。所有想成为控制器的节点都需要包含在这个配置里面。这类似于在使用ZooKeeper时,使用ZooKeeper.connect配置时必须包含所有的ZooKeeper服务器。
然而,与ZooKeeper配置不同的是,controller.quorum.voters
配置需要包含每个节点的id。格式为: id1@host1:port1,id2@host2:port2。
随便找一个服务器,进入kafka目录,使用kafka-storage.sh生成一个uuid,一个集群只能有一个uuid!!!
./bin/kafka-storage.sh random-uuid
# 这个ID就可以作为集群的ID
# AxAUvePAQ364y4mxggF35w
三个机器上都需要执行
#./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
./bin/kafka-storage.sh format -t AxAUvePAQ364y4mxggF35w -c config/kraft/server.properties
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
./bin/kafka-topics.sh --create --topic kafkaraftTest --partitions 1 --replication-factor 1 --bootstrap-server 192.168.182.110:9092
查看topic
./bin/kafka-topics.sh --list --bootstrap-server 192.168.182.110:9092
./bin/kafka-topics.sh --describe --topic kafkaraftTest --bootstrap-server 192.168.182.110:9092
有任何疑问也可关注我公众号:大数据与云原生技术分享,进行技术交流,如本篇文章对您有所帮助,麻烦帮忙一键三连(点赞、转发、收藏)~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。