当前位置:   article > 正文

【新星计划】Kafka分布式发布订阅消息系统_kafka发布订阅

kafka发布订阅

Kafka分布式发布订阅消息系统

 

目录

Kafka分布式发布订阅消息系统

1. 概述

1.1 点对点消息传递模式

1.2 发布-订阅消息传递模式

1.3 Kafka特点

1.4 kafka拓扑图

2. Kafka工作原理

2.1 Kafka核心组件介绍

2.2 Kafka工作流程分析

2.2.1 生产者生产消息过程

2.2.2 消费者消费消息过程

2.2.3 Kafka Topics

2.2.4 Kafka Partition

2.2.4 Kafka Partition offset

2.2.5 offset存储机制

2.2.6 Consumer group

3. kafka安装

3.1 修改配置文件

3.2 分发安装包

3.3 再次修改文件

3.4 启动集群​​​​​​​

4. Kafka常用操作命令

5. Kafka API操作

6. Kafka Streams概述

7. Kafka Streams开发单词计数


1. 概述

Kafka是由Apache软件基金会开发的一个开源流处理平台,它由Scala和Java语言编写,是一个基于Zookeeper系统的分布式发布订阅消息系统,该项目的设计初衷是为实时数据提供一个统一、高通量、低等待的消息传递平台。

主要应用场景是:日志收集系统和消息系统。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。

有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。

大部分的消息系统选用发布-订阅模式。

image-20220415165306944

1.1 点对点消息传递模式

在点对点消息系统中,消息持久化到一个queue(队列)中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。

image-20220415165426911

1.2 发布-订阅消息传递模式

在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic(分类),消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。

image-20220415165500991

1.3 Kafka特点

高吞吐率。即使在廉价的商用机器上也能做到单机支持每秒100000条消息的传输。

支持消息分区,及分布式消费,同时保证每个分区内消息顺序传输。

同时支持离线数据处理和实时数据处理。

Scale out:支持在线水平扩展

 

1.4 kafka拓扑图

image-20220415170030877

一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干Broker(Kafka支持水平扩展,一般broker(代理)数量越多,集群吞吐率越高),若干Consumer,以及一个Zookeeper集群。

Kafka通过Zookeeper管理集群配置,选举Leader,以及在Consumer发生变化时进行rebalance。Producer使用push模式将消息发布到Broker,Consumer使用pull模式从Broker订阅并消费消息。

Broker:Kafka集群包含一个或多个服务实例,这些服务实例被称为Broker。

Producer:负责发布消息到Kafka Broker。

Consumer:消息消费者,从Kafka Broker读取消息的客户端。

2. Kafka工作原理

2.1 Kafka核心组件介绍

组件名称相关说明
Topic特定类别消息流称为主题,数据存在主题中,主题被拆分成分区
Partition主题的数据分割为一个或多个分区,每个分区的数据使用多个segment文件存储,分区中的数据是有序的
Offset每个分区消息具有的唯一序列标识
Replica副本只是一个分区的备份,它们用于防止数据丢失
Producer生产者即数据发布者,该角色将消息发布到Kafka集群主题中
组件名称相关说明
Consumer消费者可从Broker中读取数据,可消费多个主题数据
Broker每个Kafka服务节点都为Broker,Broker接收消息后,将消息追加到segment文件中
Leader负责分区的所有读写操作
Follower跟随领导指令,若Leader发生故障则选一个Follower为新Leader
Consumer Group实现一个主题消息的广播和单播的手段

2.2 Kafka工作流程分析

2.2.1 生产者生产消息过程

生产者向Kafka集群中生产消息。Producer是消息的生产者,通常情况下,数据消息源可是服务器日志、业务数据及Web服务数据等,生产者采用推送的方式将数据消息发布到Kafka的主题中,主题本质就是一个目录,而主题是由Partition Logs(分区日志)组成,每条消息都被追加到分区中。

image-20220415170514059

  1. Producer先读取Zookeeper的“/brokers/.../state”节点中找到该Partition的Leader。
  2. Producer将消息发送给Leader。
  3. Leader负责将消息写入本地分区Log文件中。
  4. Follower从Leader中读取消息,完成备份操作。
  5. Follower写入本地Log文件后,会向Leader发送Ack,每次发送消息都会有一个确认反馈机制,以确保消息正常送达。
  6. Leader收到所有Follower发送的Ack后,向Producer发送Ack,生产消息完成。
2.2.2 消费者消费消息过程

Kafka采用拉取模型,由消费者记录消费状态,根据主题、Zookeeper集群地址和要消费消息的偏移量,每个消费者互相独立地按顺序读取每个分区的消息,消费者消费消息的流程图如下所示。

image-20220415170757232

2.2.3 Kafka Topics

每条发布到Kafka的消息都有一个类别,这个类别被称为Topic,也可以理解为一个存储消息的队列。例如:天气作为一个Topic,每天的温度消息就可以存储在“天气”这个队列里。

image-20220415171031826

每条发布到Kafka的消息都有一个类别,这个类别被称为Topic,也可以理解为一个存储消息的队列。例如:天气作为一个Topic,每天的温度消息就可以存储在“天气”这个队列里。

图片中的蓝色框为Kafka的一个Topic,即可以理解为一个队列,每个格子代表一条消息。生产者产生的消息逐条放到Topic的末尾。消费者从左至右顺序读取消息,使用Offset来记录读取的位置。

2.2.4 Kafka Partition

为了提高Kafka的吞吐量,物理上把Topic分成一个或多个Partition,每个Partition都是有序且不可变的消息队列。每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

image-20220415171152326

每个topic被分成多个partition(区),每个partition在存储层面对应一个log文件,log文件中记录了所有的消息数据。

引入Partition机制,保证了Kafka的高吞吐能力,因为Topic的多个Partition分布在不同的Kafka节点上,这样一来多个客户端(Producer和Consumer)就可以并发访问不同的节点对一个Topic进行消息的读写。

2.2.4 Kafka Partition offset

每条消息在文件中的位置称为offset (偏移量),offset是一个long型数字,它唯一标记一条消息。消费者通过 (offset、partition、topic) 跟踪记录。

image-20220415171256089

任何发布到此Partition的消息都会被直接追加到log文件的尾部。

2.2.5 offset存储机制

Consumer在从broker读取消息后,可以选择commit,该操作会在Kakfa中保存该Consumer在该Partition中读取的消息的offset。

该Consumer下一次再读该Partition时会从下一条开始读取。

通过这一特性可以保证同一消费者从Kafka中不会重复消费数据。

2.2.6 Consumer group

每个consumer都属于一个consumer group,每条消息只能被consumer group中的一个Consumer消费,但可以被多个consumer group消费。即组间数据是共享的,组内数据是竞争的。

image-20220415171432108

Consumer group A有两个消费者来读取4个Partition中数据;Consumer group B 有四个消费者来读取4个Partition中数据。

Topic的Partition数量可以在创建时配置。

Partition数量决定了每个Consumer group中并发消费者的最大数量。

3. kafka安装

  1. #上传并解压
  2. [root@master apps]# tar -zxvf kafka_2.12-1.1.0.tgz
  3. #修改文件名
  4. [root@master apps]# mv kafka_2.12-1.1.0 kafka

3.1 修改配置文件

server.properties

  1. zookeeper.connect=master:2181,slave01:2181,slave02:2181
  2. server.properties参考
  3. -----------------------------------------------------------------
  4. broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
  5. port=19092 #当前kafka对外提供服务的端口默认是9092
  6. host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
  7. num.network.threads=3 #这个是borker进行网络处理的线程数
  8. num.io.threads=8 #这个是borker进行I/O处理的线程数
  9. log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
  10. socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
  11. socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
  12. socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
  13. num.partitions=1 #默认的分区数,一个topic默认1个分区数
  14. log.retention.hours=168 #默认消息的最大持久化时间,168小时,7
  15. message.max.byte=5242880 #消息保存的最大值5M
  16. default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
  17. replica.fetch.max.bytes=5242880 #取消息的最大直接数
  18. log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
  19. log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
  20. log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
  21. zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

3.2 分发安装包

  1. [root@master apps]$ scp -r kafka hd@slave01:/home/hd/apps
  2. [root@master apps]$ scp -r kafka hd@slave02:/home/hd/apps

3.3 再次修改文件

依次修改各服务器上配置文件的的broker.id,分别是1,2,3不得重复。

3.4 启动集群
​​​​​​​

  1. 先启动zookeeper
  2. 第一台:zkServer.sh start
  3. 第二台:zkServer.sh start
  4. 第三台:zkServer.sh start
  5. 启动kafka服务
  6. 第一台:[root@master bin]$./kafka-server-start.sh -daemon ../config/server.properties
  7. 第二台:[root@master bin]$./kafka-server-start.sh -daemon ../config/server.properties
  8. 第三台:[root@master bin]$./kafka-server-start.sh -daemon ../config/server.properties
  9. 关闭kafka服务
  10. 第一台:[root@master bin]$./kafka-server-stop.sh
  11. 第二台:[root@master bin]$./kafka-server-stop.sh
  12. 第三台:[root@master bin]$./kafka-server-stop.sh

4. Kafka常用操作命令

  1. #创建一个kafka类别
  2. [root@master bin]# ./kafka-topics.sh --create --zookeeper master:2181,slave01:2181,slave02:2181 --replication-factor 1 --partitions 1 --topic newtitle
  3. -------------------------------------------------------------------------------
  4. #--replication-factor 2 #复制两份
  5. #--partitions 1 #创建1个分区
  6. #--topic #主题
  7. -------------------------------------------------------------------------------
  8. #修改一个kafka类别
  9. [root@master bin]# ./kafka-topics.sh --alter --topic newtitle --zookeeper master:2181,slave01:2181,slave02:2181 --partitions 3
  10. #删除类别
  11. [root@master bin]# .kafka-topics.sh --delete --topic newtitle --zookeeper master:2181,slave01:2181,slave02:2181
  12. #不能删除
  13. #彻底删除一个topic,需要在server.properties中配置delete.topic.enable=true,否则只是标记删除
  14. #查看所有的类别
  15. [root@master bin]# ./kafka-topics.sh --list --zookeeper master:2181,slave01:2181,slave02:2181
  16. [root@master bin]# ./kafka-topics.sh --describe --topic newtitle --zookeeper master:2181,slave01:2181,slave02:2181
  17. -------------------------------------------------------------------------------
  18. Topic:newtitle PartitionCount:3 ReplicationFactor:1 Configs:
  19. Topic: newtitle Partition: 0 Leader: 2 Replicas: 2 Isr: 2
  20. Topic: newtitle Partition: 1 Leader: 3 Replicas: 3 Isr: 3
  21. Topic: newtitle Partition: 2 Leader: 1 Replicas: 1 Isr: 1
  22. PartitionCount:topic对应的partition的个数
  23. ReplicationFactor:topic对应的副本因子,说白就是副本个数
  24. Partition:partition编号,从0开始递增
  25. Leader:当前partition起作用的breaker.id
  26. Replicas: 当前副本数据坐在的breaker.id,是一个列表,排在最前面的其作用
  27. Isr:当前kakfa集群中可用的breaker.id列表
  28. -------------------------------------------------------------------------------
  29. #生产者
  30. [root@master bin]# ./kafka-console-producer.sh --broker-list master:9092 --topic newtitle
  31. #消费者
  32. [root@master bin]# ./kafka-console-consumer.sh --zookeeper master:2181,slave01:2181,slave02:2181 --topic newtitle --from-beginning

5. Kafka API操作

kafkaProducer常用API

方法名称相关说明
abortTransaction()终止正在进行的事物
close()关闭这个生产者
flush()调用此方法使所有缓冲的记录立即发送
partitionsFor (java.lang.String topic)获取给定主题的分区元数据
send (ProducerRecord<K,V> record)异步发送记录到主题

KafkaConsumer常用API

方法名称相关说明
subscribe(java.util.Collection<java.lang.String> topics)订阅给定主题列表以获取动态分区
close()关闭这个消费者
wakeup()唤醒消费者
metrics()获取消费者保留的指标
listTopics()获取有关用户有权查看的所有主题的分区的元数据

 

导入Maven依赖
​​​​​​​

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.12</artifactId>
  4. <version>1.0.0</version>
  5. </dependency>

ProducerTest.java

  1. package org.example;
  2. import org.apache.kafka.clients.producer.Callback;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.clients.producer.RecordMetadata;
  6. import java.util.Properties;
  7. public class ProducerTest {
  8. public static void main(String[] args) throws InterruptedException {
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers","192.168.20.52:9092,192.168.20.39:9092,192.168.20.56:9092");
  11. props.put("acks", "all"); // 记录完整提交,最慢的但是最大可能的持久化
  12. props.put("retries", 3); // 请求失败重试的次数
  13. props.put("batch.size", 16384); // batch的大小
  14. props.put("linger.ms", 1);
  15. // 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
  16. props.put("buffer.memory", 33554432); // 提供给生产者缓冲内存总量
  17. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化的方式,
  18. // ByteArraySerializer或者StringSerializer
  19. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  20. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  21. for (int i = 0; i < 10000; i++) {
  22. // 三个参数分别为topic, key,value,send()是异步的,添加到缓冲区立即返回,更高效。
  23. producer.send(new ProducerRecord<String, String>("mytopic", "key" + i, "value" + i));
  24. System.out.println("===>正在发送key->"+i);
  25. Thread.sleep(1000);
  26. }
  27. producer.close();
  28. }
  29. }

ConsumerTest.java

  1. package org.example;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.util.Arrays;
  6. import java.util.Properties;
  7. public class ConsumerTest {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers","192.168.20.52:9092,192.168.20.39:9092,192.168.20.56:9092");
  11. props.put("group.id", "test"); // cousumer的分组id
  12. props.put("enable.auto.commit", "true"); // 自动提交offsets
  13. props.put("auto.commit.interval.ms", "1000"); // 每隔1s,自动提交offsets
  14. props.put("session.timeout.ms", "30000"); // Consumer向集群发送自己的心跳,超时则认为Consumer已经死了,kafka会把它的分区分配给其他进程
  15. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化器
  16. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  18. consumer.subscribe(Arrays.asList("mytopic")); // 订阅的topic,可以多个
  19. while (true) {
  20. ConsumerRecords<String, String> records = consumer.poll(100);
  21. for (ConsumerRecord<String, String> record : records) {
  22. System.out.printf(
  23. "offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
  24. System.out.println();
  25. }
  26. }
  27. }
  28. }

6. Kafka Streams概述

Kafka Streams是Apache Kafka开源的一个流处理框架,基于Kafka的生产者和消费者,为开发者提供流式处理能力,具有低延迟性、高扩展性、弹性、容错的特点,易于集成到现有应用程序中。它是一套处理分析Kafka中存储数据的客户端类库,处理完的数据可重新写回Kafka,也可发送给外部存储系统。

在流式计算框架模型中,通常需要构建数据流的拓扑结构,例如生产数据源、分析数据的处理器及处理完后发送的目标节点,Kafka流处理框架同样将“输入主题自定义处理器输出主题”抽象成一个DAG拓扑图。

 

生产者作为数据源不断生产和发送消息至Kafka的testStreams1主题中,通过自定义处理器对每条消息执行相应计算逻辑,最后将结果发送到Kafka的testStreams2主题中供消费者消费消息数据。

7. Kafka Streams开发单词计数

  1. 打开pom.xml文件,添加Kafka Streams依赖。
  2. 创建LogProcessor类,并继承Streams API中的Processor接口,实现单词计数业务逻辑。
  3. 单词计数的业务功能开发完成后,Kafka Streams需要编写一个运行主程序的类App,用来测试LogProcessor业务程序。
  4. 在hadoop01节点创建testStreams1和testStreams2主题。
  5. 分别在hadoop01和hadoop02节点启动生产者服务和消费者服务。
  6. 运行App主程序类。在生产者服务节点(hadoop01)中输入测试语句,返回消费者服务节点(hadoop02)中查看执行结果。

 

Pom.xml

添加依赖的时候注意版本号,避免兼容性问题

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.0.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-streams</artifactId>
  9. <version>2.0.0</version>
  10. </dependency>x

LogProcessor.java

public void init (ProcessorContext processorContext) : 初始化上下文对象

public void process(byte[] key, byte[] value) :接收到一条消息时,会调用该方法处理并更新状态进行存储

public void close() :资源清理

  1. import org.apache.kafka.streams.processor.Processor;
  2. import org.apache.kafka.streams.processor.ProcessorContext;
  3. import java.util.HashMap;
  4. public class LogProcessor implements Processor<byte[],byte[]> {
  5. //上下文对象
  6. private ProcessorContext processorContext;
  7. @Override
  8. public void init(ProcessorContext processorContext) {
  9. //初始化方法
  10. this.processorContext=processorContext;
  11. }
  12. @Override
  13. public void process(byte[] key, byte[] value) {
  14. //处理一条消息
  15. String inputOri = new String(value);
  16. HashMap <String,Integer> map = new HashMap<String,Integer>();
  17. int times = 1;
  18. if(inputOri.contains(" ")){
  19. //截取字段
  20. String [] words = inputOri.split(" ");
  21. for (String word : words){
  22. if(map.containsKey(word)){
  23. map.put(word,map.get(word)+1);
  24. }else{
  25. map.put(word,times);
  26. }
  27. }
  28. }
  29. inputOri = map.toString();
  30. processorContext.forward(key,inputOri.getBytes());
  31. }
  32. @Override
  33. public void close() {}
  34. }
 
      

App.java

  1. import org.apache.kafka.streams.KafkaStreams;
  2. import org.apache.kafka.streams.StreamsConfig;
  3. import org.apache.kafka.streams.Topology;
  4. import org.apache.kafka.streams.processor.Processor;
  5. import org.apache.kafka.streams.processor.ProcessorSupplier;
  6. import java.util.Properties;
  7. public class App {
  8. public static void main(String[] args) {
  9. //声明来源主题
  10. String fromTopic = "testStreams1";
  11. //声明目标主题
  12. String toTopic = "testStreams2";
  13. //设置参数
  14. Properties props = new Properties();
  15. props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
  16. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave01:9092,slave02:9092");
  17. //实例化StreamsConfig
  18. StreamsConfig config = new StreamsConfig(props);
  19. //构建拓扑结构
  20. Topology topology = new Topology();
  21. //添加源处理节点,为源处理节点指定名称和它订阅的主题
  22. topology.addSource("SOURCE",fromTopic)
  23. //添加自定义处理节点,指定名称,处理器类和上一个节点的名称
  24. .addProcessor("PROCESSOR", new ProcessorSupplier() {
  25. @Override
  26. public Processor get() {//调用这个方法,就知道这条数据用哪个process处理,
  27. return new LogProcessor();
  28. }
  29. },"SOURCE")
  30. //添加目标处理节点,需要指定目标处理节点的名称,和上一个节点名称。
  31. .addSink("SINK",toTopic,"PROCESSOR");//最后给SINK
  32. //实例化KafkaStreams
  33. KafkaStreams streams = new KafkaStreams(topology,config);
  34. streams.start();
  35. }
  36. }

执行测试

代码编写完成之后,在master节点创建testStreams1,testStreams2主题

#创建来源主题

[root@master bin]# ./kafka-topics.sh --create --zookeeper master:2181,slave01:2181,slave02:2181  --replication-factor 1 --partitions 1 --topic testStreams1

#创建目标主题

[root@master bin]# ./kafka-topics.sh --create --zookeeper master:2181,slave01:2181,slave02:2181  --replication-factor 1 --partitions 1 --topic testStreams2

创建好主题之后,分别在master节点和slave01节点启动生产者服务和消费者服务。

#启动生产者服务命令

[root@master bin]# ./kafka-console-producer.sh --broker-list master:2181,slave01:2181,slave02:2181 --topic   testStreams1 
1

#启动消费者服务命令

[root@master bin]# ./kafka-console-consumer.sh --zookeeper master:2181,slave01:2181,slave02:2181  --topic testStreams2 --from-beginning

最后,运行App主程序类,至此我们完成kafka streams所需的环境测试。

在生产者服务节点输入“hello kafka hello spark hello hadoop ” ,返回消费者节点查看运行结果。

关注博主下篇更精彩

一键三连!!!

一键三连!!!

一键三连!!!
感谢一键三连!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/416243
推荐阅读
相关标签
  

闽ICP备14008679号