当前位置:   article > 正文

消息中间件-kafka_kafka_zookeeper_connect

kafka_zookeeper_connect

一、Kafka概述

1.1 简介

Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

1.2 特点

  • 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
  • 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘防止数据丢失。
  • 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
  • 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
  • 支持online和offline的场景。

1.3 Kafka架构图


相关概念

  • Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
  • Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
  • Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
  • Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
  • Broker:Kafka集群中的一台或多台服务器统称为broker。

二、Kafka的安装

2.1 单机安装(原生方式)

1) 官网下载kafka

https://kafka.apache.org/downloads.html

2)安装Zookeeper(略)
3)解压安装包

tar -zxvf kafka_2.12-3.1.0.tgz -C /usr/local/software

4)修改配置文件

vim /usr/local/software/kafka_2.12-3.1.0/config/server.properties

修改如下:
broker.id=0  #broker节点id
port=9092  #端口号, 默认9092
log.dirs=/usr/local/kafka_2.12-3.1.0/kafka-logs
num.partitions=2
zookeeper.connect=zookeeper集群地址

5)创建日志文件夹

mkdir -p /usr/local/kafka_2.12-3.1.0/kafka-logs

6)启动kafka

/usr/local/kafka_2.12-3.1.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.0/config/server.properties

2.2 单机安装(Docker)

  1. version: '3.1'
  2. services:
  3. zk:
  4. image: zookeeper
  5. container_name: zk
  6. restart: always
  7. ports:
  8. - 2181:2181
  9. kafka:
  10. image: wurstmeister/kafka
  11. container_name: kafka
  12. restart: always
  13. environment:
  14. KAFKA_BROKER_ID: 0
  15. KAFKA_ZOOKEEPER_CONNECT: 192.168.195.188:2181/kafka
  16. #kafka发布到Zookeeper供客户端使用的服务地址
  17. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.195.188:9092
  18. #定义Kafka Broker的服务监听地址,0.0.0.0表示kafka服务会监听所有网络接口
  19. KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
  20. ports:
  21. - 9092:9092
  22. kafka-manager:
  23. image: sheepkiller/kafka-manager
  24. restart: always
  25. environment:
  26. ZK_HOSTS: zk:2181/kafka
  27. ports:
  28. - 9000:9000
  29. kafka-console-ui:
  30. image: wdkang/kafka-console-ui
  31. container_name: kafka-console-ui
  32. ports:
  33. - 7766:7766
  34. volumes:
  35. - ./data:/app/data
  36. - ./log:/app/log
  37. privileged: true
  38. user: root

2.3 集群安装(Docker)

  1. version: '3.1'
  2. services:
  3. zk:
  4. image: zookeeper
  5. restart: always
  6. hostname: zk
  7. ports:
  8. - 2181:2181
  9. container_name: zk
  10. kafka1:
  11. image: wurstmeister/kafka
  12. ports:
  13. - 9092:9092
  14. restart: always
  15. environment:
  16. KAFKA_ADVERTISED_HOST_NAME: 192.168.91.188 ## 修改:宿主机IP
  17. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.91.188:9092 ## 修改:宿主机IP
  18. KAFKA_ZOOKEEPER_CONNECT: zk:2181/kafka
  19. KAFKA_ADVERTISED_PORT: 9092
  20. KAFKA_BROKER_ID: 1
  21. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  22. depends_on:
  23. - zk
  24. container_name: kafka1
  25. kafka2:
  26. image: wurstmeister/kafka
  27. ports:
  28. - 9093:9092
  29. restart: always
  30. environment:
  31. KAFKA_ADVERTISED_HOST_NAME: 192.168.91.188 ## 修改:宿主机IP
  32. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.91.188:9093 ## 修改:宿主机IP
  33. KAFKA_ZOOKEEPER_CONNECT: zk:2181/kafka
  34. KAFKA_ADVERTISED_PORT: 9093
  35. KAFKA_BROKER_ID: 2
  36. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  37. depends_on:
  38. - zk
  39. container_name: kafka2
  40. kafka3:
  41. image: wurstmeister/kafka
  42. ports:
  43. - 9094:9092
  44. restart: always
  45. environment:
  46. KAFKA_ADVERTISED_HOST_NAME: 192.168.91.188 ## 修改:宿主机IP
  47. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.91.188:9094 ## 修改:宿主机IP
  48. KAFKA_ZOOKEEPER_CONNECT: zk:2181/kafka
  49. KAFKA_ADVERTISED_PORT: 9094
  50. KAFKA_BROKER_ID: 3
  51. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  52. depends_on:
  53. - zk
  54. container_name: kafka3
  55. # kafka-manager:
  56. # image: sheepkiller/kafka-manager
  57. # restart: always
  58. # environment:
  59. # ZK_HOSTS: zk:2181/kafka
  60. # ports:
  61. # - 9000:9000
  62. kafka-console-ui:
  63. image: wdkang/kafka-console-ui
  64. container_name: kafka-console-ui
  65. ports:
  66. - 7766:7766
  67. volumes:
  68. - ./data:/app/data
  69. - ./log:/app/log
  70. privileged: true
  71. user: root

三、相关命令

主题相关

  1. #docker容器的默认路径
  2. /opt/kafka_2.13-2.8.1/bin
  3. #查看kafka所有主题Topic
  4. ./kafka-topics.sh --zookeeper zk访问地址/kafka --list
  5. #创建kafka主题Topic
  6. ./kafka-topics.sh --create --zookeeper zk访问地址/kafka --topic 主题名称 --partitions 分区数量 --replication-factor 副本数
  7. #删除kafka主题Topic
  8. ./kafka-topics.sh --delete --zookeeper zk访问地址/kafka --topic 主题名称
  9. #查看主题Topic详情
  10. ./kafka-topics.sh --topic mytopic --zookeeper 192.168.195.188:2181/kafka --describe

生产者相关

  1. #生产者发送消息
  2. ./kafka-console-producer.sh --topic 主题名称 --broker-list kafka地址:9092

消费者相关

  1. #消费者订阅主题
  2. ./kafka-console-consumer.sh --bootstrap-server kafka地址:9092 --topic 主题名称 --from-beginning
  3. #查看消费组的消费进度
  4. ./kafka-consumer-groups.sh --bootstrap-server kafka地址:9092 --describe --group 消费者组名

四、Java代码操作Kafka

添加依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.6.0</version>
  5. </dependency>

4.1 生产端

  1. package com.qf.kafka;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. /**
  8. * 生产者
  9. */
  10. public class Provider {
  11. public static void main(String[] args) {
  12. Properties properties = new Properties();
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.195.188:9092,192.168.195.188:9093");
  14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  15. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  16. properties.put(ProducerConfig.RETRIES_CONFIG, 10);//重试次数
  17. KafkaProducer producer = new KafkaProducer(properties);
  18. ProducerRecord producerRecord = new ProducerRecord("mytopic", Math.random() * 1000000 + "");
  19. //发送消息
  20. producer.send(producerRecord);
  21. producer.close();
  22. }
  23. }

4.2 消费端

  1. package com.qf.kafka;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.serialization.StringDeserializer;
  7. import java.time.Duration;
  8. import java.util.Collections;
  9. import java.util.Properties;
  10. public class Consumer {
  11. public static void main(String[] args) {
  12. Properties prop = new Properties();
  13. prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.188:9092");
  14. prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  15. prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  16. // prop.put("session.timeout.ms","30000");
  17. // //消费者是否自动提交偏移量,默认是true 避免出现重复数据 设为false
  18. // prop.put("enable.auto.commit","true");
  19. // prop.put("auto.commit.interval.ms","1000");
  20. // //auto.offset.reset 消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的处理
  21. // //earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
  22. // //latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
  23. prop.put("auto.offset.reset","earliest");
  24. // 设置组名
  25. prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testgroup");
  26. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
  27. consumer.subscribe(Collections.singletonList("mytopic"));
  28. while (true){
  29. // 每隔一秒读取Kafka数据
  30. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  31. for (ConsumerRecord<String,String> red:records){
  32. // System.out.printf("offset:%d,key:%s,value:%s",red.offset(),red.key(),red.value());
  33. System.out.println(red.partition() + " " + red.offset() + " " + red.key() + " " + red.value());
  34. }
  35. }
  36. }
  37. }

4.3 创建主题

  1. package com.qf.demo1;
  2. import org.apache.kafka.clients.admin.AdminClient;
  3. import org.apache.kafka.clients.admin.AdminClientConfig;
  4. import org.apache.kafka.clients.admin.CreateTopicsResult;
  5. import org.apache.kafka.clients.admin.NewTopic;
  6. import java.util.Collections;
  7. import java.util.Properties;
  8. import java.util.concurrent.ExecutionException;
  9. public class TopicDemo {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "119.91.150.61:9092");
  13. AdminClient adminClient = AdminClient.create(properties);
  14. //创建主题
  15. NewTopic newTopic = new NewTopic("mytopic", 2, (short)1);
  16. CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(newTopic));
  17. try {
  18. //执行创建
  19. topics.all().get();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } catch (ExecutionException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }

4.4 查看主题

  1. package com.qf.demo1;
  2. import org.apache.kafka.clients.admin.*;
  3. import java.util.Collections;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. public class TopicDemo {
  7. public static void main(String[] args) {
  8. Properties properties = new Properties();
  9. properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "119.91.150.61:9092");
  10. AdminClient adminClient = AdminClient.create(properties);
  11. //查看所有主题列表
  12. ListTopicsResult result = adminClient.listTopics();
  13. try {
  14. result.listings().get().stream().forEach(topicListing -> {
  15. System.out.println("主题名称:" + topicListing.name() + " 是否为内置主题:" + topicListing.isInternal());
  16. System.out.println("主题的相关属性:");
  17. //查看主题的详细信息
  18. DescribeTopicsResult tResult = adminClient.describeTopics(Collections.singleton(topicListing.name()));
  19. tResult.topicNameValues().entrySet().stream().forEach(stringKafkaFutureEntry -> {
  20. try {
  21. TopicDescription topicDescription = stringKafkaFutureEntry.getValue().get();
  22. String name = topicDescription.name();
  23. System.out.println("名称:" + name);
  24. System.out.println("分区属性:");
  25. topicDescription.partitions().stream().forEach(topicPartitionInfo -> {
  26. System.out.println("分区名称:" + topicPartitionInfo.partition());
  27. System.out.println("leader节点:" + topicPartitionInfo.leader());
  28. System.out.println("副本节点:" + topicPartitionInfo.replicas());
  29. System.out.println("ISR节点:" + topicPartitionInfo.isr());
  30. });
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. } catch (ExecutionException e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. });
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. } catch (ExecutionException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }

五、Kafka生产者

5.1 生产者相关属性

必要属性

bootstrap.servers(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) - 该参数用来指定生产者客户端连接Kafka集群所需要的broker地址清单。可以设置一个或者多个,这里并非需要所有的broker地址,因为生产者会从给定的broker里面查找到其他的broker信息。不过建议至少设置两个以上的broker地址信息,避免单点故障
key.serializer & value.serializer(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG & ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) - broker端接收的消息必须是byte[]的类型,所以需要通过这两个参数指定key-value的序列化器,用于将其他类型的数据序列化成byte[]

方法参数

//ProduceRecord构造方法:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
参数一:topic(必填) - 发送的主题名称
参数二:partition - 发送的分区号
参数三:timestamp - 时间戳
参数四:key - 发送消息的key,如果没有指定分区号,则会根据key计算消息发送的分区
参数五:value(必填) - 发送的消息
参数六:headers -

5.2 发送消息的模式

1)发后既忘

Kafka发送消息默认就是这种模式,发送消息后,可能会造成消息丢失,性能最好,可靠性最差

2)同步消息

通过send方法的返回值调用get方法阻塞等待kafka的响应。同步发送的可靠性高,但是性能相对较差
需要等待一条处理完才能发送下一条

  1. try {
  2. //调用get()方法,同步等待kafka响应
  3. RecordMetadata metadata = producer.send(producerRecord).get();
  4. //获取相关元数据
  5. System out.println(metadata.topic() + " - " + metadata.partition() + ": " + metadata.offset());
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. }

3)异步消息

通过send方法的第二个Callback回调函数,来设置异步发送结果的回调

  1. //发送异步消息
  2. producer.send(producerRecord, new Callback() {
  3. @Override
  4. public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
  5. if (exception != null) {
  6. System.out.println("发生异常!");
  7. } else {
  8. System.out.println("发送成功!");
  9. }
  10. }
  11. });

注意

  • 如果异常exception!=null说明发送出现了异常,如果没有异常,则recordMetadata参数就不会为null。这两个参数是互斥的。
  • 对于同一个分区而言,如果消息1与消息2先发送,KafkaProduce可以保证对应的callback1会在callback2之前调用,也就是说回调函数也是分区有序的。
  • 发送消息的异常类型:
    • 可重试异常
    • 不可重试异常

  • 如果发送消息时抛出不可重试异常,则不会进行重试,直接抛出异常。
    对于可重试异常,如果配置了retries参数,那么只要在规定的次数内自行恢复,就不会抛出异常。
    retries参数的默认值为0,配置方式如下:
    properties.put(ProducerConfig.RETRIES_CONFIG, 10);//重试次数
    重试指定的次数后,如果还没有恢复,那么仍然会抛出异常。

5.3 序列化器

//序列化接口
org.apache.kafka.common.serialization.Serializer

抽象方法

  1. //用来配置当前类
  2. default void configure(Map<String, ?> configs, boolean isKey) {}
  3. //用来执行序列化操作
  4. byte[] serialize(String topic, T data);
  5. //用来关闭当前的序列化器,一般为空方法,如果需要重写,必须保证方法的幂等性,可能会被重复执行
  6. default void close() {}

5.4 分区器

分区器用来确定发送的消息最终会发往主题的哪个分区

  1. //分区器的接口为
  2. org.apache.kafka.clients.producer.Partitioner
  3. //提供方默认的分区器为
  4. org.apache.kafka.clients.producer.internals.DefaultPartitioner

抽象方法

  1. //实现分区的主要逻辑
  2. public int partition(String topic,
  3. Object key,
  4. byte[] keyBytes,
  5. Object value,
  6. byte[] valueBytes,
  7. Cluster cluster);
  8. //一般是空方法
  9. public void close();

设置分区器

  1. //设置自定义和分区器:
  2. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "自定义分区器全限定路径名");

注意

  • 如果发送消息时指定了参数partition,那么就不需要分区器了,因为partition就是最终要发往的分区号。
  • 如果指定了key,分区器就会根据key确定消息最终要发送的分区,确保在分区不变的情况下,相同key的消息会发往相同的分区。
  • 如果key为null,则消息会轮询发往各个分区。

5.5 生产者拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一定制化的需求,比如统计类工作。

  1. //拦截的接口
  2. org.apache.kafka.clients.producer.ProducerInterceptor

抽象方法

  1. //KafkaProducer在将消息序列化和计算分区前会调用生产者拦截器onSend()方法来对消息进行相应的定制化操作
  2. public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
  3. //KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前执行。
  4. public void onAcknowledgement(RecordMetadata metadata, Exception exception);
  5. public void close();

设置拦截器

  1. //设置自定义的拦截器:
  2. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "自定义拦截器的路径");
  3. //拦截器链的设置:
  4. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "自定义拦截器的路径1,自定义拦截器的路径2...");

注意

  • 在拦截器链中,如果某个拦截执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器之后继续执行
  • 拦截器的顺序即为编写的顺序

5.6 生产者发送消息的流程

  • 整个生产者客户端由两个线程协调运行,分别是主线程Sender线程(发送线程)。在主线程中,由KafkaProducer创建消息,通过拦截器、序列化器、分区器的作用之后缓存到消息累加器(RecordAccumulator,也称之为消息收集器)中。Sender线程负责从消息累加器中获取消息并且发送到Kafka
  • RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。消息累加器缓存的大小通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过sender线程发往服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么阻塞,要么抛出异常,取决于参数max.block.ms的配置,默认为60秒
  • 主线程中发送过来的消息会被追加到消息累加器的某个双端队列中,在消息累加器的内部每个分区都维护一个双端队列,队列中的内容就是ProducerBatch。消息写入缓存时,追加到双端队列的尾部;sender读取消息,从头部读取。一个ProducerBatch包含一个或者多个ProducerRecord。通俗的讲,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次。
  • 消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放比较耗费资源,在消息收集器的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,其他大小的ByteBuffer不会缓存进BufferPool中,这个特定大小由参数batch.size参数指定,默认16kb
  • Sender线程从消息累加器中获取缓存的消息后,会进一步将原本<分区, 队列>的保存形式转换成<Node, List>的形式,其中Node表示Kafka集群的broker节点。因为对于网络来说,关心的是生产者具体与哪个broker节点创建连接,并不关心消息属于哪个分区,所以这里要做一个应用逻辑层面到网络I/O层面的转换。在转换完成之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request指Kafka的各种协议请求
  • 请求从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque>,主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号),并且可以通过配置参数现在每个连接最多缓存的请求数,这个配置参数为max.in.flight.requests.per.connection,默认为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送请求了,除非有缓存的请求收到响应

5.7 生产者参数总结

  • acks - 指定分区中必须要多少个副本收到这条消息,生产者才会认为这条消息成功写入。
    • 1 - 默认值,只要分区的leader成功写入消息,服务器返回成功的响应。如果消息成功写入leader,其他follower同步消息之前,leader崩溃,那么此消息还是会丢失。
    • 0 - 生产者发送消息之后不需要等待服务器的响应。
    • -1 或者 all - 生产者在消息发送之后,需要等待ISR中所有副本都成功写入消息之后,服务器才会返回成功响应。

  • max.request.size - 限制生产者发送消息的最大值,默认1MB。
  • retries - 指定生产者重试的次数,默认为0。
  • retry.backoff.ms - 指定两次重试的间隔时间,默认为100ms。
  • compression.type - 指定消息的压缩方式,默认值为"none",默认情况下不会被压缩。可以配置为"gzip","snappy"和"lz4"。
  • connections.max.idle.ms - 指定在多久之后关闭空闲的连接,默认9分钟。
  • Iinger.ms - 指定生产者发送ProducerBatch之前等待更多消息加入ProducerBatch的时间,默认值为0。生产者会在ProducerBatch被填满或者等待时间超过linger.ms值时发送出去,增大这个参数会增加消息的延迟,但是能同时提升一定的吞吐量。
  • receive.buffer.bytes - 设置Socket接收消息缓存区大小,默认32KB。如果Producer与Kafka处于不同的机房,可以适当调大这个参数值
  • send.buffer.bytes - 设置Socket发送消息缓冲区大小,默认128KB。
  • buffer.memory - 设置消息累加器缓存的大小,默认32MB
  • request.timeout.ms - 配置Producer等待请求响应的最长时间,默认30s。请求超时之后可以选择进行重试。
  • max.block.ms - 消息累加器空间不足时,生产者最大阻塞时间,超过后抛异常。

六、Kafka消费者

6.1 消费者与消费者组

消费者 - 负责订阅Kafka中的主题,并且从订阅的Topic上拉取消息。
消费者组 - 每个消费者都有一个对应的消费者组,消息发布到Topic上之后,只会被订阅它的每个消费者组中的一个消费者所消费
指定消费者组:通过参数group.id配置,默认值为空字符串

消费者&消费者组&Topic&Partition之间的关系

6.2 订阅主题与分区

消费者订阅的三种方式

  1. //根据主题名称订阅,可以订阅多个主题
  2. public void subscribe(Collection<String> topics);
  3. //根据正则表达式订阅,如果匹配上后续新增的主题,也能自动实现订阅
  4. public void subscribe(Pattern pattern);
  5. //根据主题名称以及分区号订阅
  6. public void assign(Collection<TopicPartition> partitions);

消费者获取主题分区信息

  1. //获取主题的分区信息
  2. List<PartitionInfo> testtopic = consumer.partitionsFor("主题名称");
  3. //PartitionInfo类描述
  4. public class PartitionInfo {
  5. //当前分区所属的主题名称
  6. private final String topic;
  7. //当前分区编号
  8. private final int partition;
  9. //分区的leader所在的位置
  10. private final Node leader;
  11. //分区的AR集合列表
  12. private final Node[] replicas;
  13. //分区的ISR集合列表
  14. private final Node[] inSyncReplicas;
  15. //分区的OSR集合列表
  16. private final Node[] offlineReplicas;
  17. ....
  18. }

取消订阅

  1. //取消所有订阅的主题,可以重复执行subscribe方法达到覆盖订阅的效果
  2. consumer.unsubscribe();

ConsumerRebalanceListener参数

这个参数是一个监听器,主要是用来监听消费者再均衡的事件。

注意

如果前后两次订阅了不同的主题,那么消费者以最后一次的为准。
consumer.subscribe(Arrays.asList("TopicA"));
consumer.subscribe(Arrays.asList("TopicB"));
对于以上两行代码,最终消费者订阅的是TopicB,不是TopicA,也不是TopicA和TopicB的并集。

6.3 消费消息

消费的两种模式

  • 推模式 - 服务端自动将消息推送给消费者
  • 拉模式 - 消费者主动发起请求从服务端拉取消息

Kafka的消费模式是基于拉模式

拉取消息的方法

  1. //参数timeout代表超时时间,在消费者的缓冲区里没有可用数据时发生阻塞。
  2. //如果将timeout设置为0,poll方法会立刻返回,不管是否已经拉取到了消息。
  3. //如果当前线程唯一的工作就是从Kafka中拉取并消费消息,则可以设置为最大值Long.MAX_VALUE。
  4. ConsumerRecords<K, V> poll(Duration timeout);
  5. //ConsumerRecord对象 代表消费者获取到的一条消息
  6. public class ConsumerRecord<K, V> {
  7. //消息所属的主题
  8. private final String topic;
  9. //消息所属的分区
  10. private final int partition;
  11. //消息所属分区的偏移量
  12. private final long offset;
  13. //时间戳
  14. private final long timestamp;
  15. //时间戳的类型,有两种类型:
  16. //CreateTime - 创建时间戳
  17. //LogAppendTime - 消息追加到日志的时间戳
  18. private final TimestampType timestampType;
  19. //key序列化后的大小
  20. private final int serializedKeySize;
  21. //value序列化后的大小
  22. private final int serializedValueSize;
  23. //消息的头部内容
  24. private final Headers headers;
  25. //消息的key
  26. private final K key;
  27. //消息的value
  28. private final V value;
  29. //leaderEoch代表 Leader 的纪元信息(epoch),初始值为0。每当 Leader 变更一次,leader epoch 的值就会加1,相当于为 Leader增设了一个版本号
  30. private final Optional<Integer> leaderEpoch;
  31. ...
  32. }

ConsumerRecords对象的方法

  1. //用来获取指定主题分区的消息,消费者可能订阅多个主题,则返回的ConsumerRecords对象中,可能包含多种主题分区的消息
  2. public List<ConsumerRecord<K, V>> records(TopicPartition partition);
  3. //用来获取指定主题的消息
  4. public Iterable<ConsumerRecord<K, V>> records(String topic);
  5. //拉取消息的数量
  6. public int count();

6.4 位移提交

offset

  • 每条消息在分区中都有一个唯一的offset,代表这个消息的偏移量
  • 消费者使用offset表示消费到了分区中某个消息的所在位置,称之为“消费位移

位移提交

  • 消费者每次调用poll()方法,获取到的是还没有消费过的消息集合
  • 为了当前消费者或者新的消费者知道以前的消费情况,需要持久化保存消费位移的信息
  • 老版本的kafka消费移位是保存在zookeeper上的
  • 新版本的kafka消费位移是保存在kafka内部主题_consumer_offsets中
  • 消费者在消费完消息后,需要执行消费位移提交的操作(持久化消费位移信息)
  • 消费者提交消费位移时,提交的是当前消费消息的offset+1(也就是下一次需要获取的消息偏移量)
  • 默认消费者位移是自动提交的

Kafka中关于位移提交的方法

  1. //获取消费者在当前主题分区中,下一次拉取消息的offset(下一条消费的消息)
  2. public long position(TopicPartition partition);
  3. //获取消费者在当前主题分区中,已经提交过的消费位移(提交的是最后消费消息的offset+1)
  4. public long committed(TopicPartition partition);
  5. //同步手动提交消费位移,会根据poll方法拉取的最新消息偏移量来进行提交
  6. public void commitSync();
  7. //根据主题分区,同步手动提交,可以手动设置提交的消费位移
  8. public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);
  9. //异步手动提交消费位移,提交时不会阻塞当前的消费者线程
  10. public void commitAsync();
  11. //设置异步手动提交的回调方法
  12. public void commitAsync(OffsetCommitCallback callback);

注意

  • 消费者位移的提交,实际开发中往往需要手动提交,自动提交可能会导致消息的重复消费消息丢失的问题
  • 通过参数 enable.auto.commit设置是否自动提交位移,默认为true
  • 通过参数auto.commit.interval.ms设置自动提交位移的周期,默认5s,自动提交位移必须打开

6.5 控制/关闭消费

相关方法

  1. //暂停某个主题分区的数据拉取
  2. public void pause(Collection<TopicPartition> partitions);
  3. //恢复某个主题分区的数据拉取
  4. public void resume(Collection<TopicPartition> partitions);
  5. //返回被暂停的主题分区集合
  6. public Set<TopicPartition> paused();
  7. //其他线程可以通过调用该方法使消费者线程抛出WakeupException异常,从而终止循环拉取
  8. public void wakeup();
  9. //手动释放运行过程中占用的各种系统资源,需要显式的在跳出循环后调用
  10. public void close();

6.6 指定位移消费

  • 如果kafka服务器找不到某个消费者所记录的消费位移时,
    就会根据消费者客户端的配置auto.offset.reset来决定从何处开始消费。
  • auto.offset.reset可选项
    • latest - 从分区末尾开始消费消息(默认值)
    • earliest - 从分区头部开始消费消息
    • none - 抛出NoOffsetForPartitionException异常

  • 如果需要精准掌控消费者消费的起始位置,可以借助Kafka提供的更细粒度的指定位移消费的方法

指定位移消费的相关方法

  1. //用于指定主题分区从什么位置开始消费
  2. void seek(TopicPartition partition, long offset);
  3. //用来获得消费者所分配的分区信息
  4. Set<TopicPartition> assignment();
  5. //用来获得指定主题分区尾部消息的偏移量(返回最后一条消息的offset+1)
  6. Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
  7. //用来获取指定主题分区头部消息的偏移量(一般是0,但是有些特殊情况不一定)
  8. Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
  9. //从指定主题分区开始的位置消费
  10. public void seekToBeginning(Collection<TopicPartition> partitions);
  11. //从指定主题分区尾部位置消费
  12. public void seekToEnd(Collection<TopicPartition> partitions);
  13. //根据具体的时间戳获得对应消息的偏移量
  14. Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

注意

  • seek方法只能设置当前消费者分配到的分区消费位置,而分区分配是通过poll方法的调用中实现的。所以在调用seek方法之前,必须先调用一次poll方法
  • seek方法定位的位置,如果无法在实际分区中找到,就会发生所谓的位移越界,此时就会根据配置auto.offset.reset的配置进行相应的处理

6.7 消费者再均衡

指分区的所属权从一个消费者转移到另一个消费者的情况,为消费者组具备高可用性和伸缩性提供了保障,可以更安全方便的删除或者添加消费者组类的成员。


再均衡的相关方法

  1. //订阅主题的第二个参数,为再均衡事件的监听器,可以在再均衡动作前后做一些收尾的动作
  2. void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

ConsumerRebalanceListener监听器的方法

  1. //再均衡开始之前和消费者停止读取消息之后调用,可以通过这个回调方法来处理消费位移的提交,避免消息的重复消费,参数为再均衡前所分配到的分区
  2. void onPartitionsRevoked(Collection<TopicPartition> partitions);
  3. //在重新分配分区之后和消费者开始读取消费者之前被调用,参数表示再均衡后所分配到的分区
  4. void onPartitionsAssigned(Collection<TopicPartition> partitions);

再均衡监听器的用法示例

  1. //准备一个主题-分区-消费位移的缓存Map集合
  2. Map<TopicPartition, OffsetAndMetadata> currentOffsets =new HashMap<>() ;
  3. //订阅主题,并且设置消费者再均衡监听器
  4. consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
  5. @Override
  6. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  7. //再均衡之前触发,手动提交消费位移,防止重复消费消息
  8. consumer.commitSync(currentOffsets);
  9. //清空缓存Map
  10. currentOffsets.clear();
  11. }
  12. @Override
  13. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  14. }
  15. });
  16. while (true) {
  17. //每隔1秒拉取一次消息
  18. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  19. //循环处理消息
  20. for (ConsumerRecord<String, String> record : records) {
  21. //---执行处理消息的逻辑----
  22. //缓存当前消费的消息的相关信息 (主题-分区-offset)
  23. currentOffsets.put(
  24. new TopicPartition(record.topic(), record.partition()),
  25. new OffsetAndMetadata(record.offset() + 1));
  26. }
  27. }

注意

  • 再均衡发生期间,整个消费者组将停止消费
  • 再均衡可能会引起消息的重复消费

6.8 消费者拦截器

实现接口

org.apache.kafka.clients.consumer.ConsumerInterceptor

ConsumerInterceptor核心方法

  1. //consumer会在poll方法返回之前调用该方法,对消息进行相应的定制化操作,比如修改消息内容,过滤消息等;
  2. //该方法抛出异常不影响后续业务的执行
  3. public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  4. //提交完消费位移后调用该方法,可以使用这个方法几率跟踪所提交的位移信息,
  5. //比如调用了commitSync方法后,不知道提交消费位移的具体细节,可以在该方法中获取
  6. public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

设置消费者拦截器

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,  "消费者拦截器的类路径");

6.9 多线程消费

  • KafkaProducer是线程安全的,而KafkaConsumer却是非线程安全
  • KafkaConsumer中只有wakeup方法是其他线程可以调用的。

实现消费端多线程消费的方式

方式一:每个线程实例化一个KafkaConsumer对象,线程数量应该要小于等于主题的分区数量

方式二:多个消费线程并发拉取同一个分区,通过assign、seek等方法实现,但是位移提交和顺序控制将变得复杂,不推荐

方式三:单线程拉取分区的消息,多线程并发处理拉取的消息

6.10 消费者参数总结

  • enable.auto.commit - 是否自动提交消费位移,默认为true
  • auto.commit.interval.ms - 自动提交消费位移的周期,默认5s,前提是上面的参数为true
  • auto.offset.reset - 如果消费者没有消费位置记录(或者位移越界),会根据该配置决定消费的位置,可选值:
    • latest   - 从分区末尾开始消费消息(默认值)
    • earliest  - 从分区头部开始消费消息
    • none - 抛出NoOffsetForPartitionException异常

  • request.timeout.ms - 用来设置消费者某些方法的统一超时时间,默认30s
  • fetch.min.bytes - Consumer在一次poll方法中能从Kafka中拉取的最小数据量,默认为1B
  • fetch.max.bytes - Consumer在一次poll方法中能拉取的最大数据量,默认为50MB
  • fetch.max.wait.ms - 如果kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,等待的超时时间,默认500ms
  • max.partition.fetch.bytes - 配置从每个分区返回给Consumer的最大数据量,默认为1MB
  • max.poll.records - 配置Consumer在一次拉取中拉取的最大消息数,默认500条
  • connections.max.idle.ms - 指定多久后关闭空闲的连接,默认9分钟,设置为-1表示不关闭连接
  • exclude.internal.topics - kafka有两个内部主题,consumer_offsets和transaction_state, 该参数用来指定kafka中的内部主题是否可以向消费者公开,默认为true
  • receive.buffer.bytes - 用来设置Socket接收消息缓冲区的大小,默认为64KB,如果设置为-1,表示用操作系统的默认值,如果Consumer和Kafka服务器在不同的机房可以适当调大这个参数
  • send.buffer.bytes - 设置Socket发送消息缓存区的大小,默认128KB,设置为-1,表示用操作系统的默认值
  • request.timeout.ms - 配置Consumer等待请求响应的最长时间,默认30s
  • metadata.max.age.ms - 配置元数据最大过期时间,默认5分钟,过了这个时间会强制更新元数据信息
  • reconnect.backoff.ms - 配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁的连接主机,默认50ms
  • retry.backoff.ms - 配置尝试重新发送失败的请求到指定主题分区之前的退避时间,避免某些故障下频繁的重复发送,默认值为100ms
  • isolation.level - 用来配置消费者的事务隔离级别,可选值:
    • read_uncommitted(默认值):不会忽略事务未提交的消息,能消费到HW处的位置
    • read_committed:消费者会忽略事务未提交的消息,只能消费到LEO的位置

七、Kafka分区管理

7.1 分区/副本的相关概念

AR(Assigned Replicas)- 分区中所有的副本统称为AR
ISR(In-Sync Replicas)  - 所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR
OSR(Out-of-Sync Replicas)-  所有与leader副本同步滞后过多的副本(不包括leader副本在内)组成OSR
HW(High Watermark)- 高水位,标识了一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息(只管ISR中的副本)
LEO(Log End Offset)- 标识当前分区中下一条待写入消息的offset



注意

  • leader 副本负责维护和跟踪 ISR 集合中所有 follower 的滞后状态, follower 副本落后或失效时, leader 副本会把它从 ISR 集合中剔除 ,OSR 集合中有 follower 副本 "追上"leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合
  • broker端的参数replica.lag.time.max.ms(默认10000ms)来决定哪些副本将被移到OSR集合,当ISR集合中的一个follower副本滞后leader副本的时间超过这个值,就会被判定为失效副本
  • 如果手动新增分区副本,则新增的副本默认就在OSR集合中
  • ISR集合中的所有follower副本都同步完之后,才能被认为已经提交,才会更新分区的HW,消费者才能消费到最新消息
  • 当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader
  • 只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步

7.2 分区平衡

分区失衡问题

kafka集群中,如果一旦某个机器发生宕机,将导致该机器上的主题分区的leader节点转移到其他的机器节点上,如果该机器节点回复后重新加入集群,它只是变成一个新的follower节点,而不再对外提供服务。这就导致了分区负载失衡的情况。


分区自动平衡

  1. //broker端参数
  2. //是否开启分区自动平衡,默认为true
  3. auto.leader.rebalance.enable
  4. //如果某个broker节点分区不平衡的比例超过这个值,就会触发分区平衡的动作,默认10%
  5. //broker 中的不平衡率=非优先副本的 leader数/分区总数
  6. leader.imbalance.per.broker.percentage
  7. //定时扫描分区平衡的周期时间,默认300秒
  8. leader.imbalance.check.interval.seconds

分区手动平衡

  1. #kafka中的kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能
  2. #命令如下:
  3. ./kafka-preferred-replica-election.sh --zookeeper zookeeper地址

注意

  • 生产环境中不建议开启分区自动平衡,因为自动平衡的触发时机不可控,如果在业务高峰期触发,将导致业务阻塞等风险
  • 分区平衡并不意味着负载均衡,因为每个分区的leader承担的消息数可能并不均衡

7.3 分区分配策略

Kafka允许消费者与订阅主题之间的分区分配策略,不同的分区分配策略,代表了消费者组的消费者与主题分区之间的对应方式。
消费者可以通过partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略

RangeAssignor策略(默认)

RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

当消费者组订阅两个主题,并且每个主题有3个分区时


RoundRobinAssignor分配策略

RoundRobinAssignor 分配策略的原理是将消费组内所有消费者消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

两个消费者订阅不同的主题时


StickyAssignor分配策略

StickyAssignor分配策略,大致和RoundRobinAssignor策略一致,但是有两个目标:

  • 分区的分配要尽可能均匀
  • 分区的分配尽可能与上次分配保持相同

第一目标优先级要高于第二目标

八、Kafka进阶

8.1 消息的过期时间(TTL)

Kafka本身没有提供消息过期时间的功能,但是开发者可以根据消费者拦截器自定义过期消息的实现


代码实现

生产端

  1. public class ProducerTTL {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.195.135:9092");
  5. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  6. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  7. KafkaProducer producer = new KafkaProducer(properties);
  8. //消息头部信息
  9. RecordHeaders headers = new RecordHeaders();
  10. headers.add("ttl", ByteUtils.long2Bytes(5000));
  11. ProducerRecord producerRecord = new ProducerRecord("my-topic", null, System.currentTimeMillis(), "ttl-msg", "5秒过期时间的消息", headers);
  12. producer.send(producerRecord, new Callback() {
  13. @Override
  14. public void onCompletion(RecordMetadata metadata, Exception exception) {
  15. if (metadata != null) {
  16. System.out.println("消息发送成功!" + metadata.topic() + " " + metadata.partition());
  17. }
  18. }
  19. });
  20. producer.close();
  21. }
  22. }

ByteUtils.java

  1. package com.qf.demo2;
  2. public class ByteUtils {
  3. /**
  4. * long 转byte 数组
  5. * @param num
  6. * @return
  7. */
  8. public static byte[] long2Bytes(long num){
  9. byte[] bytes = new byte[8];
  10. for (int i = 0; i < 8; i++) {
  11. int offset = 64 - (i + 1) * 8;
  12. bytes[i] = (byte)((num >> offset) & 0xff);
  13. }
  14. return bytes;
  15. }
  16. /**
  17. * byte数组转long类型
  18. */
  19. public static long byte2Long(byte[] bytes){
  20. long values = 0;
  21. for (int i = 0; i < 8; i++) {
  22. values <<= 8;
  23. values |= (bytes[i] & 0xff);
  24. }
  25. return values;
  26. }
  27. }

消费端拦截器

  1. package com.qf.demo2;
  2. import org.apache.kafka.clients.consumer.ConsumerInterceptor;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.common.TopicPartition;
  6. import org.apache.kafka.common.header.Header;
  7. import org.apache.kafka.common.header.Headers;
  8. import java.util.*;
  9. /**
  10. * description:
  11. * author: Ken
  12. * 公众号:Java架构栈
  13. */
  14. public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
  15. @Override
  16. public ConsumerRecords onConsume(ConsumerRecords<String, String> records) {
  17. System.out.println("拦截器触发!");
  18. //获取当前时间
  19. long nowTime = System.currentTimeMillis();
  20. Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
  21. //循环所有主题分区
  22. Set<TopicPartition> partitions = records.partitions();
  23. for (TopicPartition partition : partitions) {
  24. //获取每个主题分区中的消息
  25. List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  26. //新建的主题分区的消息
  27. List<ConsumerRecord<String, String>> newPartitionRecords = new ArrayList<>();
  28. //循环消息
  29. for (ConsumerRecord<String, String> record : partitionRecords) {
  30. //获得消息头
  31. Headers headers = record.headers();
  32. //过期时间
  33. long ttl = -1;
  34. //循环消息头
  35. for (Header header : headers) {
  36. if (header.key().equals("ttl")) {
  37. ttl = ByteUtils.byte2Long(header.value());
  38. break;
  39. }
  40. }
  41. if (ttl != -1 && nowTime > record.timestamp() + ttl) {
  42. //已经超时
  43. System.out.println("消息已经过期:" + record.value());
  44. continue;
  45. }
  46. //消息未超时 或者 没有超时时间
  47. newPartitionRecords.add(record);
  48. }
  49. if (!newPartitionRecords.isEmpty()) {
  50. newRecords.put(partition, newPartitionRecords);
  51. }
  52. }
  53. return new ConsumerRecords(newRecords);
  54. }
  55. @Override
  56. public void close() {
  57. }
  58. @Override
  59. public void onCommit(Map offsets) {
  60. }
  61. @Override
  62. public void configure(Map<String, ?> configs) {
  63. }
  64. }

8.2 延迟队列

kafka同样没有提供延迟队列的实现方式,需要开发者进一步进行封装实现

方案一


方案二

 

九、Kafka Stream - 流式计算

9.1 什么是Kafka Stream?

Kafka Stream是Apache Kafka从0.10版本引入的一个新功能,它可以对存储在Kafka内的数据进行流式处理和分析

特点:

  • Kafka Stream提供了一个很是简单而轻量的Library,它能够很是方便地嵌入任意Java应用中,也能够任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 经过可容错的state store实现高效的状态操做(如windowed join和aggregation)
  • 支持正好一次处理语义(exactly once)
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操做,而且可处理晚到的数据(late arrival of records)

9.2 什么是流式计算?

批量计算:一般先有全量数据集,然后定义计算逻辑,并将计算应用于全量数据。特点是全量计算,并且计算结果一次性全量输出。

流式计算:输入是持续的,一般先定义目标计算,然后数据到来之后将计算逻辑应用于数据,往往用增量计算代替全量计算。

9.3 相关概念

9.3.1 KTable 和 KStream

KTable - 表明一个完整的数据集,能够理解为数据库中的表。因为每条记录都是Key-Value对,这里能够将Key理解为数据库中的Primary Key,而Value能够理解为一行记录。能够认为KTable中的数据都是经过Update only的方式进入的。也就意味着,若是KTable对应的Topic中新进入的数据的Key已经存在,那么从KTable只会取出同一Key对应的最后一条数据,相当于新的数据更新了旧的数据。

KStream - 是一个数据流,能够认为全部记录都经过Insert only的方式插入进这个数据流里。

9.3.2 State store

流式处理中,部分操做是无状态的,例如过滤操做(Kafka Stream DSL中用filer方法实现)。而部分操做是有状态的,须要记录中间状态,如Window操做和聚合计算。
State store被用来存储中间状态。它能够是一个持久化的Key-Value存储,也能够是内存中的HashMap,或者是数据库。Kafka提供了基于Topic的状态存储

9.3.3 时间

在流式数据处理中,时间是数据的一个很是重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还增长了timestamp属性。

目前Kafka Stream支持三种时间:

  • 事件发生时间:事件发生的时间包含在数据记录中。发生时间由Producer在构造ProducerRecord时指定。而且须要Broker或者Topic将message.timestamp.type设置为CreateTime(默认值)才能生效。
  • 消息接收时间:也即消息存入Broker的时间。当Broker或Topic将message.timestamp.type设置为LogAppendTime时生效。此时Broker会在接收到消息后,存入磁盘前,将其timestamp属性值设置为当前机器时间。通常消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。
  • 消息处理时间:也即Kafka Stream处理消息时的时间。

9.3.4 窗口(window)

流式数据是在时间上无界的数据。而聚合操做只能做用在特定的数据集,也即有界的数据集上。所以须要经过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种很是经常使用的设定计算边界的方式

Kafka Stream支持的窗口以下:

  • Hopping Time Window - 它有两个属性,一个是Window size(窗口时长),一个是Advance interval(时间间隔)。Window size指定了窗口的大小,也即每次计算的数据集的大小。而Advance interval定义输出的时间间隔。一个典型的应用场景是,每隔5秒钟输出一次过去1个小时内网站的PV或者UV(浏览量 和 访客数)
  • Tumbling Time Window - 能够认为它是Hopping Time Window的一种特例,也即Window size和Advance interval相等。它的特色是各个窗口之间彻底不相交
  • Sliding Window - 该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,能够进行Join计算。
  • Session Window - 该窗口用于对Key作Group后的聚合操做中。它须要对Key作分组,而后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登陆操做时,该用户(Key)的窗口即开始,当发生退出操做或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。

9.3.5 Join

kafka Stream因为包含KStream和Ktable两种数据集,所以提供以下Join计算

  • KTable Join KTable - 结果仍为KTable。任意一边有更新,结果KTable都会更新。
  • KStream Join KStream - 结果为KStream。必须带窗口操做,不然会形成Join操做一直不结束。
  • KStream Join KTable / GlobalKTable - 结果为KStream。只有当KStream中有新数据时,才会触发Join计算并输出结果。KStream无新数据时,KTable的更新并不会触发Join计算,也不会输出数据。而且该更新只对下次Join生效。一个典型的使用场景是,KStream中的订单信息与KTable中的用户信息作关联计算。

注意

对于Join操做,若是要获得正确的计算结果,须要保证参与Join的KTable或KStream中Key相同的数据被分配到同一个Task,具体做法如下:

  • 参与Join的KTable或KStream的Key类型相同(实际上,业务含意也应该相同)
  • 参与Join的KTable或KStream对应的Topic的Partition数相同
  • Partitioner(分区器)策略的最终结果等效(实现不须要彻底同样,只要效果同样便可),也即Key相同的状况下,被分配到ID相同的Partition内  

9.3.6 聚合与乱序处理

聚合操做可应用于KStream和KTable。当聚合发生在KStream上时必须指定窗口,从而限定计算的目标数据集聚合操做的结果确定是KTable。由于KTable是可更新的,能够在晚到的数据到来时(也即发生数据乱序时)更新结果KTable。

这里举例说明。假设对KStream以5秒为窗口大小,进行Tumbling Time Window上的Count操做。而且KStream前后出现时间为1秒, 3秒, 5秒的数据,此时5秒的窗口已达上限,Kafka Stream关闭该窗口,触发Count操做并将结果3输出到KTable中(假设该结果表示为<1-5,3>)。若1秒后,又收到了时间为2秒的记录,因为1-5秒的窗口已关闭,若直接抛弃该数据,则可认为以前的结果<1-5,3>不许确。而若是直接将完整的结果<1-5,4>输出到KStream中,则KStream中将会包含该窗口的2条记录,<1-5,3>, <1-5,4>,也会存在肮数据。所以Kafka Stream选择将聚合结果存于KTable中,此时新的结果<1-5,4>会替代旧的结果<1-5,3>。用户可获得完整的正确的结果。这种方式保证了数据准确性,同时也提升了容错性。

但须要说明的是,Kafka Stream并不会对全部晚到的数据都从新计算并更新结果集,而是让用户设置一个retention period(保留期),将每一个窗口的结果集在内存中保留必定时间,该窗口内的数据晚到时,直接合并计算,并更新结果KTable。超过retention period后,该窗口结果将从内存中删除,而且晚到的数据即便落入窗口,也会被直接丢弃。

9.3.7 容错

Kafka Stream从以下几个方面进行容错:

  • 高可用的Partition保证无数据丢失。每一个Task计算一个Partition,而Kafka数据复制机制保证了Partition内数据的高可用性,故无数据丢失风险。同时因为数据是持久化的,即便任务失败,依然能够从新计算。
  • 状态存储实现快速故障恢复和从故障点继续处理。对于Join和聚合及窗口等有状态计算,状态存储可保存中间状态。即便发生Failover或Consumer Rebalance,仍然能够经过状态存储恢复中间状态,从而能够继续从Failover或Consumer Rebalance前的点继续计算。
  • KTable与retention period提供了对乱序数据的处理能力。

9.4 Kafka Stream的执行架构

十、SpringBoot集成Kafka

SpringBoot集成Kafka依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

10.1 生产端

配置application.yml

  1. spring:
  2. kafka:
  3. #指定kafka集群地址
  4. bootstrap-servers: kafka地址:9092
  5. #设置生产者属性
  6. producer:
  7. #发布重试次数
  8. retries: 0
  9. #应答级别(可选01、all/-1
  10. acks: 1
  11. #批量发送的大小 - 16kb
  12. batch-size: 16384
  13. #生产缓冲区大小 - 32M
  14. buffer-memory: 33554432
  15. #设置key、value序列化方式
  16. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  17. value-serializer: org.apache.kafka.common.serialization.StringSerializer

创建Topic

  1. /**
  2. * 创建主题
  3. * @return
  4. */
  5. @Bean
  6. public NewTopic getNewTopic(){
  7. return new NewTopic("boottopic", 2, (short)1);
  8. }

发布消息

  1. @Autowired
  2. private KafkaTemplate kafkaTemplate;
  3. public void run(String... args) throws Exception {
  4. //发送消息
  5. kafkaTemplate.send("Topic名称", "消息内容");
  6. }

10.2 消费端

配置application.yml

  1. spring:
  2. kafka:
  3. bootstrap-servers: kafka地址:9092
  4. consumer:
  5. #是否自动提交offset
  6. enable-auto-commit: false
  7. #自动提交offset周期,开启自动提交才有效
  8. auto-commit-interval: 1000
  9. #当kafka中没有初始offset或者offset超出范围是自动重置offset
  10. #earliest:重置为分区中最小的offset
  11. #latest:重置为分区中最新的offset(默认)
  12. #none: 只要有一个分区不存在已经提交的offset,就抛出异常
  13. auto-offset-reset: latest
  14. #设置key和value反序列化方式
  15. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  16. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  17. listener:
  18. #消息的处理方式
  19. # single - 逐条消息处理
  20. # batch - 批处理消息
  21. type: single
  22. #应答模式 (提交位移)
  23. # record - 逐条应答
  24. # batch - 批量应答(对应poll方法 一次拉取的消息)
  25. # time - 周期性的自动应答 ack-time配置有关
  26. # count - 消息的数量,当消费到一定的消息数量后,就会自动的提交位移 ack-count有关
  27. # count_time - count和time谁先达到条件,就会触发一个位移提交
  28. # manual - 手动提交位移(批量)
  29. # manual_immediate - 手动提交位移(逐条)
  30. ack-mode: manual_immediate
  31. #并发数 最好和主题的分区数 对应
  32. concurrency: 2

配置消费端监听器

  1. package com.qf.kafka.consumer.application;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.kafka.support.Acknowledgment;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class ConsumerListener {
  8. /**
  9. * 可填参数:
  10. * ConsumerRecord<K,V> - 接收的消息
  11. * Acknowledgment - 手动确认
  12. * Consumer - 消费者对象
  13. *
  14. * @param record
  15. */
  16. @KafkaListener(topics = "Topic名称", groupId = "消费组名称")
  17. public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment){
  18. System.out.println("获得消息: 分区-" + record.partition() + " 偏移量-" + record.offset() +
  19. " key-" + record.key() + " value-" + record.value());
  20. //手动确认消息
  21. acknowledgment.acknowledge();
  22. }
  23. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/514975
推荐阅读
相关标签
  

闽ICP备14008679号