当前位置:   article > 正文

Kafka_kafka配置用户名密码

kafka配置用户名密码

1.Kafka简介

消息队列

  • 消息队列——用于存放消息的组件
  • 程序员可以将消息放入到队列中,也可以从消息队列中获取消息
  • 很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)
  • 消息队列中间件:消息队列的组件,例如:Kafka、Active MQ、RabbitMQ、RocketMQ、ZeroMQ

Kafka的应用场景

  • 异步处理
    • 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
    • 比较常见的:发送短信验证码、发送邮件

  • 系统解耦
    • 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
    • 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦

  • 流量削峰
    • 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发

  • 日志处理
    • 可以使用消息队列作为临时存储,或者一种通信管道

消息队列的两种模型

  • 生产者、消费者模型
    • 生产者负责将消息生产到MQ中
    • 消费者负责从MQ中获取消息
    • 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
  • 消息队列的模式
    • 点对点:一个消费者消费一个消息
    • 发布订阅:多个消费者可以消费一个消息

Kafka集群搭建

  • 使用docker安装kafka
  1. version: '3'
  2. services:
  3. zookeeper-kafka:
  4. image: wurstmeister/zookeeper
  5. restart: unless-stopped
  6. hostname: zookeeper-kafka
  7. container_name: zookeeper-kafka
  8. ports:
  9. - 2181:2181
  10. volumes:
  11. - ./data:/data
  12. - ./datalog:/datalog
  13. kafka:
  14. image: wurstmeister/kafka:2.12-2.4.1
  15. depends_on:
  16. - zookeeper-kafka
  17. container_name: kafka
  18. ports:
  19. - 9092:9092
  20. environment:
  21. KAFKA_ADVERTISED_HOST_NAME: kafka
  22. KAFKA_BROKER_ID: 0
  23. KAFKA_ZOOKEEPER_CONNECT: 192.168.200.135:2181
  24. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.135:9092
  25. KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
  26. restart: unless-stopped
  27. volumes:
  28. - ./docker.sock:/var/run/docker.sock

ip要改成自己的ip 云服务器需要开放端口 出现报错可以查看docker容器日志排查

  • 搭建集群过程

1. 将Kafka的安装包上传到虚拟机,并解压

cd /export/software/

tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/

cd /export/server/kafka_2.12-2.4.1/

2. 修改 server.properties

cd /export/server/kafka_2.12-2.4.1/config

vim server.properties

# 指定broker的id

broker.id=0

# 指定Kafka数据的位置

log.dirs=/export/server/kafka_2.12-2.4.1/data

# 配置zk的三个节点

zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181

3. 将安装好的kafka复制到另外两台服务器

cd /export/server

scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD

scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD

修改另外两个节点的broker.id分别为1和2

---------node2.itcast.cn--------------

cd /export/server/kafka_2.12-2.4.1/config

vim erver.properties

broker.id=1

--------node3.itcast.cn--------------

cd /export/server/kafka_2.12-2.4.1/config

vim server.properties

broker.id=2

4. 配置KAFKA_HOME环境变量

vim /etc/profile

export KAFKA_HOME=/export/server/kafka_2.12-2.4.1

export PATH=:$PATH:${KAFKA_HOME}

分发到各个节点

scp /etc/profile node2.itcast.cn:$PWD

scp /etc/profile node3.itcast.cn:$PWD

每个节点加载环境变量

source /etc/profile

5. 启动服务器

# 启动ZooKeeper

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

# 启动Kafka

cd /export/server/kafka_2.12-2.4.1

nohup bin/kafka-server-start.sh config/server.properties &

# 测试Kafka集群是否启动成功

bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list

  • Kafka集群是必须要有ZooKeeper的

注意:

  • 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)
  • log.dir数据存储目录需要配置

kafka的目录结构:

目录名称

说明

bin

Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等

config

Kafka的所有配置文件

libs

运行Kafka所需要的所有JAR包

logs

Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息

site-docs

Kafka的网站帮助文件

Kafka的生产者/消费者/工具

  • 安装Kafka集群,可以测试以下
    • 创建一个topic主题(消息都是存放在topic中,类似mysql建表的过程)
    • 基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中
    • 基于kafka的内置测试消费者脚本来消费topic中的数据
  • 推荐大家开发的使用Kafka Tool
    • 浏览Kafka集群节点、多少个topic、多少个分区
    • 创建topic/删除topic
    • 浏览ZooKeeper中的数据
  • 工具下载

Offset Explorer

工具使用步骤

Kafka的基准测试工具

  • Kafka中提供了内置的性能测试工具
    • 生产者:测试生产每秒传输的数据量(多少条数据、多少M的数据)5000000 records sent, 11825.446943 records/sec (11.28 MB/sec), 2757.61 ms avg latency
    • 消费者:测试消费每条拉取的数据量
  • 对比生产者和消费者:消费者的速度更快
  • 参考文档进行测试,根电脑性能有关,一共有三种方式.

Kafka的Eagle监控工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。

官网: EFAK

采用Docker进行安装

  1. 拉取镜像

docker pull nickzurich/efak:latest

  1. 使用以下命令启动容器(记得开放相关端口)
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="116.62.237.97:2181" nickzurich/efak:latest

EFAK_CLUSTER_ZK_LIST:配置的是zookeeper服务的地址

  1. 在游览器中输入http://ip:8048 即可访问,初始账号密码: admin/123456
  1. 开启Kafka JMX端口

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。

在启动Kafka的脚本前,添加:

  1. cd ${KAFKA_HOME}
  2. export JMX_PORT=9988
  3. nohup bin/kafka-server-start.sh config/server.properties &
  1. 安装Kafka-Eagle

安装JDK,并配置好JAVA_HOME。

将kafka_eagle上传,并解压到 /export/server 目录中。

  1. cd cd /export/software/
  2. tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/
  3. cd /export/server/kafka-eagle-bin-1.4.6/
  4. tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz
  5. cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6

配置kafka_eagle 环境变量。

  1. vim /etc/profile
  2. export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
  3. export PATH=$PATH:$KE_HOME/bin
  4. source /etc/profile

配置kafka_eagle。使用vi打开conf目录下的system-config.properties

  1. vim conf/system-config.properties
  2. # 修改第4行,配置kafka集群别名
  3. kafka.eagle.zk.cluster.alias=cluster1
  4. # 修改第5行,配置ZK集群地址
  5. cluster1.zk.list=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
  6. # 注释第6
  7. #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
  8. # 修改第32行,打开图标统计
  9. kafka.eagle.metrics.charts=true
  10. kafka.eagle.metrics.retain=30
  11. # 注释第69行,取消sqlite数据库连接配置
  12. #kafka.eagle.driver=org.sqlite.JDBC
  13. #kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
  14. #kafka.eagle.username=root
  15. #kafka.eagle.password=www.kafka-eagle.org
  16. # 修改第77行,开启mys
  17. kafka.eagle.driver=com.mysql.jdbc.Driver
  18. kafka.eagle.url=jdbc:mysql://node1.itcast.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
  19. kafka.eagle.username=root
  20. kafka.eagle.password=123456

配置JAVA_HOME

  1. cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
  2. vim ke.sh
  3. # 在第24行添加JAVA_HOME环境配置
  4. export JAVA_HOME=/export/server/jdk1.8.0_241

修改Kafka eagle可执行权限

  1. cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
  2. chmod +x ke.sh

启动kafka_eagle。

./ke.sh start

访问Kafka eagle,默认用户为admin,密码为:123456

http://ip:8048/ke

Kafka度量指标

topic list

点击Topic下的List菜单,就可以展示当前Kafka集群中的所有topic。

指标

意义

Brokers Spread

broker使用率

Brokers Skew

分区是否倾斜

Brokers Leader Skew

leader partition是否存在倾斜

生产者消息总计

Kafka Java API开发

  1. <!-- kafka客户端工具 -->
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

小tips 将日志输出到文件

  1. 导入log4J的依赖

<!-- SLF桥接LOG4J日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<!-- SLOG4J日志-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>

  1. 将log4j.properties配置文件放入到resources文件夹中

log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

生产者程序开发

kafka 2.4.0 API

  1. 创建连接
    • bootstrap.servers:Kafka的服务器地址
    • acks:表示当生产者生产数据到Kafka中,Kafka中会以什么样的策略返回
    • key.serializer:Kafka中的消息是以key、value键值对存储的,而且生产者生产的消息是需要在网络上传到的,这里指定的是StringSerializer方式,就是以字符串方式发送(将来还可以使用其他的一些序列化框架:Google ProtoBuf、Avro)
    • value.serializer:同上
  1. 创建一个生产者对象KafkaProducer
  2. 调用send方法发送消息(ProducerRecor,封装是key-value键值对)
  3. 调用Future.get表示等带服务端的响应
  4. 关闭生产者
  1. // 1. 创建用于连接Kafka的Properties配置
  2. Properties props = new Properties();
  3. props.put("bootstrap.servers", "116.62.237.97:9092");
  4. props.put("acks", "all");
  5. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. // 2. 创建一个生产者对象KafkaProducer
  8. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
  9. // 3. 发送1-100的消息到指定的topic中
  10. for (int i = 0; i < 100; ++i) {
  11. // 构建一条消息,直接new ProducerRecord
  12. ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
  13. Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
  14. // 调用Future的get方法等待响应
  15. future.get();
  16. System.out.println("第" + i + "条消息写入成功!");
  17. }
  18. // 4.关闭生产者
  19. kafkaProducer.close();

消费者程序开发

  • group.id:消费者组的概念,可以在一个消费组中包含多个消费者。如果若干个消费者的group.id是一样的,表示它们就在一个组中,一个组中的消费者是共同消费Kafka中topic的数据。
  • Kafka是一种拉消息模式的消息队列,在消费者中会有一个offset,表示从哪条消息开始拉取数据
  • kafkaConsumer.poll:Kafka的消费者API是一批一批数据的拉取
  1. /**
  2. * 消费者程序
  3. * <p>
  4. * 1.创建Kafka消费者配置
  5. * Properties props = new Properties();
  6. * props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
  7. * props.setProperty("group.id", "test");
  8. * props.setProperty("enable.auto.commit", "true");
  9. * props.setProperty("auto.commit.interval.ms", "1000");
  10. * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  11. * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  12. * <p>
  13. * 2.创建Kafka消费者
  14. * 3.订阅要消费的主题
  15. * 4.使用一个while循环,不断从Kafka的topic中拉取消息
  16. * 5.将将记录(record)的offset、key、value都打印出来
  17. */
  18. public class KafkaConsumerOne {
  19. public static void main(String[] args) {
  20. // 1.创建Kafka消费者配置
  21. Properties props = new Properties();
  22. props.setProperty("bootstrap.servers", "116.62.237.97:9092");
  23. // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
  24. // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
  25. props.setProperty("group.id", "test");
  26. // 自动提交offset
  27. props.setProperty("enable.auto.commit", "true");
  28. // 自动提交offset的时间间隔
  29. props.setProperty("auto.commit.interval.ms", "1000");
  30. // 拉取的keyvalue数据的
  31. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  32. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  33. // 2.创建Kafka消费者
  34. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
  35. // 3. 订阅要消费的主题
  36. // 指定消费者从哪个topic中拉取数据
  37. kafkaConsumer.subscribe(Arrays.asList("test"));
  38. // 4.使用一个while循环,不断从Kafka的topic中拉取消息
  39. while (true) {
  40. // Kafka的消费者一次拉取一批的数据
  41. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
  42. // 5.将将记录(record)的offset、keyvalue都打印出来
  43. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  44. // 主题
  45. String topic = consumerRecord.topic();
  46. // offset:这条消息处于Kafka分区中的哪个位置
  47. long offset = consumerRecord.offset();
  48. // key\value
  49. String key = consumerRecord.key();
  50. String value = consumerRecord.value();
  51. System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
  52. }
  53. }
  54. }
  55. }

生产者使用异步方式生产消息

  • 获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他操作.
  • 使用匿名内部类实现Callback接口,该接口中表示Kafka服务器响应给客户端,会自动调用onCompletion方法
    • metadata:消息的元数据(属于哪个topic、属于哪个partition、对应的offset是什么)
    • exception:这个对象Kafka生产消息封装了出现的异常,如果为null,表示发送成功,如果不为null,表示出现异常。
  1. // 二、使用异步回调的方式发送消息
  2. ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
  3. kafkaProducer.send(producerRecord, new Callback() {
  4. @Override
  5. public void onCompletion(RecordMetadata metadata, Exception exception) {
  6. // 1. 判断发送消息是否成功
  7. if(exception == null) {
  8. // 发送成功
  9. // 主题
  10. String topic = metadata.topic();
  11. // 分区id
  12. int partition = metadata.partition();
  13. // 偏移量
  14. long offset = metadata.offset();
  15. System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
  16. }
  17. else {
  18. // 发送出现错误
  19. System.out.println("生产消息出现异常!");
  20. // 打印异常消息
  21. System.out.println(exception.getMessage());
  22. // 打印调用栈
  23. System.out.println(exception.getStackTrace());
  24. }
  25. }
  26. });

Kafka中的重要概念

  • broker
    • Kafka服务器进程,生产者、消费者都要连接broker
    • 一个集群由多个broker组成,功能实现Kafka集群的负载均衡、容错

  • producer:生产者
  • consumer:消费者
  • topic:主题,一个Kafka集群中,可以包含多个topic。一个topic可以包含多个分区
    • 是一个逻辑结构,生产、消费消息都需要指定topic
  • partition:Kafka集群的分布式就是由分区来实现的。一个topic中的消息可以分布在topic中的不同partition中
  • replica:副本,实现Kafkaf集群的容错,实现partition的容错。一个topic至少应该包含大于1个的副本
  • consumer group:消费者组,一个消费者组中的消费者可以共同消费topic中的分区数据。每一个消费者组都一个唯一的名字。配置group.id一样的消费者是属于同一个组中
  • offset:偏移量。相对消费者、partition来说,可以通过offset来拉取数据

消费者组

  • 一个消费者组中可以包含多个消费者,共同来消费topic中的数据
  • 一个topic中如果只有一个分区,那么这个分区只能被某个组中的一个消费者消费
  • 有多少个分区,那么就可以被同一个组内的多少个消费者消费

幂等性

  • 生产者消息重复问题
    • Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但Kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息

  • 在Kafka中可以开启幂等性
props.put("enable.idempotence",true);
    • 当Kafka的生产者生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)
    • 发送消息,会连着pid和sequence number一块发送
    • kafka接收到消息,会将消息和pid、sequence number一并保存下来
    • 如果ack响应失败,生产者重试,再次发送消息时,Kafka会根据pid、sequence number是否需要再保存一条消息
    • 判断条件:生产者发送过来的sequence number 是否小于等于 partition中消息对应的sequence

事务编程

  • Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。

  • 开启事务的条件
    • 生产者
  1. // 开启事务必须要配置事务的ID
  2. props.put("transactional.id", "dwd_user");
    • 消费者
  1. // 配置事务的隔离级别
  2. props.put("isolation.level","read_committed");
  3. // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
  4. props.setProperty("enable.auto.commit", "false");
    • 生产者
      • 初始化事务 initTransactions
      • 开启事务 beginTransaction
      • 需要使用producer来将消费者的offset提交到事务中 sendOffsetsToTransaction
      • 提交事务 commitTransaction
      • 如果出现异常回滚事务 abortTransaction

如果使用了事务,不要使用异步发送

  1. public class TransactionProgram {
  2. public static void main(String[] args) {
  3. // 1. 调用之前实现的方法,创建消费者、生产者对象
  4. KafkaConsumer<String, String> consumer = createConsumer();
  5. KafkaProducer<String, String> producer = createProducer();
  6. // 2. 生产者调用initTransactions初始化事务
  7. producer.initTransactions();
  8. // 3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
  9. while(true) {
  10. try {
  11. // (1) 生产者开启事务
  12. producer.beginTransaction();
  13. // 这个Map保存了topic对应的partition的偏移量
  14. Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
  15. // 从topic中拉取一批的数据
  16. // (2) 消费者拉取消息
  17. ConsumerRecords<String, String> concumserRecordArray = consumer.poll(Duration.ofSeconds(5));
  18. // (3) 遍历拉取到的消息,并进行预处理
  19. for (ConsumerRecord<String, String> cr : concumserRecordArray) {
  20. //1转换为男,0转换为女
  21. String msg = cr.value();
  22. String[] fieldArray = msg.split(",");
  23. // 将消息的偏移量保存
  24. // 消费的是ods_user中的数据
  25. String topic = cr.topic();
  26. int partition = cr.partition();
  27. long offset = cr.offset();
  28. int i = 1 / 0;
  29. // offset + 1:offset是当前消费的记录(消息)对应在partition中的offset,而我们希望下一次能继续从下一个消息消息
  30. // 必须要+1,从能消费下一条消息
  31. offsetMap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1));
  32. // 将字段进行替换
  33. if(fieldArray != null && fieldArray.length > 2) {
  34. String sexField = fieldArray[1];
  35. if(sexField.equals("1")) {
  36. fieldArray[1] = "男";
  37. }
  38. else if(sexField.equals("0")){
  39. fieldArray[1] = "女";
  40. }
  41. }
  42. // 重新拼接字段
  43. msg = fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2];
  44. // (4) 生产消息到dwd_user topic中
  45. ProducerRecord<String, String> dwdMsg = new ProducerRecord<>("dwd_user", msg);
  46. // 发送消息
  47. Future<RecordMetadata> future = producer.send(dwdMsg);
  48. try {
  49. future.get();
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. producer.abortTransaction();
  53. }
  54. // new Callback()
  55. // {
  56. // @Override
  57. // public void onCompletion(RecordMetadata metadata, Exception exception) {
  58. // // 生产消息没有问题
  59. // if(exception == null) {
  60. // System.out.println("发送成功:" + dwdMsg);
  61. // }
  62. // else {
  63. // System.out.println("生产消息失败:");
  64. // System.out.println(exception.getMessage());
  65. // System.out.println(exception.getStackTrace());
  66. // }
  67. // }
  68. // });
  69. }
  70. producer.sendOffsetsToTransaction(offsetMap, "ods_user");
  71. // (6) 提交事务
  72. producer.commitTransaction();
  73. }catch (Exception e) {
  74. e.printStackTrace();
  75. // (7) 捕获异常,如果出现异常,则取消事务
  76. producer.abortTransaction();
  77. }
  78. }
  79. }
  80. // 一、创建一个消费者来消费ods_user中的数据
  81. private static KafkaConsumer<String, String> createConsumer() {
  82. // 1. 配置消费者的属性(添加对事务的支持)
  83. Properties props = new Properties();
  84. props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
  85. props.setProperty("group.id", "ods_user");
  86. // 配置事务的隔离级别
  87. props.put("isolation.level","read_committed");
  88. // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
  89. props.setProperty("enable.auto.commit", "false");
  90. // 反序列化器
  91. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  92. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  93. // 2. 构建消费者对象
  94. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
  95. // 3. 订阅一个topic
  96. kafkaConsumer.subscribe(Arrays.asList("ods_user"));
  97. return kafkaConsumer;
  98. }
  99. // 二、编写createProducer方法,用来创建一个带有事务配置的生产者
  100. private static KafkaProducer<String, String> createProducer() {
  101. // 1. 配置生产者带有事务配置的属性
  102. Properties props = new Properties();
  103. props.put("bootstrap.servers", "node1.itcast.cn:9092");
  104. // 开启事务必须要配置事务的ID
  105. props.put("transactional.id", "dwd_user");
  106. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  107. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  108. // 2. 构建生产者
  109. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
  110. return kafkaProducer;
  111. }
  112. }

Kafka中的分区副本机制

生产者的分区写入策略

  • 轮询(按照消息尽量保证每个分区的负载)策略,消息会均匀地分布到每个partition
    • 写入消息的时候,key为null的时候,默认使用的是轮询策略
    • 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区

  • 随机策略(不使用)
  • 按key写入策略,key.hash() % 分区的数量

  • 自定义分区策略(类似于MapReduce指定分区)

乱序问题

  • 在Kafka中生产者是有写入策略,如果topic有多个分区,就会将数据分散在不同的partition中存储
  • 当partition数量大于1的时候,数据(消息)会打散分布在不同的partition中
  • 如果只有一个分区,消息是有序的

自定义分区策略:

  1. 创建自定义分区器
  1. public class KeyWithRandomPartitioner implements Partitioner {
  2. private Random r;
  3. @Override
  4. public void configure(Map<String, ?> configs) {
  5. r = new Random();
  6. }
  7. @Override
  8. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  9. // cluster.partitionCountForTopic 表示获取指定topic的分区数量
  10. return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
  11. }
  12. @Override
  13. public void close() {
  14. }
  15. }
  1. 在Kafka生产者配置中,自定使用自定义分区器的类名
  1. props.
  2. put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());

消费组Consumer Group Rebalance机制

  • 再均衡:在某些情况下,消费者组中的消费者消费的分区会产生变化,会导致消费者分配不均匀(例如:有两个消费者消费3个,因为某个partition崩溃了,还有一个消费者当前没有分区要削峰),Kafka Consumer Group就会启用rebalance机制,重新平衡这个Consumer Group内的消费者消费的分区分配。
  • 触发时机
    • 消费者数量发生变化
      • 某个消费者crash
      • 新增消费者
    • topic的数量发生变化
      • 某个topic被删除
    • partition的数量发生变化
      • 删除partition
      • 新增partition
  • 不良影响
    • 发生rebalance,所有的consumer将不再工作,共同来参与再均衡,直到每个消费者都已经被成功分配所需要消费的分区为止(rebalance结束)

消费者的分区分配策略

分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少

  • Range分配策略(范围分配策略):Kafka默认的分配策略
    • n:分区的数量 / 消费者数量
    • m:分区的数量 % 消费者数量
    • 前m个消费者消费n+1个分区
    • 剩余的消费者消费n个分区

  • RoundRobin分配策略(轮询分配策略)
    • 消费者挨个分配消费的分区
    • 配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。

  • Striky粘性分配策略
    • 在没有发生rebalance跟轮询分配策略是一致的
    • 发生了rebalance,轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可
    • 减少上下文的切换
    • Striky粘性分配策略,保留rebalance之前的分配结果。

副本的ACK机制

producer是不断地往Kafka中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个ACKs的配置。

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers""node1.itcast.cn:9092");
  3. props.put("acks""all");
  4. props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
  • acks = 0:生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的

  • acks = 1:生产者会等到leader分区写入成功后,返回成功,接着发送下一条

  • acks = -1/all:确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的

根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。

分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。

高级API(High-Level API)、低级API(Low-Level API)

  • 高级API就是直接让Kafka帮助管理、处理分配、数据
    • offset存储在ZK中
    • 由kafka的rebalance来控制消费者分配的分区
    • 开发起来比较简单,无需开发者关注底层细节
    • 无法做到细粒度的控制
  1. /**
  2.  * 消费者程序:从test主题中消费数据
  3.  */
  4. public class _2ConsumerTest {
  5.     public static void main(String[] args) {
  6.         // 1. 创建Kafka消费者配置
  7.         Properties props = new Properties();
  8.         props.setProperty("bootstrap.servers""192.168.88.100:9092");
  9.         props.setProperty("group.id""test");
  10.         props.setProperty("enable.auto.commit""true");
  11.         props.setProperty("auto.commit.interval.ms""1000");
  12.         props.setProperty("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
  13.         props.setProperty("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
  14.         // 2. 创建Kafka消费者
  15.         KafkaConsumer<StringString> consumer = new KafkaConsumer<>(props);
  16.         // 3. 订阅要消费的主题
  17.         consumer.subscribe(Arrays.asList("test"));
  18.         // 4. 使用一个while循环,不断从Kafka的topic中拉取消息
  19.         while (true) {
  20.             // 定义100毫秒超时
  21.             ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
  22.             for (ConsumerRecord<StringString> record : records)
  23.                 System.out.printf("offset = %d, key = %s, value = %s%n"record.offset(), record.key(), record.value());
  24.         }
  25.     }
  26. }
  • 低级API:由编写的程序自己控制逻辑
    • 自己来管理Offset,可以将offset存储在ZK、MySQL、Redis、HBase、Flink的状态存储
    • 指定消费者拉取某个分区的数据
    • 可以做到细粒度的控制
    • 原有的Kafka的策略会失效,需要我们自己来实现消费机制
  1. String topic = "test";
  2. TopicPartition partition0 = new TopicPartition(topic, 0);
  3. TopicPartition partition1 = new TopicPartition(topic, 1);
  4. consumer.assign(Arrays.asList(partition0, partition1));

Kafka原理

leader和follower

  • Kafka中的leader和follower是相对分区有意义,不是相对broker
  • Kafka在创建topic的时候,会尽量分配分区的leader在不同的broker中,其实就是负载均衡
  • leader职责:读写数据
  • follower职责:同步数据、参与选举(leader crash之后,会选举一个follower重新成为分区的leader
  • 注意和ZooKeeper区分
    • ZK的leader负责读、写,follower可以读取
    • Kafka的leader负责读写、follower不能读写数据(确保每个消费者消费的数据是一致的),Kafka一个topic有多个分区leader,一样可以实现数据操作的负载均衡.

AR\ISR\OSR

  • AR表示一个topic下的所有副本
  • ISR:In Sync Replicas,正在同步的副本(可以理解为当前有几个follower是存活的)
  • OSR:Out of Sync Replicas,不再同步的副本
  • AR = ISR + OSR

leader选举

  • Controller:controller是kafka集群的老大,是针对Broker的一个角色
    • Controller是高可用的,是用过ZK来进行选举
  • Leader:是针对partition的一个角色
    • Leader是通过ISR来进行快速选举
  • 如果Kafka是基于ZK来进行选举,ZK的压力可能会比较大。例如:某个节点崩溃,这个节点上不仅仅只有一个leader,是有不少的leader需要选举。通过ISR快速进行选举。
  • leader的负载均衡
    • 如果某个broker crash之后,就可能会导致partition的leader分布不均匀,就是一个broker上存在一个topic下不同partition的leader
    • 通过以下指令,可以将leader分配到优先的leader对应的broker,确保leader是均匀分配的
bin/kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic test --partition=2 --election-type preferred

Kafka读写流程

  • 写流程
    • 通过ZooKeeper找partition对应的leader,leader是负责写的
    • producer开始写入数据
    • ISR里面的follower开始同步数据,并返回给leader ACK
    • 返回给producer ACK
  • 读流程
    • 通过ZooKeeper找partition对应的leader,leader是负责读的
    • 通过ZooKeeper找到消费者对应的offset
    • 然后开始从offset往后顺序拉取数据
    • 提交offset(自动提交——每隔多少秒提交一次offset、手动提交——放入到事务中提交)

Kafka的物理存储

  • Kafka的数据组织结构
    • topic
    • partition
    • segment
      • .log数据文件
      • .index(稀疏索引)
      • .timeindex(根据时间做的索引)
  • 深入了解读数据的流程
    • 消费者的offset是一个针对partition全局offset
    • 可以根据这个offset找到segment段
    • 接着需要将全局的offset转换成segment的局部offset
    • 根据局部的offset,就可以从(.index稀疏索引)找到对应的数据位置
    • 开始顺序读取

消息传递的语义性

Flink里面有对应的每种不同机制的保证,提供Exactly-Once保障(二阶段事务提交方式)

  • At-most once:最多一次(只管把数据消费到,不管有没有成功,可能会有数据丢失)
  • At-least once:最少一次(有可能会出现重复消费)
  • Exactly-Once:仅有一次(事务性性的保障,保证消息有且仅被处理一次)

Kafka的消息不丢失

  • broker消息不丢失:因为有副本relicas的存在,会不断地从leader中同步副本,所以,一个broker crash,不会导致数据丢失,除非是只有一个副本。
  • 生产者消息不丢失:ACK机制(配置成ALL/-1)、配置0或者1有可能会存在丢失
  • 消费者消费不丢失:重点控制offset
    • At-least once:一种数据可能会重复消费
    • Exactly-Once:仅被一次消费

数据积压

  • 数据积压指的是消费者因为有一些外部的IO、一些比较耗时的操作(Full GC——Stop the world),就会造成消息在partition中一直存在得不到消费,就会产生数据积压
  • 在企业中,我们要有监控系统,如果出现这种情况,需要尽快处理。虽然后续的Spark Streaming/Flink可以实现背压机制,但是数据累积太多一定对实时系统它的实时性是有说影响的

数据清理&配额限速

  • 数据清理
    • Log Deletion(日志删除):如果消息达到一定的条件(时间、日志大小、offset大小),Kafka就会自动将日志设置为待删除(segment端的后缀名会以 .delete结尾),日志管理程序会定期清理这些日志
      • 默认是7天过期
    • Log Compaction(日志合并)
      • 如果在一些key-value数据中,一个key可以对应多个不同版本的value
      • 经过日志合并,就会只保留最新的一个版本
  • 配额限速
    • 可以限制Producer、Consumer的速率
    • 防止Kafka的速度过快,占用整个服务器(broker)的所有IO资源
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/951052
推荐阅读
相关标签
  

闽ICP备14008679号