当前位置:   article > 正文

Kafka 详解

kafka支持多级级联吗

http://kafka.apache.org/

Kafka是Apache开发的一款开源流处理平台(网络信息流,日志流, 采样流), 由Scala和Java编写. Kafka是一种高吞吐量的分布式发布订阅消息系统, 一般用作系统间解耦, 异步通讯, 削峰填谷等作用. 此外还提供了流处理插件 Kaka Streaming(类似Storm, Spark, Flink), 并且运行在应用端. 具有简单 , 入门要求低, 部署方便等优点.

应用场景

异步通讯, 减少接口耗时:

用户发起请求-执行注册服务-执行发送短信服务-响应用户 串联调用导致客户等待响应时间较长. 如果使用消息队列, 将会向MQ中新增一条消息, 然后直接响应给客户. 增快了接口的响应速度.减少了资源消耗.

file

系统解耦 :

如果短信业务出现故障, 将直接反映在用户注册服务上. 如果使用MQ, 即使短信服务不可用, 不会影响用户的注册, 最多会延迟受到短信.

削峰填谷:

file

日志产生的流量是不确定的, 高峰期时候会很高, 低谷时期会很低, 当日志很多时, 高流量可能会让流计算无法适应处理, 为了保护流计算, 通过增加中间件MQ缓冲日志流量. 于此类似, 流计算后方的MQ同样也是为了保护数据执行而存在的.

基本概念

消息队列 Message Queue: 消息队列是一种在分布式和大数据开发中不可或缺的中间件. 在分布式开发或者大数据开发中通常使用消息队列进行缓存, 系统间解耦和削封填谷等业务场景, 常见的消息队列的工作模式大致会分为为两大类:

  • 至多一次: 消息生产者将消息写入到MQ, 消费者负责去MQ中拉取消息, 一旦消息被确认消费, 由消息服务器主动删除队列中的数据, 这种消费模式一般只允许被一个消费者消费,并且不能被重复消费
  • 没有限制: 生产者发布完新消息后, 消息可以被多个消费者消费, 并且同一个消费者可以多次消费消息服务器中的同一记录. 主要是因为消息服务器可以存储海量信息.

image-20210117210758904

Kafka 是没有限制的工作模式.

工作形式

生产者向Kafka集群发送消息(Record), 每个Record 只能属于一个Topic(是消息的分类,比如短信主题, 用户主题), 但是一个消费者可以消费多个Topic. 每个Topic底层都会对应一组分区的日志, 这些分区日志用来持久化Record消息,并分片存储, 默认的分区策略为 hash(msgKey)%集群数量.

集群中的每个Kafka进程又称作Broker, 每个分区的数据将会分散存储到各个Broker中, 并且总有一个充当Leader(负责分区读写), 其他Broker充当Follower(负责分区的数据备份). 如果Leader宕机, Zookeeper就会重新在Follower中选举一个Broker充当新的Leader.

集群中的Leader的监控, 以及Topic的部分元数据是存储在Zookeeper集群中的.

image-20210117212057579

Topic分区&日志

Kafka中所有消息是通过Topic为单位进行管理, 每个Kafka中的Topic通常会有多个订阅者, 负责订阅发送到该Topic中的数据, Kafka负责管理集群中每个Topic的一组日志分区.

生产者将数据发送到相应的Topic, 负责选择将哪条记录发送到哪个Partition. 不仅可以使用轮询平衡负载, 可以根据某些语义进行分区.

每组日志分区是一个有序的不可变的日志队列, 分区中的每个Record都被分配了唯一的序列编号成为offset, Kafka集群会持久化所有发布到Topic中的Record信息, 每个Record的持久化时间由配置: log.retention.hours=168 决定的, 默认是168小时, 也就是7天.

Kafka地层会定期检查日志文件, 然后将过期数据从log移除, 由于Kafka 使用硬盘存储日志文件, 因此使用Kafka缓存一些日志文件不存在问题.

image-20210117220055179

由上图可知:

  1. Topic 包含 多个分区 Partion
  2. 一个Partition分区是一个队列
  3. 分区左侧是最早的消息, 右侧则是最新的消息
  4. Kafka只能保证同一分区中的消息的顺序, 而无法保证不同分区之间的顺序, 如果要保证Topic中的所有顺序, 可以只设置一个分区
  5. 每个Record在分区中都有一个唯一的标志: offset, 值越小, 进入队列的时间就越早

为什么要设置分区:

  1. 打破单机存储的容量限制
  2. 分区数将数据拆分后给多个Broker处理, 从某种程度上,提高了Kafka的写入性能, 所以可以应用到高并法以及大数据方向

生产者&消费者

Kafka是如何记录每一个消费者 读取到哪个偏移量的?

在消费者消费Topic中的数据的时候, 每个消费者会维护本次消费者对应的分区的偏移量, 消费者消费完一个分区的数据后, 会将本次的消费偏移量提交给Kafka集群. 因此每个消费者都可以随意控制更改偏移量. 因此多个消费者之间彼此相互独立.

其中每个分区又分为Leader与Follower, 集群的负载可以得到很好的平衡.

image-20210118143412749

消费者和消费者组:

在Kafka中,消费者使用Consumer Group名称来标记自己, 并且发布到Topic的每条记录都会传递到每个订阅Consumer Group的一个消费者实例. 如果所有的Consumer实例具有不同的ConsumerGroup, 则每条记录将广播到所有的Consumer进程. 如果所有的Consumer示例具有相同的ConsumerGroup, 则Topic中的消息记录会在ConsumerGroup中被多个Conusmer均分消费(如上图, 消费组A内每个Consumer均分消费了两个分区).

每个Topic中含有相对较少的ConsumerGroup, 一个ConsumerGroup可以理解为一个’逻辑订阅者‘, 每个ConsumerGroup又由多个Conusmer组成, 实现了消费者的伸缩以及容错能力, 提高了消费者的可用性. ConsumerGroup中的所有Consumer实例,往往是一个服务集群. 每个消费组中的消费者的数量往往不会超过Topic中的分区的数量, 如果消费组中的Consumer实例超过了该Topic的分区的数量, 那么多余的Consumer会闲着. 但是当ConsumerGroup中的某个服务出现故障, 那么空闲的Consumer就会去顶替该服务进行服务消费.

所以Kafka不仅可以提高消息通讯的写入能力,还可以提升消费者的消费能力, 以及消息存储的分区能力.

顺序写入&ZeroCopy

Kafka具有高吞吐率, 即使工作在普通服务器上, Kafka也可以轻松支持美妙百万的级的写入请求, 超过了绝大多说的消息中间件. 所以他在日志处理等海量数据场景广泛应用. Kafka还会将信息收集到磁盘中,防止数据丢失.为了优化写入速度 Kafka采用了两个技术: 顺序写入 和 MMFile. (Memery Mapped File 内存映射文件) .

顺序写入:

因为硬盘是机械结构, 每次读写都会 寻址-> 写入,其中寻址是一个机械动作. 他是最耗时的. 所以硬盘最讨厌随机I/O. 最喜欢顺序IO. 这样省略了大量内存开销以及节省了IO寻址时间. 但是单独的使用顺序写入, Kafka的内存性能是无法与内存比较的.

零拷贝:

Kafka服务器在响应客户端读取的时候, 底层使用另拷贝技术, 直接将数据通过内核空间传递输出, 数据并没有抵达用户空间.

image-20210120204055459

环境搭建

单机环境

下载Kafka二进制安装文件, 解压到指定路径, 可以看到Kafka目录结构如下:

  1. bin
  2. config
  3. libs
  4. site-docs

在config中配置server.properties:

  1. ############################# Server Basics #############################
  2. # The id of the broker. This must be set to a unique integer for each broker.
  3. # 集群的kafka进程结点, 也就是broker的唯一标志, 由于是单机环境, 所有无需修改
  4. broker.id=0
  5. ############################# Socket Server Settings #############################
  6. # The address the socket server listens on. It will get the value returned from
  7. # java.net.InetAddress.getCanonicalHostName() if not configured.
  8. # FORMAT:
  9. # listeners = listener_name://host_name:port
  10. # EXAMPLE:
  11. # listeners = PLAINTEXT://your.host.name:9092
  12. # kafka服务器底层监听的地址, 这里要填写主机名
  13. listeners=PLAINTEXT://server1:9092
  14. ############################# Log Basics #############################
  15. # A comma separated list of directories under which to store log files
  16. # log.dirs=/tmp/kafka-logs
  17. # kafka 日志存储路径
  18. log.dirs=/var/kafka
  19. ############################# Log Retention Policy #############################
  20. # The following configurations control the disposal of log segments. The policy can
  21. # be set to delete segments after a period of time, or after a given size has accumulated.
  22. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
  23. # from the end of the log.
  24. # The minimum age of a log file to be eligible for deletion due to age
  25. # 日志会保留168小时,也就是七天
  26. log.retention.hours=168
  27. ############################# Zookeeper #############################
  28. # Zookeeper connection string (see zookeeper docs for details).
  29. # This is a comma separated host:port pairs, each corresponding to a zk
  30. # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  31. # You can also append an optional chroot string to the urls to specify the
  32. # root directory for all kafka znodes.
  33. # 配置zookeeper服务, 单台
  34. zookeeper.connect=server1:2181
  35. # Timeout in ms for connecting to zookeeper
  36. zookeeper.connection.timeout.ms=18000

启动Kafka, 运行./bin/kafka-server-start.sh -daemon config/server.properties命令 , 其中 -daemon 代表后台运行服务. 运行./bin/kafka-server-stop.sh即可优雅关闭kafka服务.

非后态启动可以看日志,方便看报错信息

集群环境

同单机环境类似, 但是需要修改Brokerid, 比如四台机器, 其Brokerid为 0,1,2,3, 依次启动每个结点的BrokerId即可启动Kafka集群环境.

Topic管理

创建Topic

  1. # kafka-topics.sh 该命令用于Topic管理
  2. # --bootstrap-server 用来指定kafka服务地址
  3. # --create 创建操作
  4. # --topic 目标主题
  5. # --partitions 3 三个分区, 因为这里是三台机器
  6. # --replication-factor 2, 副本因子为2,每个分区在整个集群中共有两个备份, 并且是分散的
  7. bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --create --topic Topic01 --partitions 3 --replication-factor 2
  8. Created topic Topic01. # 返回此信息代表创建成功

查看集群中Topic列表

bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --list

查看分区备份

  1. # 进入kafaka日志目录,这里我配置的是 /var/kafka-logs/
  2. cd /var/kafka-logs/
  3. # borker0
  4. ll
  5. drwxr-xr-x 2 root root 141 124 18:20 Topic01-0
  6. drwxr-xr-x 2 root root 141 124 18:20 Topic01-2
  7. # broker1
  8. drwxr-xr-x 2 root root 141 124 18:20 Topic01-0
  9. drwxr-xr-x 2 root root 141 124 18:20 Topic01-1
  10. # broker2
  11. drwxr-xr-x 2 root root 141 124 18:20 Topic01-1
  12. drwxr-xr-x 2 root root 141 124 18:20 Topic01-2

查看主题的详细信息

  1. bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --describe --topic Topic01
  2. # Topic01 共有3个分区, 两个副本因子
  3. Topic: Topic01 PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
  4. # 分区0, 的备份的主结点也就是master 存储在 broker2, 共有两个副本, broker0 和 broker2
  5. Topic: Topic01 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
  6. # 分区1, 的备份的主结点也就是master 存储在 broker1, 共有两个副本, broker1和 broker2
  7. Topic: Topic01 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
  8. # 分区2, 的备份的主结点也就是master 存储在 broker0, 共有两个副本, broker0 和 broker1
  9. Topic: Topic01 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

修改Topic01分区数量

  1. # 将topic01的分区数量由3改为2
  2. bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --alter --topic Topic01 --partitions 2
  3. Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2.
  4. [2021-01-24 20:56:06,670] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2.
  5. (kafka.admin.TopicCommand$)

注意, Topic的分区数量只能增大, 不能减小.

删除Topic

bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --delete --topic Topic01

发布与订阅消息

组 g1 订阅消息

./bin/kafka-console-consumer.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --group g1 --topic Topic01 --property print.key=true --property print.value=true --property key.separator=,

稍后会进入阻塞状态,用来接受Topic01的消息.

  • --group g1 指定消费组
  • --property print.key=true 打印消息的key
  • --property print.value=true 打印消息的alue
  • --property key.separator=,,指定key和alue的分割符为逗号

发布消息到主题 Topic01

./bin/kafka-console-producer.sh --broker-list s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --topic Topic01

会进入控制台输入状态, 发送消息内容.

消费者组列表查看

  1. ./bin/kafka-consumer-groups.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --list
  2. g1

查看消费者组g1组的详细信息

  1. ./bin/kafka-consumer-groups.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --describe --group g1
  2. # 组名 | 主题 | 分区 | 数据消费情况:当前位置 | 数据消费情况:下一个位置 | 上两个之值之间的差值,代表未消费的数量 | 消费者id | 消费者主机ip | 客户端id
  3. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  4. g1 Topic01 0 1 1 0 consumer-g1-1-cffe946e-eb01-48ae-99c9-28d2499bca41 /192.168.1.112 consumer-g1-1
  5. g1 Topic01 1 2 2 0 consumer-g1-1-cffe946e-eb01-48ae-99c9-28d2499bca41 /192.168.1.112 consumer-g1-1
  6. g1 Topic01 2 0 0 0 consumer-g1-1-cffe946e-eb01-48ae-99c9-28d2499bca41 /192.168.1.112 consumer-g1-1

消费偏移量offset控制

初始订阅消费偏移量策略

当消费者第一次订阅某个Topic时, 在系统中并不存在该消费者的消费分区的偏移量。Kafka通过属性auto.offset.reset属性来确定第一次的偏移量:

  • latest, 默认值,将偏移量重置为最新的偏移量(也就是订阅时,分区的长度),设置此模式时,消费者只能消费到订阅后的消息
  • earliest, 自动将偏移量设置为最早的偏移量, 设置此模式时, 新消费者会从头消费指定消费分区的消息
  • none,如果未找到消费者之前的偏移量,则抛出异常
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

消费者自动提交offset维护偏移量

Kafka消费者在消费数据时,默认会提交消费的偏移量, 这样就可以保证所有消息至少可以被消费者消费一次, 可以通过以下两个参数对偏移量同步进行配置:

  1. enable.auto.commit = true // 自动提交
  2. auto.commit.interval.ms = 5000 // 每隔5s提交一次offset
  1. prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  2. prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

手动提交偏移量

关闭自动提交策略:

prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交偏移量:

  1. // 此次消费完毕
  2. Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  3. offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
  4. // 消费者异步提交offset,需要一个OffsetCommitCallback回调
  5. consumer.commitAsync(offsets, (offsets1, exception) -> System.out.println("完成:" + record.offset() + "提交!"));

Acks应答与Retries

Kafka生产者在发送完一个的消息之后要求Broker在规定的额时间Ack应答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。

Acks应答策略

  • acks=0: 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。(一个不确认都行,性能高,比如日志收集)
  • acks=1: Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。(有个Leader确认就行了)
  • acks=all:意味着Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。

Retries重试策略

如果生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制, 生产者开始重试发送。默认策略如下:

  1. # 默认,如果brokers的leader在30s内不给回复
  2. request.timeout.ms = 30000
  3. # 生产者就会重试2147483647次,持续发送
  4. retries = 2147483647

1611992740179

acks&retries作用

用来最大程度的保证生产者的数据发送到Broker上。但是可能会导致重复数据的产生,以及数据发送的效率。

配置acks和retries策略

  1. prop.put(ProducerConfig.ACKS_CONFIG, "all"); // Ack策略
  2. prop.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1"); // 如果1ms内没有ack回应,则重试
  3. prop.put(ProducerConfig.RETRIES_CONFIG, "10"); // 重试十次

保证幂等性

HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证生产者发送的消息,不会丢失(acks和retries),而且不会重复(唯一标识)。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识

要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识。记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉

Kafka幂等

幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。

1611994338386

配置Kafka幂等

  1. # 默认不开启,为false
  2. # enable.idempotence= false
  3. enable.idempotence= true
  4. # 开启幂等,必须设置acks为all以及开启重试
  1. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性
  2. props.put(ProducerConfig.ACKS_CONFIG, "all"); // acks机制必须设置为all
  3. props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试必须大于0,3是代表不算第一次,如果第一次失败了就再试三次
  4. props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 生产者在发送数据时如果多于n个记录未被应答,则客户端(生产者)会被阻塞。 这个值默认是5. kafka幂等配置需要此数值小于等于5

Kafka-Eagle 监控

安装文档:

安装

下载kafka-eagle安装包,解压到目标路径,这里是 /root/kafka-eagle-web-1.4.0

  1. 设置环境变量: export KE_HOME=/root/kafka-eagle-web-1.4.0, export PATH=$PATH:$KE_HOME/bin

  2. 配置conf/system-config.properties, kafka连接的zookeeper集群,用来读取kafka集群信息:

    1. kafka.eagle.zk.cluster.alias=cluster1
    2. cluster1.zk.list=s1.svc.com:2181,s2.svc.com:2181,s3.svc.com:2181,s3.svc.com:2181
  3. 开启报表图,需要kafka开启jmx端口,让其监听:

    kafka.eagle.metrics.charts=true

    修改kafka的 kafka-server-start.sh,开启jmx端口,让其检测性能指标使用:

    1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    2. export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    3. # 增加此行
    4. export JMX_PORT="7788"
    5. fi

    重启kafka。

  4. 配置topic删除密码:

    kafka.eagle.topic.token=keadmin
  5. 配置kafka-eagle连接的数据库, 默认使用sqllite,可以替换成mysql:

    1. kafka.eagle.driver=org.sqlite.JDBC
    2. kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    3. kafka.eagle.username=root
    4. kafka.eagle.password=www.kafka-eagle.org
    5. ######################################
    6. # kafka mysql jdbc driver address
    7. ######################################
    8. #kafka.eagle.driver=com.mysql.jdbc.Driver
    9. #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNu
    10. ll
    11. #kafka.eagle.username=root
    12. #kafka.eagle.password=123456

启动与使用

ke.sh start

访问界面即可,日志会打印访问地址。 默认用户名密码为admin, 123456

Dashboard:

image-20210131132646280

Topic:

image-20210131133301115

Consumers:

image-20210131133452946

Cluster:

image-20210131133626556

image-20210131134006447
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/1009605
推荐阅读
相关标签
  

闽ICP备14008679号