赞
踩
- version: '3'
- services:
- zookeeper-kafka:
- image: wurstmeister/zookeeper
- restart: unless-stopped
- hostname: zookeeper-kafka
- container_name: zookeeper-kafka
- ports:
- - 2181:2181
- volumes:
- - ./data:/data
- - ./datalog:/datalog
- kafka:
- image: wurstmeister/kafka:2.12-2.4.1
- depends_on:
- - zookeeper-kafka
- container_name: kafka
- ports:
- - 9092:9092
- environment:
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_BROKER_ID: 0
- KAFKA_ZOOKEEPER_CONNECT: 192.168.200.135:2181
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.135:9092
- KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
- restart: unless-stopped
- volumes:
- - ./docker.sock:/var/run/docker.sock
ip要改成自己的ip 云服务器需要开放端口 出现报错可以查看docker容器日志排查
1. 将Kafka的安装包上传到虚拟机,并解压
cd /export/software/ tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/ cd /export/server/kafka_2.12-2.4.1/ |
2. 修改 server.properties
cd /export/server/kafka_2.12-2.4.1/config vim server.properties # 指定broker的id broker.id=0 # 指定Kafka数据的位置 log.dirs=/export/server/kafka_2.12-2.4.1/data # 配置zk的三个节点 zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 |
3. 将安装好的kafka复制到另外两台服务器
cd /export/server scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD 修改另外两个节点的broker.id分别为1和2 ---------node2.itcast.cn-------------- cd /export/server/kafka_2.12-2.4.1/config vim erver.properties broker.id=1 --------node3.itcast.cn-------------- cd /export/server/kafka_2.12-2.4.1/config vim server.properties broker.id=2 |
4. 配置KAFKA_HOME环境变量
vim /etc/profile export KAFKA_HOME=/export/server/kafka_2.12-2.4.1 export PATH=:$PATH:${KAFKA_HOME} 分发到各个节点 scp /etc/profile node2.itcast.cn:$PWD scp /etc/profile node3.itcast.cn:$PWD 每个节点加载环境变量 source /etc/profile |
5. 启动服务器
# 启动ZooKeeper nohup bin/zookeeper-server-start.sh config/zookeeper.properties & # 启动Kafka cd /export/server/kafka_2.12-2.4.1 nohup bin/kafka-server-start.sh config/server.properties & # 测试Kafka集群是否启动成功 bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list |
注意:
kafka的目录结构:
目录名称 | 说明 |
bin | Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 |
config | Kafka的所有配置文件 |
libs | 运行Kafka所需要的所有JAR包 |
logs | Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 |
site-docs | Kafka的网站帮助文件 |
工具使用步骤
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。
官网: EFAK
采用Docker进行安装
docker pull nickzurich/efak:latest
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="116.62.237.97:2181" nickzurich/efak:latest
EFAK_CLUSTER_ZK_LIST:配置的是zookeeper服务的地址
JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。
在启动Kafka的脚本前,添加:
- cd ${KAFKA_HOME}
- export JMX_PORT=9988
- nohup bin/kafka-server-start.sh config/server.properties &
安装JDK,并配置好JAVA_HOME。
将kafka_eagle上传,并解压到 /export/server 目录中。
- cd cd /export/software/
- tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/
- cd /export/server/kafka-eagle-bin-1.4.6/
- tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz
- cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
配置kafka_eagle 环境变量。
- vim /etc/profile
- export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
- export PATH=$PATH:$KE_HOME/bin
- source /etc/profile
配置kafka_eagle。使用vi打开conf目录下的system-config.properties
- vim conf/system-config.properties
- # 修改第4行,配置kafka集群别名
- kafka.eagle.zk.cluster.alias=cluster1
- # 修改第5行,配置ZK集群地址
- cluster1.zk.list=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
- # 注释第6行
- #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
-
- # 修改第32行,打开图标统计
- kafka.eagle.metrics.charts=true
- kafka.eagle.metrics.retain=30
-
- # 注释第69行,取消sqlite数据库连接配置
- #kafka.eagle.driver=org.sqlite.JDBC
- #kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
- #kafka.eagle.username=root
- #kafka.eagle.password=www.kafka-eagle.org
-
- # 修改第77行,开启mys
- kafka.eagle.driver=com.mysql.jdbc.Driver
- kafka.eagle.url=jdbc:mysql://node1.itcast.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
- kafka.eagle.username=root
- kafka.eagle.password=123456
配置JAVA_HOME
- cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
- vim ke.sh
- # 在第24行添加JAVA_HOME环境配置
- export JAVA_HOME=/export/server/jdk1.8.0_241
修改Kafka eagle可执行权限
- cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
- chmod +x ke.sh
启动kafka_eagle。
./ke.sh start
访问Kafka eagle,默认用户为admin,密码为:123456
http://ip:8048/ke
Kafka度量指标
topic list
点击Topic下的List菜单,就可以展示当前Kafka集群中的所有topic。
指标 | 意义 |
Brokers Spread | broker使用率 |
Brokers Skew | 分区是否倾斜 |
Brokers Leader Skew | leader partition是否存在倾斜 |
生产者消息总计
- <!-- kafka客户端工具 -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.4.1</version>
- </dependency>
小tips 将日志输出到文件
<!-- SLF桥接LOG4J日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<!-- SLOG4J日志-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
- // 1. 创建用于连接Kafka的Properties配置
- Properties props = new Properties();
- props.put("bootstrap.servers", "116.62.237.97:9092");
- props.put("acks", "all");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- // 2. 创建一个生产者对象KafkaProducer
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
-
- // 3. 发送1-100的消息到指定的topic中
- for (int i = 0; i < 100; ++i) {
- // 构建一条消息,直接new ProducerRecord
- ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
- Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
- // 调用Future的get方法等待响应
- future.get();
- System.out.println("第" + i + "条消息写入成功!");
- }
-
- // 4.关闭生产者
- kafkaProducer.close();
- /**
- * 消费者程序
- * <p>
- * 1.创建Kafka消费者配置
- * Properties props = new Properties();
- * props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("enable.auto.commit", "true");
- * props.setProperty("auto.commit.interval.ms", "1000");
- * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * <p>
- * 2.创建Kafka消费者
- * 3.订阅要消费的主题
- * 4.使用一个while循环,不断从Kafka的topic中拉取消息
- * 5.将将记录(record)的offset、key、value都打印出来
- */
- public class KafkaConsumerOne {
- public static void main(String[] args) {
- // 1.创建Kafka消费者配置
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "116.62.237.97:9092");
- // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
- // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
- props.setProperty("group.id", "test");
- // 自动提交offset
- props.setProperty("enable.auto.commit", "true");
- // 自动提交offset的时间间隔
- props.setProperty("auto.commit.interval.ms", "1000");
- // 拉取的key、value数据的
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- // 2.创建Kafka消费者
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
-
- // 3. 订阅要消费的主题
- // 指定消费者从哪个topic中拉取数据
- kafkaConsumer.subscribe(Arrays.asList("test"));
-
- // 4.使用一个while循环,不断从Kafka的topic中拉取消息
- while (true) {
- // Kafka的消费者一次拉取一批的数据
- ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
- // 5.将将记录(record)的offset、key、value都打印出来
- for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
- // 主题
- String topic = consumerRecord.topic();
- // offset:这条消息处于Kafka分区中的哪个位置
- long offset = consumerRecord.offset();
- // key\value
- String key = consumerRecord.key();
- String value = consumerRecord.value();
-
- System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
- }
- }
- }
- }
- // 二、使用异步回调的方式发送消息
- ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
- kafkaProducer.send(producerRecord, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- // 1. 判断发送消息是否成功
- if(exception == null) {
- // 发送成功
- // 主题
- String topic = metadata.topic();
- // 分区id
- int partition = metadata.partition();
- // 偏移量
- long offset = metadata.offset();
- System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
- }
- else {
- // 发送出现错误
- System.out.println("生产消息出现异常!");
- // 打印异常消息
- System.out.println(exception.getMessage());
- // 打印调用栈
- System.out.println(exception.getStackTrace());
- }
- }
- });
props.put("enable.idempotence",true);
- // 开启事务必须要配置事务的ID
- props.put("transactional.id", "dwd_user");
- // 配置事务的隔离级别
- props.put("isolation.level","read_committed");
- // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
- props.setProperty("enable.auto.commit", "false");
如果使用了事务,不要使用异步发送
- public class TransactionProgram {
- public static void main(String[] args) {
- // 1. 调用之前实现的方法,创建消费者、生产者对象
- KafkaConsumer<String, String> consumer = createConsumer();
- KafkaProducer<String, String> producer = createProducer();
-
- // 2. 生产者调用initTransactions初始化事务
- producer.initTransactions();
-
- // 3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
- while(true) {
- try {
- // (1) 生产者开启事务
- producer.beginTransaction();
-
- // 这个Map保存了topic对应的partition的偏移量
- Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
-
- // 从topic中拉取一批的数据
- // (2) 消费者拉取消息
- ConsumerRecords<String, String> concumserRecordArray = consumer.poll(Duration.ofSeconds(5));
- // (3) 遍历拉取到的消息,并进行预处理
- for (ConsumerRecord<String, String> cr : concumserRecordArray) {
- // 将1转换为男,0转换为女
- String msg = cr.value();
- String[] fieldArray = msg.split(",");
-
- // 将消息的偏移量保存
- // 消费的是ods_user中的数据
- String topic = cr.topic();
- int partition = cr.partition();
- long offset = cr.offset();
-
- int i = 1 / 0;
-
- // offset + 1:offset是当前消费的记录(消息)对应在partition中的offset,而我们希望下一次能继续从下一个消息消息
- // 必须要+1,从能消费下一条消息
- offsetMap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1));
-
- // 将字段进行替换
- if(fieldArray != null && fieldArray.length > 2) {
- String sexField = fieldArray[1];
- if(sexField.equals("1")) {
- fieldArray[1] = "男";
- }
- else if(sexField.equals("0")){
- fieldArray[1] = "女";
- }
- }
-
- // 重新拼接字段
- msg = fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2];
-
- // (4) 生产消息到dwd_user topic中
- ProducerRecord<String, String> dwdMsg = new ProducerRecord<>("dwd_user", msg);
- // 发送消息
- Future<RecordMetadata> future = producer.send(dwdMsg);
- try {
- future.get();
- } catch (Exception e) {
- e.printStackTrace();
- producer.abortTransaction();
- }
- // new Callback()
- // {
- // @Override
- // public void onCompletion(RecordMetadata metadata, Exception exception) {
- // // 生产消息没有问题
- // if(exception == null) {
- // System.out.println("发送成功:" + dwdMsg);
- // }
- // else {
- // System.out.println("生产消息失败:");
- // System.out.println(exception.getMessage());
- // System.out.println(exception.getStackTrace());
- // }
- // }
- // });
- }
-
- producer.sendOffsetsToTransaction(offsetMap, "ods_user");
-
- // (6) 提交事务
- producer.commitTransaction();
- }catch (Exception e) {
- e.printStackTrace();
- // (7) 捕获异常,如果出现异常,则取消事务
- producer.abortTransaction();
- }
- }
- }
-
- // 一、创建一个消费者来消费ods_user中的数据
- private static KafkaConsumer<String, String> createConsumer() {
- // 1. 配置消费者的属性(添加对事务的支持)
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
- props.setProperty("group.id", "ods_user");
- // 配置事务的隔离级别
- props.put("isolation.level","read_committed");
- // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
- props.setProperty("enable.auto.commit", "false");
- // 反序列化器
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- // 2. 构建消费者对象
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
-
- // 3. 订阅一个topic
- kafkaConsumer.subscribe(Arrays.asList("ods_user"));
-
- return kafkaConsumer;
-
- }
-
- // 二、编写createProducer方法,用来创建一个带有事务配置的生产者
- private static KafkaProducer<String, String> createProducer() {
- // 1. 配置生产者带有事务配置的属性
- Properties props = new Properties();
- props.put("bootstrap.servers", "node1.itcast.cn:9092");
- // 开启事务必须要配置事务的ID
- props.put("transactional.id", "dwd_user");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- // 2. 构建生产者
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
-
- return kafkaProducer;
- }
- }
乱序问题
自定义分区策略:
- public class KeyWithRandomPartitioner implements Partitioner {
-
- private Random r;
-
- @Override
- public void configure(Map<String, ?> configs) {
- r = new Random();
- }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // cluster.partitionCountForTopic 表示获取指定topic的分区数量
- return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
- }
-
- @Override
- public void close() {
- }
- }
- props.
- put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());
分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少
producer是不断地往Kafka中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个ACKs的配置。
- Properties props = new Properties();
- props.put("bootstrap.servers", "node1.itcast.cn:9092");
- props.put("acks", "all");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。
分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。
- /**
- * 消费者程序:从test主题中消费数据
- */
- public class _2ConsumerTest {
- public static void main(String[] args) {
- // 1. 创建Kafka消费者配置
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "192.168.88.100:9092");
- props.setProperty("group.id", "test");
- props.setProperty("enable.auto.commit", "true");
- props.setProperty("auto.commit.interval.ms", "1000");
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- // 2. 创建Kafka消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
- // 3. 订阅要消费的主题
- consumer.subscribe(Arrays.asList("test"));
-
- // 4. 使用一个while循环,不断从Kafka的topic中拉取消息
- while (true) {
- // 定义100毫秒超时
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- }
- String topic = "test";
- TopicPartition partition0 = new TopicPartition(topic, 0);
- TopicPartition partition1 = new TopicPartition(topic, 1);
- consumer.assign(Arrays.asList(partition0, partition1));
bin/kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic test --partition=2 --election-type preferred
Flink里面有对应的每种不同机制的保证,提供Exactly-Once保障(二阶段事务提交方式)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。