当前位置:   article > 正文

Kafka复习_下器云e5c17c

下器云e5c17c

1. Kafka好的博文

https://mp.weixin.qq.com/s/oFEv5c5zO7NAMA3YYB3CrQ
https://www.jianshu.com/p/e5c17cd5989b
https://segmentfault.com/a/1190000038766024 kafkastream好的案例

2. Apache Kafka简介

Kafka is a distributed,partitioned,replicated commit logservice。

Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使你能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

Kafka 是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
在这里插入图片描述

  • (1)生产者和消费者(producer和consumer):消息的发送者叫Producer,消息的使用者和接受者是Consumer,生产者将数据保存到Kafka集群中,消费者从中获取消息进行业务的处理。
  • (2)broker:Kafka集群中有很多台Server,其中每一台Server都可以存储消息,将每一台Server称为一个kafka实例,也叫做broker。
  • (3)主题(topic):一个topic里保存的是同一类消息,相当于对消息的分类,每个producer将消息发送到kafka中,都需要指明要存的topic是哪个,也就是指明这个消息属于哪一类。
  • (4)分区(partition):每个topic都可以分成多个partition,每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部。为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。
  • (5)偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为offset(偏移量),offset为一个long型数字,它可以唯一标记一条消息。由于kafka并没有提供其他额外的索引机制来存储offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。

总结一下Kafka的几个要点:

  • kafka是一个基于发布-订阅的分布式消息系统(消息队列)
  • Kafka面向大数据,消息保存在主题中,而每个topic有分为多个分区
  • kafak的消息数据保存在磁盘,每个partition对应磁盘上的一个文件,消息写入就是简单的文件追加,文件可以在集群内复制备份以防丢失
  • 即使消息被消费,kafka也不会立即删除该消息,可以通过配置使得过一段时间后自动删除以释放磁盘空间
  • kafka依赖分布式协调服务Zookeeper,适合离线/在线信息的消费,与storm和saprk等实时流式数据分析常常结合使用

3. kafka的应用场景

  • 消息系统。
    • Kafka 作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模消息处理提供了一种很好的解决方案。
  • 应用监控。
    • 利用Kafka 采集应用程序和服务器健康相关的指标,如CPU 占用率、IO 、内存、连接数、TPS 、QPS 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用Kafka 与ELK (Elastic Search 、Logstash 和Kibana)整合构建应用服务监控系统。
  • 网站用户行为追踪。
    • 为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到Kafka 集群上,通过Hadoop 、Spark 或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
  • 流处理。
    • 需要将己收集的流数据提供给其他流式计算框架进行处理,用Kafka 收集流数据是一个不错的选择。
  • 持久性日志。
    • Kafka 可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份, Kafka 为故障节点数据恢复提供了一种重新同步的机制。同时, Kafka很方便与HDFS 和Flume 进行整合,这样就方便将Kafka 采集的数据持久化到其他外部系统。

4. Kafka基本原理

通过之前的介绍,我们对kafka有了一个简单的理解,它的设计初衷是建立一个统一的信息收集平台,使其可以做到对信息的实时反馈。Kafka is a distributed,partitioned,replicated commit logservice。接下来我们着重从几个方面分析其基本原理。

4.1. 分布式和分区(distributed、partitioned)

我们说kafka是一个分布式消息系统,所谓的分布式,实际上我们已经大致了解。消息保存在Topic中,而为了能够实现大数据的存储,一个topic划分为多个分区,每个分区对应一个文件,可以分别存储到不同的机器上,以实现分布式的集群存储。另外,每个partition可以有一定的副本,备份到多台机器上,以提高可用性。

总结起来就是:一个topic对应的多个partition分散存储到集群中的多个broker上,存储方式是一个partition对应一个文件,每个broker负责存储在自己机器上的partition中的消息读写。

4.2. 副本(replicated )

kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性,备份的数量可以通过配置文件指定。

这种冗余备份的方式在分布式系统中是很常见的,那么既然有副本,就涉及到对同一个文件的多个备份如何进行管理和调度。kafka采取的方案是:每个partition选举一个server作为“leader”,由leader负责所有对该分区的读写,其他server作为follower只需要简单的与leader同步,保持跟进即可。如果原来的leader失效,会重新选举由其他的follower来成为新的leader。

至于如何选取leader,实际上如果我们了解ZooKeeper,就会发现其实这正是Zookeeper所擅长的,Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。

另外,这里我们可以看到,实际上作为leader的server承担了该分区所有的读写请求,因此其压力是比较大的,从整体考虑,从多少个partition就意味着会有多少个leader,kafka会将leader分散到不同的broker上,确保整体的负载均衡。

4.3. 整体数据流程

kafka 的总体数据流满足下图,该图可以说是概括了整个kafka的基本原理。
在这里插入图片描述

(1)数据生产过程(Produce)

对于生产者要写入的一条记录,可以指定四个参数:分别是topic、partition、key和value,其中topic和value(要写入的数据)是必须要指定的,而key和partition是可选的。

对于一条记录,先对其进行序列化,然后按照 Topic 和 Partition,放进对应的发送队列中。如果 Partition 没填,那么情况会是这样的:a、**Key 有填。**按照 Key 进行哈希,相同 Key 去一个 Partition。b、**Key 没填。**Round-Robin 来选 Partition。
在这里插入图片描述

producer将会和Topic下所有partition leader保持socket连接,消息由producer直接通过socket发送到broker。其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件,因此,可以准确的知道谁是当前的leader。

producer端采用异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。

(2)数据消费过程(Consume)

对于消费者,不是以单独的形式存在的,每一个消费者属于一个consumer group,一个group包含多个consumer。特别需要注意的是:订阅Topic是以一个消费组来订阅的,发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费

如果所有的Consumer都具有相同的group,那么就像是一个点对点的消息系统;如果每个consumer都具有不同的group,那么消息会广播给所有的消费者。

具体说来,这实际上是根据partition来分的,一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费,消费组里的每个消费者是关联到一个partition的,因此有这样的说法:对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

同一个消费组的两个消费者不会同时消费一个partition。

在这里插入图片描述

在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset)。

partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次,在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 中剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中,由此可见,consumer客户端也很轻量级。

4.4. 消息传送机制

Kafka 支持 3 种消息投递语义,在业务中,常常都是使用 At least once 的模型。

  • **At most once:**最多一次,消息可能会丢失,但不会重复。
  • **At least once:**最少一次,消息不会丢失,可能会重复。
  • **Exactly once:**只且一次,消息不丢失不重复,只且消费一次。

Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前Kafka实现的是at least once。consumer从broker中读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然也可以将consumer设置为autocommit,即consumer一旦读取到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了exactly once, 但是如果由于前面producer与broker之间的某种原因导致消息的重复,那么这里就是at least once。考虑这样一种情况,当consumer读完消息之后先commit再处理消息,在这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于at most once了。读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于at least once。

要做到exactly once就需要引入消息去重机制。Kafka文档中提及GUID(Globally Unique Identifier)的概念,通过客户端生成算法得到每个消息的unique id,同时可映射至broker上存储的地址,即通过GUID便可查询提取消息内容,也便于发送方的幂等性保证,需要在broker上提供此去重处理模块,目前版本尚不支持。针对GUID, 如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小难以界定。不只是Kafka, 类似RabbitMQ以及RocketMQ这类商业级中间件也只保障at least once, 且也无法从自身去进行消息去重。所以我们建议业务方根据自身的业务特点进行去重,比如业务消息本身具备幂等性,或者借助Redis等其他产品进行去重处理。

5. API

5.1. Producer API

消息发送流程

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。

main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
在这里插入图片描述

相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

用到的类:

KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

5.2.Consumer API

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

5.3 数据漏消费和重复消费分析

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

在这里插入图片描述

自定义存储 offset

Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。

offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。

当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先***获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。***

要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。

5.4 命令API

创建topic myStreamOut:

`kafka-topics.sh --create --zookeeper zhoujing:2181,node1:2181,node2:2181 --topic myStreamOut --partitions 1 --replication-factor 1` 

  • 1
  • 2

生产消息写入到myStreamOut:

`kafka-console-producer.sh --topic myStreamOut --broker-list node1:9092`
  • 1

消费myStreamOut里的数据:

`kafka-console-consumer.sh --topic myStreamOut --bootstrap-server node1:9092 --from-beginning` 
  • 1

6. kafka manager部署与使用

Kafka Manager下载地址 https://github.com/yahoo/kafka-manager/releases

编译:

tar zxvf CMAK-1.3.3.17.tar.gz
 cd CMAK-1.3.3.17
 ./sbt clean dist
  • 1
  • 2
  • 3

编译成功
在这里插入图片描述

编译好的包拿出来进行安装

cd /opt/software/CMAK-1.3.3.17/target/universal
拷贝安装包并解压

#修改配置文件
vim application.conf
#新增项,http访问服务的端口
http.port=19000
#修改成自己的zk机器地址和端口
kafka-manager.zkhosts="zhoujing:2181,node1:2181,node2:2181"

cd bin/
启动服务
./kafka-manager -Dconfig.file=../conf/application.conf
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

http://zhoujing:19000/

7. Kafka Stream

7.1 Kafka Stream概念及初识高层架构图

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。

Kafka Stream的基本概念:

  • Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib)
  • 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境
  • Kafka Stream通过state store可以实现高效的状态操作
  • 支持原语Processor和高层抽象DSL

Kafka Stream的高层架构图:

在这里插入图片描述

  • Partition的数据会分发到不同的Task上,Task主要是用来做流式的并行处理
  • 每个Task都会有自己的state store去记录状态
  • 每个Thread里会有多个Task

7.2 Kafka Stream 核心概念

Kafka Stream关键词:

  • 流和流处理器:流指的是数据流,流处理器指的是数据流到某个节点时对其进行处理的单元
  • 流处理拓扑:一个拓扑图,该拓扑图展示了数据流的走向,以及流处理器的节点位置
  • 源处理器及Sink处理器:源处理器指的是数据的源头,即第一个处理器,Sink处理器则反之,是最终产出结果的一个处理器

在这里插入图片描述

Kafka Stream完整的高层架构图:
在这里插入图片描述

从上图中可以看到,Consumer对一组Partition进行消费,这组Partition可以在一个Topic中或多个Topic中。然后形成数据流,经过各个流处理器后最终通过Producer输出到一组Partition中,同样这组Partition也可以在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。

因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。到服务器上使用命令行创建两个Topic:

由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入Stream的依赖包。在项目中添加如下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Properties;

public class TestStream {
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    public static Properties config = null;

    @Before
    public void before(){
        config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    }

    @Test
    public void createKafkaStreams(){

        // 构建流结构拓扑
        StreamsBuilder builder = new StreamsBuilder();

        // 构建wordcount的processor
        wordCountStream(builder);

        Topology topology = builder.build();

        // 构建KafkaStreams
        KafkaStreams streams  = new KafkaStreams(topology, config);

        // 启动该Stream
        streams.start();
    }

    private void wordCountStream(StreamsBuilder builder) {
        // 不断的从INPUT_TOPIC上获取新的数据,并追加到流上的一个抽象对象
        KStream<String, String> source = builder.stream(INPUT_TOPIC);

        KTable<String, Long> count = source.flatMapValues(
                // 以空格为分隔符将字符串进行拆分
                v -> Arrays.asList(v.toLowerCase().split(" "))
                // 按value进行分组统计
        ).groupBy((k, v) -> v).count();

        KStream<String, Long> sink = count.toStream();
        sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));


    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

KTable和KSteam

KTable和KSteam是Kafka中非常重要的概念,在此分析一下二者区别。

  • KStream是一个数据流,可以认为所有的记录都通过Insert only的方式插入进这个数据流中。
  • KTable代表一个完整的数据集,可以理解为数据库中的表。每条记录都是KV键值对,key可以理解为数据库中的主键,是唯一的,而value代表一条记录。我们可以认为KTable中的数据时通过Update only的方式进入的。如果是相同的key,会覆盖掉原来的那条记录。
  • 综上来说,KStream是数据流,来多少数据就插入多少数据,是Insert only;KTable是数据集,相同key只允许保留最新的记录,也就是Update only

运行以上代码,然后到服务器中使用kafka-console-producer.sh脚本命令向input-topic生产一些数据,如下:

 kafka-console-producer.sh --broker-list node1:9092 --topic input-topic
>Hello World Java
>Hello World Kafka
>Hello Java Kafka
>Hello Java
  • 1
  • 2
  • 3
  • 4
  • 5

然后再运行kafka-console-consumer.sh脚本命令从output-topic中消费数据,并进行打印。具体如下:

kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
  • 1

控制台输出的结果:

world   2
hello   3
java    2
kafka   2
hello   4
java    3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

窗口

流式数据在时间上无界的,但是聚合操作只能作用在特定(有界)的数据集,咋整???这时候就有了窗口的概念,在时间无界的数据流中定义一个边界来用于计算。Kafka支持的窗口如下:

  • 1)Hopping Time Window:举一个典型的应用场景,每隔5秒钟输出一次过去1个小时内网站的PV或者UV。里面有两个时间1小时和5秒钟,1小时指定了窗口的大小(Window size),5秒钟定义输出的时间间隔(Advance interval)。
  • 2)Tumbling Time Window:可以认为是Hopping Time Window的一种特例,窗口大小=输出时间间隔,它的特点是各个Window之间完全不相交。
  • 3)Sliding Window该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,可以进行Join计算。
  • 4)Session Window该窗口用于对Key做Group后的聚合操作中。它需要对Key做分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号