赞
踩
(1)高吞吐量、低延迟: kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分多个分区,消费组对分区进行消费操作;
(2)可扩展性: kafka 集群支持热扩展;
(3)持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性: 允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
(5)高并发: 支持数千个客户端同时读写;
一个公司可以用 Kafka 可以收集各种服务的 l0g,通过kafka以统一接口服务的方式开放给各种 consumer,例如 Hadoop、Hbase、Solr 等;
解耦和生产者和消费者、缓存消息等:
Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 Hadoop、数据仓库中做离线分析和挖掘;
Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
比如 spark streaming 和 storm。
1)可伸缩性: Kafka 的两个重要特性造就了它的可伸缩性。
2)容错性和可靠性:
Kafka 的设计方式使某个代理的故障能够被集群中的其他代理检测到。由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行。
3)吞吐量: 代理能够以超快的速度有效地存储和检索数据
生产者即数据的发布者,该角色将消息发布到 Kafka 的 topic 中。broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。
消费者可以从 broker 中读取数据。消费者可以消费多个 topic 中的数据。
在 Kafka 中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为 topic。如果把Kafka看做为一个数据库,topic 可以理解为数据库中的一张表,topic 的名字即为表名。
topic 中的数据分割为一个或多个 partition。每个 topic 至少有一个 partition。每个 partition 中的数据使用多个 segment 文件存储。partition 中的数据是有序的,partition 间的数据丢失了数据的顺序。如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将 partition 数目设为 1。
每条消息都有一个当前 Partition 下唯一的64字节的 offset,它指明了这条消息的起始位置。
副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为 folower 的 partition 中消费数据,而是从为 leader 的 partition 中读取数据。副本之间是一主多从的关系。
Kafka 集群包含一个或多个服务器,服务器节点称为 broker。broker 存储 topic 的数据。如果某 topic 有N个 partition,集群有N个 broker,那么每个 broker 存储该 topic 的一个 partition。如果某 topic 有N个 partition,集群有(N+M)个 broker,那么其中有N个 broker 存储该 topic 的一个 partition,剩下的M个 broker 不存储该 topic 的 partition 数据。如果某 topic 有N个 partition,集群中 broker 数目少于N个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。
Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的Leader。当 Follower 与 Leader 挂掉、卡住或者同步太慢,leader 会把这个 follower 从 “in syncreplicas"(ISR) 列表中删除,重新创建一个 Follower。
Zookeeper 负责维护和协调 broker。当 Kafka 系统中新增了 broker 或者某个 broker 发生故障失效时,由 ZooKeeper 通知生产者和消费者。生产者和消费者依据 Zookeeper 的 broker 状态信息与 broker 协调数据的发布和订阅任务。
分区中所有的副本统称为 AR。
所有与 Leader 部分保持一定程度的副(包括 Leader 副本在内)本组成 ISR。
与 Leader 副本同步滞后过多的副本。
高水位,标识了一个特定的 offset,消费者只能拉取到这个 offset 之前的消息。
即日志末端位移(log end offset),记录了该副本底层日志(l0g)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0,9]。
Oracle官网下载JDK
https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html
Java 开发工具包_JDK 各个版本 下载:
jdk.java.net
http://jdk.java.net/
Oracle 官网:
https://www.oracle.com/java/technologies/javase-downloads.html
JDK-7 下载:
http://jdk.java.net/java-se-ri/7
JDK-8 下载:
https://jdk.java.net/java-se-ri/8-MR5
JDK-9 下载:
http://jdk.java.net/java-se-ri/9
Java 平台标准版 9 参考实现
为Java SE 9(官方参考实现JSR 379)仅在从现有的开源代码是基于JDK 9项目在OpenJDK的社区。
JDK-10 下载:
http://jdk.java.net/java-se-ri/10
JDK-11 下载:
http://jdk.java.net/java-se-ri/11
JDK-12 下载:
http://jdk.java.net/java-se-ri/12
JDK-13 下载:
http://jdk.java.net/java-se-ri/13
JDK-14 下载:
http://jdk.java.net/java-se-ri/14
JDK-15 下载:
http://jdk.java.net/java-se-ri/15
JDK-16 下载:
http://jdk.java.net/java-se-ri/16
sudo vim /etc/profile # 添加 jdk 环境变量配置 export JAVA_HOME=/opt/java/jdk-12.0.1 export JRE_HOME-${JAVA_HOME}/jre #export MAVEN_HOME=/opt/maven/apache-maven-3.5.0 export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib #export PATH=S{JAVA_HOME}/bin:${MAVEN_HOME}/bin:$PATH export PATH=S{JAVA_HOME}/bin:$PATH # 或者简单配置 export JAVA_HOME=/usr/local/java/jdk-12 export PATH=${JAVA_HOME}/bin:$PATH # 配置完 JDK 记得断开连接重新连接 或者重启系统。 # 测试 jdk 是否安装配置成功 java -version
1)Zookeeper 是安装 Kafka 集群的必要组件,Kafka 通过 Zookeeper 来实施对元数据信息的管理,包括集群、主题分区等内容。
2)同样在官网下载安装包到指定目录解压缩,步骤如下:
ZooKeeper 官网: http://zookeeper.apache.org
https://github.com/apache/zookeeper/tags?after=release-3.8.0-1
3)把 zookeeper 上传至服务器,并解压
# 切换目录:
cd /usr/local/zookeeper/
# 解压即安装
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C /usr/local/zookeeper/
4)修改 Zookeeper 的配置文件,首先进入安装路径 conf 目录,并将 zoo_sample.cfg 文件修改为 zoo.cfg,并对核心参数进行配置。文件内容如下:
# 切换目录 cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/conf/ # 将 zoo_sample.cfg 文件修改为 zoo.cfg mv zoo_sample.cfg zoo.cfg # 对核心参数进行配置 vim zoo.cfg # The number of milliseconds of each tick #zk服务器的心跳时间 tickTime-2080 # The number of ticks that the initial #synchronization phase can take #投票选举新Leader的初始化时间 initlimit=10 # The number of ticks that can pass between # sendidg a request and getting an acknowledgementsyncLimit-5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. # 数据目录(需要新建此目录) dataDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data # 日志目录 dataLogDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/log # the port at which the clients will connect #Zookeeper对外服务端口,保持默认 clientPort-2181
# 切换目录
cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/
# 启动 Zookeeper
bin/zkServer.sh start
# 查询 zookeeper 是否启动成功
ps -ef | grep zookeeper
# 或者
jps -l
官网下载地址: https://kafka.apache.org/downloads
# 切换目录
cd /usr/local/kafka/
# 解压即安装
tar -zxvf kafka-2.12.2.8.0.tar.gz -C /usr/local/kafka/
# 切换目录 cd /usr/local/kafka/kafka_2.12-2.8.0/ # 修改 kafka 配置参数: vim config/server.properties # server.properties 配置中需要关注以下几个参数: # 表示 broker 的编号,如果集群中有多个 broker,则每个 broker 的编号需要设置的不同 broker.id=0 # brokder 对外提供的服务入口地址( kafka 监听地址:你的虚拟机 IP 地址) # listeners=PLAINTEXT://127.0.0.1:9092 listeners=PLAINTEXT://172.18.30.110:9092 # 设置存放消息日志文件的地址 log.dirs=/tmp/kafka/log # Kafka 所需 Zookeeper 集群地址 zookeeper.connect=lolalhost:2181
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 启动 kafka
bin/kafka-server-start.sh config/server.properties
# 重新打开一个终端
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 查询 kafka 是否启动成功
ps -ef | grep kafka
# 或者
jps -l
1)命令: bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1
2)参数说明:
–zookeeper :指定了 kafka 所连接的 zookeeper 服务地址。
–topic : 指定了所要创建主题的名称
–partitions : 指定了分区个数
–replication-factor : 指定了副本因子
–create : 创建主题的动作指令。
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 创建一个名为 heima 的 主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --create --topic heima --partitions 2 --replication-factor 1
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 展示出当前所有主题
bin/kafka-topics.sh --zookeeper 1ocalhost:2181 --1ist
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --1ist
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 查看主题详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --describe --topic heima
命令: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima
参数说明:
–bootstrap-server 指定了连接 Kafka 集群的地址
–topic 指定了消费端订阅的主题
# 重新打开一个终端:
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 启动消费端接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-console-consumer.sh --bootstrap-server 172.18.30.110:9092 --topic heima
命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic heima
参数说明:
–broker-list 指定了连接的 Kafka 集群的地址
–topic 指定了发送消息时的主题
# 重新打开一个终端:
# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 生产端发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic heima
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-console-producer.sh --broker-list 172.18.30.110:9092 --topic heima
--> idea --> File
--> New --> Project
--> Maven
Project SDK: ( 1.8(java version "1.8.0_131" )
--> Next
--> Groupld : ( djh.it )
Artifactld : ( kafka_learn )
Version : 1.0-SNAPSHOT
--> Name: ( kafka_learn )
Location: ( ...\kafka_learn\ )
--> Finish
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>djh.it</groupId> <artifactId>kafka_learn</artifactId> <version>1.0-SNAPSHOT</version> <name>kafka_learn</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.8.RELEASE</version> <relativePath></relativePath> </parent> <properties> <java.version>8</java.version> <!-- <scala.version>2.11</scala.version>--> <scala.version>2.12</scala.version> <slf4j.version>1.7.21</slf4j.version> <!-- <kafka.version>2.0.0</kafka.version>--> <kafka.version>2.8.0</kafka.version> <lombok.version>1.18.8</lombok.version> <junit.version>4.11</junit.version> <gson.version>2.2.4</gson.version> <protobuff.version>1.5.4</protobuff.version> <!-- <spark.version>2.3.1</spark.version>--> <spark.version>2.4.8</spark.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.version}</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>${gson.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>${protobuff.version}</version> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>${protobuff.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_2.11</artifactId> <version>2.9.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> <!-- kafka_learn\pom.xml -->
/** * kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java * * 2024-6-21 创建 生产者 ProducerFastStart.java 类 */ package djh.it.kafka.learn.chapter1; import io.protostuff.StringSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerFastStart { //private static final String brokerList = "localhost:9092"; private static final String brokerList = "172.18.30.110:9092"; private static final String topic = "heima"; public static void main( String[] args ) { Properties properties = new Properties(); //1)设置 key 序列化器 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2)设置重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 10); //3)设置值序列化器 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //4)设置集群地址 properties.put("bootstrap.servers", brokerList); //properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "hello-kafka-test!"); try{ producer.send(record); }catch (Exception e){ e.printStackTrace(); } producer.close(); } }
/** * kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ConsumerFastStart.java * * 2024-6-21 创建 消费者 ConsumerFastStart.java 类 */ package djh.it.kafka.learn.chapter1; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class ConsumerFastStart { //private static final String brokerList = "localhost:9092"; private static final String brokerList = "172.18.30.110:9092"; private static final String topic = "heima"; private static final String groupId = "group.demo"; public static void main( String[] args ) { Properties properties = new Properties(); //1)设置 key 序列化器 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //2)设置值序列化器 properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //3)设置集群地址 properties.put("bootstrap.servers", brokerList); properties.put("group.id", groupId); KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Collections.singletonList(topic)); while (true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String,String> record : records){ System.out.println(record.value()); } } } }
/** * kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java * * 2024-6-21 创建 生产者 ProducerFastStart.java 类 */ package djh.it.kafka.learn.chapter1; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; //注意导包,一定要导成 kafka 的序列化包 import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class ProducerFastStart { //private static final String brokerList = "localhost:9092"; private static final String brokerList = "172.18.30.110:9092"; private static final String topic = "heima"; public static void main( String[] args ) { Properties properties = new Properties(); //1)设置 key 序列化器 -- 优化代码 //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2)设置重试次数 -- 优化代码 properties.put(ProducerConfig.RETRIES_CONFIG, 10); //3)设置值序列化器 -- 优化代码 //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //4)设置集群地址 -- 优化代码 //properties.put("bootstrap.servers", brokerList); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "优化-2024-6-21-kafka-test!"); try{ producer.send(record); }catch (Exception e){ e.printStackTrace(); } producer.close(); } }
/** * kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ConsumerFastStart.java * * 2024-6-21 创建 消费者 ConsumerFastStart.java 类 */ package djh.it.kafka.learn.chapter1; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; //注意导包,一定要导成 kafka 的序列化包 import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class ConsumerFastStart { //private static final String brokerList = "localhost:9092"; private static final String brokerList = "172.18.30.110:9092"; private static final String topic = "heima"; private static final String groupId = "group.demo"; public static void main( String[] args ) { Properties properties = new Properties(); //1)设置 key 序列化器 -- 优化代码 //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //2)设置值序列化器 -- 优化代码 //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //3)设置集群地址 -- 优化代码 //properties.put("bootstrap.servers", brokerList); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); //properties.put("group.id", groupId); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Collections.singletonList(topic)); while (true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String,String> record : records){ System.out.println(record.value()); } } } }
kafka 启动前,在进行配置 kafka 参数时,即修改 kafka/kafka_2.12-2.8.0/config/server.properties 配置文件时,需要注意以下几点:
指明 Zookeeper 主机地址,如果 zookeeper 是集群则以逗号隔开,如:172.6.14.61:2181,172.6.14.62:2181,172.6.14.63:21B1
broker 对外提供服务时绑定的 IP 和端口。多个以逗号隔开,如果监听器名称不是一个安全的协议,listener.security.protocol.map 也必须设置。主机名称设置 0.0.0.0 绑定所有的接口,主机名称为空则绑定默认的接口。如:PLAINTEXT://myhost:9092,SSL:/1:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
broker 的唯一标识符,如果不配置则自动生成,建议配置且一定要保证集群中必须唯一,默认-11.4.4
日志数据存放的目录,如果没有配置则使用 log.dir,建议此项配置。
服务器接受单个消息的最大大小,默认 1000012 约等于 976.6KB。
# 切换目录 cd /usr/local/kafka/kafka_2.12-2.8.0/ # 修改 kafka 配置参数: vim config/server.properties # server.properties 配置中需要关注以下几个参数: # 1)Kafka 所需 Zookeeper 集群地址 zookeeper.connect=lolalhost:2181 # 2)listeners 监听列表 配置:brokder 对外提供的服务入口地址( kafka 监听地址:你的虚拟机 IP 地址) # listeners=PLAINTEXT://127.0.0.1:9092 listeners=PLAINTEXT://172.18.30.110:9092 # 3)表示 broker 的编号,如果集群中有多个 broker,则每个 broker 的编号需要设置的不同 broker.id=0 # 4)log.dirs 设置存放消息日志文件的地址 log.dirs=/tmp/kafka/log
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。