当前位置:   article > 正文

2021-05-20:kafka使用总结_kafka-console-producer

kafka-console-producer

一:windows环境使用kafka

      1:安装zookeeper

1.1:官网下载zookeeper

1.2:将zookeeper解压到本地目录,例:D:\Users\zk\apache-zookeeper-3.5.8-bin

1.3:将/conf文件夹下的zoo_sample.cfg文件复制一份,命名为zoo.cfg,此为zookeeper的配置文件

            

1.4:修改zoo.cfg配置文件中的dataDir和dataLogDir的值(注意:对应的目录需先创建),此分别为zk保存数据和日志的目录

1.5:启动zookeeper(请先确保本机已安装jdk),进入zk的/bin目录,双击:zkServer.cmd启动zk服务端,在双击zkCli.cmd启动zk客户端

注意:zkServer.cmd启动后不要关闭,zkCli.cmd启动后若有"Welcome to ZooKeeper",则表示zk启动正常

如果zkServer.cmd或者zkCli.cmd双击后一闪而逝,则可打开相应的cmd脚本,在文件末尾处加上pause,保存后再次双击启动,即可看到对应的错误信息


     2:安装kafka

      2.1:官网下载kafka(以0.9.0.1版本为例): Apache Kafka 

2.2:下载完成后,将压缩包解压到本地目录,例:D:\Users\kafka\kafka_2.10-0.9.0.1,可在和bin同级目录下新建log目录,用于存放kafka数据

2.3修改kafka配置文件,进入/config目录

    2.3.1:打开server.properties文件,修改如下配置

     配置说明:port---kafka端口号,host.name---本机地址, log.dirs---kafka数据存放地址, num.partitions---kafka的topic中的partitions数量

     zookeeper.connect---连接zk的地址,zookeeper.connection.timeout.ms---连接zk的超时时间

    2.3.2:打开consumer.properties,修改如下配置

      zookeeper.connect=127.0.0.1:2181  ---消费者的zk连接地址

    2.3.3:打开producer.properties,修改如下配置

      metadata.broker.list=localhost:9092  ---生产者向此broker地址发送消息

      2.4:启动kafka

          2.4.1:进入kafka的/bin/windows目录(D:\Users\kafka\kafka_2.10-0.9.0.1\bin\windows):

   在此处打开命名行窗口,输入:  .\bin\windows\kafka-server-start.bat .\config\server.properties ,启动kafka服务

   2.4.2: 进入kafka的/bin/windows目录(D:\Users\kafka\kafka_2.10-0.9.0.1\bin\windows),在此处打开命令行窗口,

   输入: kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic kafka_test_topic  (具体的zk地址和topic使用实际的配置),启动消费者

          2.4.3: 进入kafka的/bin/windows目录(D:\Users\kafka\kafka_2.10-0.9.0.1\bin\windows),在此处打开命令行窗口,

          输入: kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic kafak_test_topic  (具体的broker地址和topic使用实际的配置),启动生产者

          2.4.4: 可在生产者窗口中输入信息,查看消费者窗口中是否显示生产者中输入的消息,如果可以,则本机kafka运行正常

       2.5:若不启动 kafka-console-consumer.bat 和  kafka-console-producer.bat,则可使用java来测试生产者和消费者

          生产者:

  1. package com.example.gtx.kafka;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.util.Properties;
  9. public class KafkaProducerDemo {
  10. private static Logger logger = LoggerFactory.getLogger(KafkaProducerDemo.class);
  11. private static String KAFKA_TEST_TOPIC = "kafka_test_topic";
  12. public static void main(String[] args) {
  13. Properties properties = new Properties();
  14. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
  15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  17. KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);
  18. String msg = "hello kafka, i am A";
  19. ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TEST_TOPIC, msg);
  20. kafkaProducer.send(record);
  21. logger.info("=====send message success=====,message:{}",msg);
  22. /*int a =1;
  23. while(true){
  24. String msg = "hello kafka, i am "+a;
  25. ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TEST_TOPIC, msg);
  26. kafkaProducer.send(record);
  27. a++;
  28. try {
  29. Thread.sleep(60000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. break;
  33. }
  34. logger.info("=====send message success=====,message:{}",msg);
  35. }*/
  36. }
  37. }

消费者:

  1. package com.example.gtx.kafka;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.serialization.StringDeserializer;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.util.Arrays;
  10. import java.util.Properties;
  11. public class KafkaConsumerDemo {
  12. private static Logger logger = LoggerFactory.getLogger(KafkaConsumerDemo.class);
  13. private static String KAFKA_TEST_TOPIC = "kafka_test_topic";
  14. public static void main(String[] args) {
  15. Properties properties = new Properties();
  16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  17. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  18. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  19. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_0");
  20. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  21. kafkaConsumer.subscribe(Arrays.asList(KAFKA_TEST_TOPIC));
  22. while (true) {
  23. ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  24. for (ConsumerRecord<String, String> record : records) {
  25. logger.info("**********:topic={}, offset={}, value={};", record.topic(), record.offset(), record.value());
  26. }
  27. }
  28. }
  29. }

3.kafka相关介绍

kafka概述
Kafka 最初是由Linkedin 即领英公司基于Scala和 Java语言开发的分布式消息发布-订阅系统,现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个 消息队列(mq)系统存在,而事实上kafka已然成为一个流行的分布式流处理平台。其具有高吞吐、低延迟的特性,许多大数据处理系统比如storm、spark、flink等都能很好地与之集成。按照Wikipedia上的说法,kafka的核心数据结构本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。总的来讲,kafka通常具有3重角色:

消息系统

        Kafka和传统的消息队列比如RabbitMQ、RocketMQ、ActiveMQ类似,支持流量削锋、服务解耦、异步通信等核心功能。

流处理平台

       Kafka 不仅能够与大多数流式计算框架完美整合,并且自身也提供了一个完整的流式处理库,即kafka Streaming。kafka Streaming提供了类似Flink中的窗口、聚合、变换、连接等功能。

存储系统

       通常消息队列会把消息持久化到磁盘,防止消息丢失,保证消息可靠性。Kafka的消息持久化机制和多副本机制使其能够作为通用数据存储系统来使用。

一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),在业界主要应用于大数据实时处理领域。

Kafka体系结构
      如图所示,kafka的体系结构中通常包含多个Producer(生产者)、多个Consumer(消费者)、多个Broker(Kafka服务器)以及一个zookeeper集群。

                

体系结构中几个角色:

Producer

           消息发送方,即生产者,负责生产消息,并将其发送到kafka服务器(broker)中。

Consumer

           消息接收方,即消费者,负责消费消息。消费者客户端主动从kafka服务器上拉取(pull)到消息,应用程序进行业务处理。

Broker

           kafka服务实例,即kafka服务器,让生产者客户端、消费者客户端来连接,可以看做消息的中转站。多个Broker 将组成一个Kafka 集群。

ZooKeeper

           ZooKeeper 是在Kafka集群中负责管理集群元数据、控制器选举等操作的分布式协调器。

分区和主题
           Topic (主题)和Partition(分区)是Kafka 中的两个核心概念。在Kafka 中,消息以topic为单位进行归类。生产者必须将消息发送到指定的topic,即发送到Kafka 集群的每一条消息都必须指定一个主题;消费者消费消息也要指定主题,即消费者负责订阅主题并进行消费。

试想如果一个Topic在Kafka中只对应一个存储文件,那么海量数据场景下这个文件所在机器的I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在Kafka中一个topic可以分为多个分区(partition),每个分区通常以分布式的方式存储在不同的机器上。一个特定的partition只属于一个topic。kafka通常用来处理超大规模数据,因此创建主题时可以立即指定多个分区以提高处理性能。当然也可以创建完成后再修改分区数。同一个主题的不同分区包含的消息是不同的。底层存储上,每一个分区对应一个可追加写的Log文件,消息在被追加到分区Log文件时会分配一个特定的offset (偏移量),offset 是消息在分区中的唯一标识, Kafka 通过offset 来保证消息在分区内的有序性。offset 并不跨越分区,即Kafka 保证的是分区有序而不是全局有序。
下图展示了消息的追加写入: 

Kafka中的分区可以分布在不同的服务器( broker )上,因此主题可以通过分区的方式跨越多个broker ,相比单个broker 、单个分区而言并行度增加了,性能提升不少。

在分区之下,Kafka又引入了副本(Replica)的概念。如果说增加分区数实现了水平扩展,增加副本数则是实现了纵向扩展,并提升了容灾能力。同一分区的不同副本中保存的消息是相同的。需要注意的是,在同一时刻,副本之间并非完全一样,因为同步存在延迟。副本之间是一主多从的关系。leader 副本负责处理读写请求, follower 副本只负责与leader 副本进行消息同步。副本存在不同的broker中,当leader 副本出现故障时,从follower 副本中重新选举新的leader 副本对外提供读写服务。Kafka 通过多副本机制实现了故障的自动转移,当Kafka 集群中某个broker 挂掉时,副本机制保证该节点上的partition数据不丢失,仍然能保证kafka服务可用。

下图展示了一个多副本的架构。本例中kafka集群中有4台broker,主题分区数为3,且副本因子也为3。生产者和消费者客户端只和leader副本进行交互,follower副本只负责和leader进行消息同步。每个分区都存在不同的broker中,如果每个broker单独部署一台机器的话,那么不同的Partition及其副本在物理上便是隔离的。


可以认为topic 是逻辑上的概念,partition是物理上的概念,因为每个partition 都对应于一个.log文件存储在kafka 的log目录下,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset(偏移位)。消费者组中的每个消费者,每次消费完数据都会向kafka服务器提交offset,以便出错恢复时从上次的位置继续消费。

消费组(Consumer Group)是Kafka的消费理念中一种特有的概念,每个消费者都属于一个消费组。生产者的消息发布到主题后,只会被投递给订阅该主题的每个消费组中的一个消费者。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;所有的消费者都有一个与之对应的消费者组,即消费者组是逻辑上的一个订阅者。消费者组之间互不影响,多个不同的消费者组可以同时订阅一个Topic,此时消息会同时被每个消费者组中一个消费者消费。

理解上述概念有助于在实际应用中规划topic分区数,消费者数、生产者数。实际生产中,一般分区数和消费者数保持相等,如果这个主题的消费者数大于主题的分区数,那么多出来的消费者将消费不到数据,只能浪费系统资源。

kafka文件存储机制
如前文所述,生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个partition 分为多个segment。每个segment都有对应的.index文件和.log文件以及.timeindex文件。这些文件位于kafka的配置文件server.properties中配置项log.dirs所指定目录下的一个文件夹中,该文件夹的命名规则为:topic名称+分区序号。例如test这个topic 设置了三个分区,则会创建对应的文件夹test-0,test-1,test-2。每个文件下的索引和日志格式文件如下,index和log文件以当前segment 的第一条消息的offset 命名。

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
00000000000000130610.index
00000000000000130610.log
00000000000000121343.timeindex

日志分段文件对应的两个索引文件主要是用来提高查找消息的速度。偏移量索引建立了消息位移(offset)和物理地址之间的映射关系;时间戳索引则方便根据指定时间戳查找偏移量信息。每写入一定量(kafka配置文件参数log.index.interval.bytes 指定,默认值为4096 ,即4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。可以通过配置log.index.interval.bytes 的值,增加或减小索引项的密度。

kafka高效读写的原因之一在于使用了顺序写磁盘技术和零拷贝技术。

1)顺序写盘

传统的消息中间件比如RabbitMQ使用内存作为默认的存储介质,而将磁盘作为备选介质,以此实现高吞吐、低延迟的特性。事实上有研究表明,同样的磁盘,顺序写速度能到600MB/s,而随机写只有100K/s。这与磁盘的机械特性有关,顺序写省去了大量磁头寻址时间。因此顺序写磁盘的速度甚至快于随机写内存。因此Kafka 在设计时采用了文件追加的方式来顺序写入消息到磁盘中。此外kafka还充分利用了磁盘页缓存来减少磁盘IO。

2)零拷贝

零拷贝是指将数据直接从磁盘复制到网卡设备中而无需经过应用程序。零拷贝大大提高了应用程序的性能,减少了系统在内核模式和用户模式之间的上下文切换。在netty等框架中也使用了Zero-Copy技术来提升IO性能。在Linux中,零拷贝依赖操作系统底层的sendfile()函数,其实JDK中的FileChannel.transferTo()方法底层实现即依赖sendfile()函数。

举个栗子,比如服务端要将本地文件传递给客户端,两种不同的技术流程分别如下:

传统非零拷贝技术
首先要调用read()系统函数将磁盘中的文件复制到内核态的Read Buffer中,在CPU的控制下,再将内核态数据复制到用户态下。然后调用系统函数write()将用户模式下的数据复制到内核模式下的Socket Buffer中。最后将内核态的Socket Buffer中的数据复制到硬件网卡设备中传输。上述过程中,数据白白地从内核态到用户态“浪”了一圈,即2次复制操作,4次内核态、用户态上下文切换。再来看看零拷贝是如何处理的。

零拷贝技术
零拷贝技术使用操作系统支持的DMA ( Direct Memory Access )技术将文件内容复制到内核态下的ReadBuffer 中。数据没有被复制到Socket Buffer。只有包含数据的位置和长度的信息的文件描述符传送到Socket Buffer中。数据直接从内核态传输到网卡外设中,操作系统的内核、用户态上下文切换只有2次,数据复制也减少了。
 


每个topic可以有多个partition, partition中选出一个Leader,该Leader负责消息的读写,其他partiontion从该Leader中pull新的消息
producer在发布消息到topic中时,先通过Zookeeper找到该topic的Partition中的Leader
producer将消息发给该Leader
Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。
因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。
Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。

1、 producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader 
2、 producer 将消息发送给该 leader 
3、 leader 将消息写入本地 log 
4、 followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK 
5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/78592
推荐阅读
相关标签
  

闽ICP备14008679号