Here are my notes from the past six months of learning Kafka. Install ZooKeeper first; see the article CentOS7安装zookeeper (installing ZooKeeper on CentOS 7) for instructions.
Kafka is a distributed publish-subscribe messaging system based on the producer-consumer model.
Architecture:
Roles:
1. broker: one Kafka server is one broker; a broker hosts multiple topics
2. producer: the node that sends messages to brokers
3. consumer: the node that reads messages from brokers
4. consumerGroup: a consumer group; consumers in the same group consume different partitions, and within a group each partition is read by only one consumer
5. topic: a message topic, used to categorize messages
6. partition: a topic can have multiple partitions, which improves the broker's load capacity and concurrency
7. Replica: a data replica, used for high availability
8. leader: the leader replica of a topic partition
9. follower: a follower replica of a topic partition
Estimating the number of Kafka machines:
machines = 2 * (peak production rate in MB/s * replication factor / 100) + 1
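For example, assuming a peak production rate of 50 MB/s and 2 replicas, this gives 2 * (50 * 2 / 100) + 1 = 3 machines.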
1. Extract the kafka_2.11-0.11.0.0.tgz archive
# tar -zxvf kafka_2.11-0.11.0.0.tgz
2. Switch to its config directory and edit the server.properties file
- # cd kafka_2.11-0.11.0.0/config/
- # vim server.properties
The main changes are enabling topic deletion, setting the ZooKeeper address, and setting the log (data) directory.
- broker.id=0 # the broker's id, a globally unique integer
-
- delete.topic.enable=true # enable topic deletion
-
-
- ############################# Socket Server Settings #############################
- num.network.threads=3
-
- num.io.threads=8
-
- socket.send.buffer.bytes=102400
-
- socket.receive.buffer.bytes=102400
-
- socket.request.max.bytes=104857600
-
- ############################# Log Basics #############################
- log.dirs=/home/szc/kafka_2.11-0.11.0.0/data # data directory
-
- num.partitions=1
-
- num.recovery.threads.per.data.dir=1
-
- ############################# Internal Topic Settings #############################
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
-
-
- ############################# Log Flush Policy #############################
-
- ############################# Log Retention Policy #############################
- log.retention.hours=168
-
- #log.retention.bytes=1073741824
-
- log.segment.bytes=1073741824
-
- log.retention.check.interval.ms=300000
-
-
- ############################# Zookeeper #############################
- zookeeper.connect=192.168.57.141:2181 # ZooKeeper address
-
- zookeeper.connection.timeout.ms=6000
-
- ############################# Group Coordinator Settings #############################
- group.initial.rebalance.delay.ms=0
3. Switch to the bin directory and start Kafka as a daemon, specifying the configuration file
- # cd ../bin
- # ./kafka-server-start.sh -daemon ../config/server.properties
Check the Kafka process
- [root@localhost bin]# ps -ef | grep kafka
-
- root 79436 1 99 23:58 pts/5 00:00:02 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/home/szc/kafka_2.11-0.11.0.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/home/szc/kafka_2.11-0.11.0.0/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp :/home/szc/kafka_2.11-0.11.0.0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/argparse4j-0.7.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/commons-lang3-3.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/connect-api-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/connect-file-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/connect-json-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/connect-runtime-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/connect-transforms-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/guava-20.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/hk2-api-2.5.0-b05.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/hk2-locator-2.5.0-b05.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/hk2-utils-2.5.0-b05.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jackson-annotations-2.8.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jackson-core-2.8.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jackson-databind-2.8.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/javassist-3.21.0-GA.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/javax.annotation-api-1.2.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/javax.inject-1.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/javax.inject-2.5.0-b05.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-client-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-common-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-container-servlet-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-container-servlet-core-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-guava-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-media-jaxb-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jersey-server-2.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-security-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-server-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-servlet-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-servlets-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jetty-util-9.2.15.v20160210.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/jopt-simple-5.0.3.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka_2.11-0.11.0.0.jar:/home/szc/kafka_2.11-
0.11.0.0/bin/../libs/kafka_2.11-0.11.0.0-sources.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka_2.11-0.11.0.0-test-sources.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka-clients-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka-log4j-appender-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka-streams-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka-streams-examples-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/kafka-tools-0.11.0.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/log4j-1.2.17.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/lz4-1.3.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/maven-artifact-3.5.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/metrics-core-2.2.0.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/plexus-utils-3.0.24.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/reflections-0.9.11.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/rocksdbjni-5.0.1.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/scala-library-2.11.11.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/slf4j-api-1.7.25.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/slf4j-log4j12-1.7.25.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/snappy-java-1.1.2.6.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/zkclient-0.10.jar:/home/szc/kafka_2.11-0.11.0.0/bin/../libs/zookeeper-3.4.10.jar kafka.Kafka ../config/server.properties
-
- root 79516 77781 0 23:58 pts/5 00:00:00 grep --color=auto kafka
4. Stop the Kafka process
# ./kafka-server-stop.sh
Check the Kafka process again
- # ps -ef | grep kafka
- root 79546 77781 0 23:58 pts/5 00:00:00 grep --color=auto kafka
All of the following commands are run from the bin directory of the Kafka installation.
Create a topic by specifying the ZooKeeper address, topic name, number of partitions, and replication factor; note that the replication factor cannot exceed the number of brokers.
- # ./kafka-topics.sh --create --zookeeper 192.168.57.141:2181 --topic test0 --partitions 2 --replication-factor 1
-
- Created topic "test0".
Check the topic's partition directories under the data directory specified in the configuration file
- [root@localhost bin]# ll ../log
-
- total 16
- ...
- drwxr-xr-x. 2 root root 141 May 1 16:36 test0-0
- drwxr-xr-x. 2 root root 141 May 1 16:36 test0-1
List all topics, specifying the ZooKeeper address
- # ./kafka-topics.sh --list --zookeeper 192.168.57.141:2181
-
- test0
Delete a topic by specifying the ZooKeeper address and the topic name
- # ./kafka-topics.sh --delete --zookeeper 192.168.57.141:2181 --topic test0
-
- Topic test0 is marked for deletion.
- Note: This will have no impact if delete.topic.enable is not set to true.
Because topic deletion was enabled in the configuration file, the topic is actually deleted
- # ./kafka-topics.sh --list --zookeeper 192.168.57.141:2181
- #
Describe a topic by specifying the topic name and the ZooKeeper address
- # ./kafka-topics.sh --describe --topic test0 --zookeeper 192.168.57.141:2181
-
- Topic:test0 PartitionCount:2 ReplicationFactor:1 Configs:
- Topic: test0 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- Topic: test0 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
First start the console producer, specifying the topic and the broker list; the Kafka broker listens on port 9092 by default
- # ./kafka-console-producer.sh --topic test0 --broker-list 192.168.57.141:9092
- >
Then open a new tab and start a consumer, specifying the topic and the ZooKeeper address
# ./kafka-console-consumer.sh --topic test0 --zookeeper 192.168.57.141:2181
Now, when the producer sends two messages, the consumer receives them in order.
Producer side:
- >szc
- >anyang
- >
Consumer side:
- szc
- anyang
If another consumer is started now and subscribes to this topic, it will not receive the earlier messages unless the --from-beginning flag is added. Even then, data that has already expired is not received; the retention period can be set in the configuration file.
- # ./kafka-console-consumer.sh --topic test0 --zookeeper 192.168.57.141 --from-beginning
-
- Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
- anyang
- szc
To avoid the deprecation warning printed when the consumer starts, replace --zookeeper with --bootstrap-server and use the broker's address instead.
- # ./kafka-console-consumer.sh --topic test0 --bootstrap-server 192.168.57.141:9092
- henan
After switching to --bootstrap-server, Kafka creates 50 partition directories for the internal __consumer_offsets topic in the broker's data directory; they store the consumers' committed offsets.
- # ll ../log
- total 20
- ...
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-0
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-1
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-10
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-11
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-12
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-13
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-14
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-15
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-16
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-17
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-18
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-19
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-2
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-20
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-21
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-22
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-23
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-24
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-25
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-26
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-27
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-28
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-29
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-3
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-30
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-31
- drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-32
- ....
1. In the Kafka configuration file, make the broker listen on port 9092
listeners=PLAINTEXT://192.168.57.141:9092
Also open port 9092 in the firewall
- # firewall-cmd --add-port=9092/tcp --permanent
- # firewall-cmd --reload
2. On Windows, open IDEA and add the dependency to the pom file
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.11.0.0</version>
- </dependency>
3. Create a producer object, passing in a Properties object
- Properties properties = new Properties();
-
-
- // broker cluster address
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.57.141:9092");
-
-
- // key/value serializers
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
-
-
- // create the producer
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
4. Send messages by passing a ProducerRecord and an optional callback to send(). The ProducerRecord is constructed with the topic name and the message value.
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<String, String>("test0",
- "msg " + i + " from windows10"), new Callback() {
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- e.printStackTrace();
- }
- }
- });
- producer.flush(); // remember to flush
- }
Now, if a console consumer is running on the remote machine, it receives the messages that were just sent.
- # ./kafka-console-consumer.sh --bootstrap-server 192.168.57.141:9092 --topic test0
- msg 0 from windows10
- msg 1 from windows10
- msg 2 from windows10
- msg 3 from windows10
- msg 4 from windows10
- msg 5 from windows10
- msg 6 from windows10
- msg 7 from windows10
- msg 8 from windows10
- msg 9 from windows10
You can print the contents of recordMetadata inside the callback
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- e.printStackTrace();
- }
-
-
- if (recordMetadata != null) {
- System.out.println(recordMetadata.toString());
- }
- }
One run produced the following output
- test0-1@17
- test0-0@16
- test0-1@18
- test0-0@17
- test0-1@19
- test0-0@18
- test0-1@20
- test0-0@19
- test0-1@21
- test0-0@20
The output format is topic-partition@offset, so the next run prints
- test0-1@22
- test0-0@21
- test0-1@23
- test0-0@22
- test0-1@24
- test0-0@23
- test0-1@25
- test0-0@24
- test0-1@26
- test0-0@25
So with no key, Kafka's default partitioning strategy alternates between partitions (round-robin), and messages are ordered within each partition.
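As an aside (a minimal sketch with an assumed key, not part of the original test): if a key is passed when constructing the ProducerRecord, the default partitioner hashes the key instead, so all messages with the same key go to the same partition and stay ordered relative to each other:
- // all records keyed "user1" land in the same partition of test0
- producer.send(new ProducerRecord<String, String>("test0", "user1", "keyed message"));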
5. Synchronous sending (rarely used): take the Future returned by send() and call its get() method.
- for (int i = 0; i < 10; i++) {
- Future<RecordMetadata> fu = producer.send(new ProducerRecord<String, String>("test0",
- "msg " + i + " from windows10.."), new Callback() {
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- e.printStackTrace();
- }
-
-
- if (recordMetadata != null) {
- System.out.println(recordMetadata.toString());
- }
- }
- });
- try {
- RecordMetadata recordMetadata = fu.get(); // block until the send completes and get the result
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
6. Close the producer
producer.close();
1. Create a MyPartitioner class that implements the Partitioner interface; the custom partitioning logic goes in the partition() method
- public class MyPartitioner implements Partitioner {
- public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
- return Math.abs(o.hashCode()) % cluster.availablePartitionsForTopic(s).size(); // partition by the key's hash (assumes a non-null key)
- }
-
-
- public void close() {
-
-
- }
-
-
- public void configure(Map<String, ?> map) {
-
-
- }
- }
2. In the producer's Properties object, pass the fully qualified class name of the partitioner
- Properties properties = new Properties();
-
- ...
-
- properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "partitioner.MyPartitioner");
-
-
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
1. Create a Properties object, fill in the parameters, and create the consumer object from it
- Properties properties = new Properties();
-
- // broker cluster address
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.57.141:9092");
-
- // key/value deserializers
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
-
- // enable automatic offset commits and set the commit interval
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-
- // consumer group id
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group0");
-
- // create the consumer
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
2. Subscribe to the topic
consumer.subscribe(Collections.singletonList("test0"));
3. Poll for messages in an infinite loop, passing the maximum blocking time
- boolean exit = false;
-
-
- while (!exit) {
- ConsumerRecords<String, String> records = consumer.poll(100); // poll for messages, blocking for at most 100 ms
-
- if (records.isEmpty()) { // nothing received, skip this iteration
- continue;
- }
-
- for (ConsumerRecord<String, String> record : records) { // iterate over the received records
- System.out.println(record.toString());
- }
- }
After the producer runs, the consumer prints the following; each line shows the topic, partition, offset within the partition, message value, and other metadata
- ConsumerRecord(topic = test0, partition = 1, offset = 32, CreateTime = 1588473193330, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 33, CreateTime = 1588473193348, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 34, CreateTime = 1588473193353, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 35, CreateTime = 1588473193357, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 36, CreateTime = 1588473193361, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 31, CreateTime = 1588473193347, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 32, CreateTime = 1588473193351, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 33, CreateTime = 1588473193355, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 34, CreateTime = 1588473193359, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 35, CreateTime = 1588473193363, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
4. To read historical messages, set the offset reset policy to earliest in the properties and switch to a new group id
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
-
-
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Running the consumer again now returns all the historical messages
- ConsumerRecord(topic = test0, partition = 1, offset = 0, CreateTime = 1588325037161, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test0)
- ConsumerRecord(topic = test0, partition = 1, offset = 1, CreateTime = 1588325149520, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test2)
- ConsumerRecord(topic = test0, partition = 1, offset = 2, CreateTime = 1588432968113, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10)
- ConsumerRecord(topic = test0, partition = 1, offset = 3, CreateTime = 1588432968120, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10)
- ConsumerRecord(topic = test0, partition = 1, offset = 4, CreateTime = 1588432968126, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10)
- ConsumerRecord(topic = test0, partition = 1, offset = 5, CreateTime = 1588432968133, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10)
- ConsumerRecord(topic = test0, partition = 1, offset = 6, CreateTime = 1588432968139, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10)
- ConsumerRecord(topic = test0, partition = 1, offset = 7, CreateTime = 1588468271890, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 8, CreateTime = 1588468271915, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 9, CreateTime = 1588468271920, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 10, CreateTime = 1588468271925, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 11, CreateTime = 1588468271929, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 12, CreateTime = 1588468408402, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 13, CreateTime = 1588468408407, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 14, CreateTime = 1588468408410, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 15, CreateTime = 1588468408415, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 16, CreateTime = 1588468408421, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 17, CreateTime = 1588468418037, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 18, CreateTime = 1588468418056, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 19, CreateTime = 1588468418061, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 20, CreateTime = 1588468418065, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 21, CreateTime = 1588468418070, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 22, CreateTime = 1588468568065, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 23, CreateTime = 1588468568084, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 24, CreateTime = 1588468568088, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 25, CreateTime = 1588468568093, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 26, CreateTime = 1588468568097, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 27, CreateTime = 1588472294168, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 28, CreateTime = 1588472294189, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 29, CreateTime = 1588472294192, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 30, CreateTime = 1588472294196, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 31, CreateTime = 1588472294203, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 32, CreateTime = 1588473193330, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 33, CreateTime = 1588473193348, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 34, CreateTime = 1588473193353, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 35, CreateTime = 1588473193357, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 1, offset = 36, CreateTime = 1588473193361, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 0, CreateTime = 1588325131644, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test1)
- ConsumerRecord(topic = test0, partition = 0, offset = 1, CreateTime = 1588432968084, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10)
- ConsumerRecord(topic = test0, partition = 0, offset = 2, CreateTime = 1588432968116, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10)
- ConsumerRecord(topic = test0, partition = 0, offset = 3, CreateTime = 1588432968123, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10)
- ConsumerRecord(topic = test0, partition = 0, offset = 4, CreateTime = 1588432968130, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10)
- ConsumerRecord(topic = test0, partition = 0, offset = 5, CreateTime = 1588432968136, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10)
- ConsumerRecord(topic = test0, partition = 0, offset = 6, CreateTime = 1588468271864, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 7, CreateTime = 1588468271913, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 8, CreateTime = 1588468271918, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 9, CreateTime = 1588468271922, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 10, CreateTime = 1588468271927, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 11, CreateTime = 1588468408386, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 12, CreateTime = 1588468408405, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 13, CreateTime = 1588468408409, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 14, CreateTime = 1588468408413, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 15, CreateTime = 1588468408417, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 16, CreateTime = 1588468418054, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 17, CreateTime = 1588468418058, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 18, CreateTime = 1588468418063, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 19, CreateTime = 1588468418068, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 20, CreateTime = 1588468418072, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 21, CreateTime = 1588468568082, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 22, CreateTime = 1588468568086, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 23, CreateTime = 1588468568091, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 24, CreateTime = 1588468568095, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 25, CreateTime = 1588468568100, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 26, CreateTime = 1588472294186, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 27, CreateTime = 1588472294190, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 28, CreateTime = 1588472294194, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 29, CreateTime = 1588472294200, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 30, CreateTime = 1588472294205, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 31, CreateTime = 1588473193347, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 32, CreateTime = 1588473193351, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 33, CreateTime = 1588473193355, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 34, CreateTime = 1588473193359, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
- ConsumerRecord(topic = test0, partition = 0, offset = 35, CreateTime = 1588473193363, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
5. Manual offset commits: first disable automatic commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Then commit manually after every poll; the commit can be asynchronous (the callback argument is optional):
- consumer.commitAsync(new OffsetCommitCallback() {
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
- Exception exception) {
- for (TopicPartition topicPartition : offsets.keySet()) {
- System.out.println("partition:");
- System.out.println(topicPartition.toString());
- System.out.println("offset and metadata");
- System.out.println(offsets.get(topicPartition).toString());
- }
- }
- });
It can also be synchronous:
consumer.commitSync();
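A minimal sketch of where the manual commit fits into the poll loop from step 3 (commitSync() is used here; commitAsync() slots in the same way):
- while (!exit) {
-     ConsumerRecords<String, String> records = consumer.poll(100);
-     for (ConsumerRecord<String, String> record : records) {
-         System.out.println(record.toString());
-     }
-     if (!records.isEmpty()) {
-         consumer.commitSync(); // commit the offsets of the records just processed
-     }
- }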
The output with asynchronous commits looks like this (the consumed records themselves are omitted)
- ...
- partition:
- test0-1
- offset and metadata
- OffsetAndMetadata{offset=43, metadata=''}
- partition:
- test0-0
- offset and metadata
- OffsetAndMetadata{offset=41, metadata=''}
- ...
- partition:
- test0-1
- offset and metadata
- OffsetAndMetadata{offset=46, metadata=''}
- partition:
- test0-0
- offset and metadata
- OffsetAndMetadata{offset=45, metadata=''}
- partition:
- test0-1
- offset and metadata
- OffsetAndMetadata{offset=47, metadata=''}
- partition:
- test0-0
- offset and metadata
- OffsetAndMetadata{offset=46, metadata=''}
To store offsets yourself, pass a rebalance listener when subscribing
- consumer.subscribe(Collections.singletonList("test0"), new ConsumerRebalanceListener() {
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- // called before a rebalance; commit/persist offsets here
-
- }
-
-
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- // called after a rebalance; seek each assigned partition to its stored offset
-
- }
- });
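A minimal sketch of what the two callbacks might look like with an external offset store (saveOffset() and readOffset() are hypothetical helpers backed by whatever store you choose, e.g. a database):
- consumer.subscribe(Collections.singletonList("test0"), new ConsumerRebalanceListener() {
-     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-         for (TopicPartition partition : partitions) {
-             saveOffset(partition, consumer.position(partition)); // hypothetical helper: persist to the external store
-         }
-     }
-
-     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-         for (TopicPartition partition : partitions) {
-             consumer.seek(partition, readOffset(partition)); // hypothetical helper: read back from the external store
-         }
-     }
- });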
Interceptors are used for filtering messages, collecting statistics, and so on.
1. Create a class that implements the ProducerInterceptor interface with the key/value generic types and implement its four methods
- public class TimeInterceptor implements ProducerInterceptor<String, String> {
-
-
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- // called for every record before it is sent
-
- return new ProducerRecord<String, String>(record.topic(), record.partition(),
- record.key(), System.currentTimeMillis() + ", " + record.value()); // prepend the current timestamp to the message value
- }
-
-
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
-
-
- }
-
-
- public void close() {
-
-
- }
-
-
- public void configure(Map<String, ?> configs) {
-
-
- }
- }
Then create another class that also implements ProducerInterceptor with the same generic types and implements its four methods
- package interceptor;
-
-
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
-
- import java.util.Map;
-
-
- public class CountInterceptor implements ProducerInterceptor<String, String> {
- private int mSuccessCount = 0, mFailCount = 0;
-
-
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- return record;
- }
-
-
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- // called when an ack (or error) is received; update the counters
- if (metadata == null) {
- mFailCount++;
- } else {
- mSuccessCount++;
- }
- }
-
-
- public void close() {
- // called when the producer is closed
- System.out.println("Fail count:" + mFailCount);
- System.out.println("Success count:" + mSuccessCount);
- }
-
-
- public void configure(Map<String, ?> configs) {
-
-
- }
- }
2. Add the interceptor classes to the producer's properties
- properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
- Arrays.asList(TimeInterceptor.class.getCanonicalName(),
- CountInterceptor.class.getCanonicalName())); // note: the interceptors run in this order
3. Start the producer; when it is closed it prints how many messages were sent successfully and how many failed
- Fail count:0
- Success count:10
If a console consumer is running, it shows the following output
- 1588489595360, msg 0 from windows10..
- 1588489595472, msg 1 from windows10..
- 1588489595476, msg 2 from windows10..
- 1588489595589, msg 3 from windows10..
- 1588489595612, msg 4 from windows10..
- 1588489595616, msg 5 from windows10..
- 1588489595621, msg 6 from windows10..
- 1588489595624, msg 7 from windows10..
- 1588489595630, msg 8 from windows10..
- 1588489595633, msg 9 from windows10..
Two monitoring tools are described here, Kafka Eagle and Kafka Manager; installing either one is enough.
1. Edit kafka-server-start.sh in the Kafka bin directory and change the startup parameters to the following
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
- export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
- export JMX_PORT="9999"
- fi
2. Extract the kafka-eagle archive (it needs to be extracted twice)
- # tar -zxvf kafka-eagle-bin-1.3.7.tar.gz
- # mkdir /home/szc/kafka_eagle
- # tar -zxvf kafka-eagle-bin-1.3.7/kafka-eagle-web-1.3.7-bin.tar.gz -C /home/szc/kafka_eagle
Afterwards /home/szc/kafka_eagle contains a kafka-eagle-web-1.3.7 directory, which is the Eagle installation directory
- # ll kafka_eagle/
-
- total 0
- drwxr-xr-x. 8 root root 74 May 3 15:36 kafka-eagle-web-1.3.7
Edit the system-config.properties file in its conf directory
- ######################################
- # multi zookeeper&kafka cluster list
- ######################################
- kafka.eagle.zk.cluster.alias=cluster1 # multiple clusters can be listed here
- cluster1.zk.list=192.168.57.141:2181 # ZooKeeper address
-
-
- ######################################
- # zk client thread limit
- ######################################
- kafka.zk.limit.size=25
-
-
- ######################################
- # kafka eagle webui port
- ######################################
- kafka.eagle.webui.port=8048 # Eagle web UI port
-
-
- ######################################
- # kafka offset storage
- ######################################
- cluster1.kafka.eagle.offset.storage=kafka # for Kafka 0.10+ this should be set to kafka
-
-
- ######################################
- # enable kafka metrics
- ######################################
- kafka.eagle.metrics.charts=true # enable metric charts
- kafka.eagle.sql.fix.error=false
-
-
- ######################################
- # kafka sql topic records max
- ######################################
- kafka.eagle.sql.topic.records.max=5000
-
-
- ######################################
- # alarm email configure (mail settings, can be ignored)
- ######################################
- kafka.eagle.mail.enable=false
- kafka.eagle.mail.sa=alert_sa@163.com
- kafka.eagle.mail.username=alert_sa@163.com
- kafka.eagle.mail.password=mqslimczkdqabbbh
- kafka.eagle.mail.server.host=smtp.163.com
- kafka.eagle.mail.server.port=25
-
-
- ######################################
- # alarm im configure (DingTalk/WeChat, ignored)
- ######################################
- #kafka.eagle.im.dingding.enable=true
- #kafka.eagle.im.dingding.url=https://oapi.dingtalk.com/robot/send?access_token=
-
-
- #kafka.eagle.im.wechat.enable=true
- #kafka.eagle.im.wechat.token=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=xxx&corpsecret=xxx
- #kafka.eagle.im.wechat.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=
- #kafka.eagle.im.wechat.touser=
- #kafka.eagle.im.wechat.toparty=
- #kafka.eagle.im.wechat.totag=
- #kafka.eagle.im.wechat.agentid=
-
-
- ######################################
- # delete kafka topic token
- ######################################
- kafka.eagle.topic.token=keadmin
-
-
- ######################################
- # kafka sasl authenticate
- ######################################
- cluster1.kafka.eagle.sasl.enable=false
- cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
- cluster1.kafka.eagle.sasl.mechanism=PLAIN
- cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
-
-
- ######################################
- # kafka jdbc driver address (change to your own; the database is created automatically if it does not exist)
- ######################################
- kafka.eagle.driver=com.mysql.jdbc.Driver
- kafka.eagle.url=jdbc:mysql://192.168.0.102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
- kafka.eagle.username=root
- kafka.eagle.password=root
3. In the bin directory under the Eagle directory, edit ke.sh and add a line that creates the directory: mkdir -p ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/
- ....
- rm -rf ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/*.properties
- mkdir -p ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/
- cp ${KE_HOME}/conf/*.properties ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/
- ...
4. Edit /etc/profile and add the KE_HOME environment variable
- export KE_HOME=/home/szc/kafka_eagle/kafka-eagle-web-1.3.7
- export PATH=$PATH:$KE_HOME/bin
Run the following commands to apply /etc/profile and open port 8048
- # source /etc/profile
- # firewall-cmd --add-port=8048/tcp --permanent
- # firewall-cmd --reload
Make sure the JDK is fully installed, in particular that the jar command works
- # jar
-
- Usage: jar {ctxui}[vfmn0PMe] [jar-file] [manifest-file] [entry-point] [-C dir] files ...
- Options:
- -c create new archive
- -t list table of contents for archive
- -x extract named (or all) files from archive
- -u update existing archive
- -v generate verbose output on standard output
- -f specify archive file name
- -m include manifest information from specified manifest file
- -n perform Pack200 normalization after creating a new archive
- -e specify application entry point for stand-alone application
- bundled into an executable jar file
- -0 store only; use no ZIP compression
- -P preserve leading '/' (absolute path) and ".." (parent directory) components from file names
- -M do not create a manifest file for the entries
- -i generate index information for the specified jar files
- -C change to the specified directory and include the following file
- If any file is a directory then it is processed recursively.
- The manifest file name, the archive file name and the entry point name are
- specified in the same order as the 'm', 'f' and 'e' flags.
-
-
- Example 1: to archive two class files into an archive called classes.jar:
- jar cvf classes.jar Foo.class Bar.class
- Example 2: use an existing manifest file 'mymanifest' and archive all the
- files in the foo/ directory into 'classes.jar':
- jar cvfm classes.jar mymanifest -C foo/ .
5. Start Eagle
- # kafka_eagle/kafka-eagle-web-1.3.7/bin/ke.sh start
-
- ....
- *******************************************************************
- * Kafka Eagle system monitor port successful...
- *******************************************************************
- [2020-05-03 16:54:25] INFO: Status Code[0]
- [2020-05-03 16:54:25] INFO: [Job done!]
- Welcome to
- __ __ ___ ____ __ __ ___ ______ ___ ______ __ ______
- / //_/ / | / __/ / //_/ / | / ____/ / | / ____/ / / / ____/
- / ,< / /| | / /_ / ,< / /| | / __/ / /| | / / __ / / / __/
- / /| | / ___ | / __/ / /| | / ___ | / /___ / ___ |/ /_/ / / /___ / /___
- /_/ |_| /_/ |_|/_/ /_/ |_| /_/ |_| /_____/ /_/ |_|\____/ /_____//_____/
-
-
-
- Version 1.3.7
- *******************************************************************
- * Kafka Eagle Service has started success.
- * Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
- * Account:admin ,Password:123456
- *******************************************************************
- * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
- * <Usage> https://www.kafka-eagle.org/ </Usage>
- *******************************************************************
Note the account and password printed at the end, then open 192.168.57.141 (the VM's IP):8048/ke in a browser on Windows and log in with them to reach the Eagle dashboard.
1. Extract the zip archive
[root@localhost szc]# unzip kafka-manager-1.3.3.22.zip
2. Enter the kafka-manager directory and edit the conf/application.conf file
- [root@localhost szc]# cd kafka-manager-1.3.3.22/
- [root@localhost kafka-manager-1.3.3.22]# vim conf/application.conf
Change the ZooKeeper address
kafka-manager.zkhosts="192.168.57.141:2181"
3. Start kafka-manager, specifying the HTTP port
- [root@localhost kafka-manager-1.3.3.22]# bin/kafka-manager -Dhttp.port=7456 > start.log 2>&1 &
- [1] 38560
> start.log redirects standard output into start.log; 2>&1 redirects standard error to the same place as standard output, so errors also end up in the log file; the trailing & runs the process in the background.
The printed 38560 is the kafka-manager process id.
4. Open port 7456
- [root@localhost kafka-manager-1.3.3.22]# firewall-cmd --add-port=7456/tcp --permanent
- success
- [root@localhost kafka-manager-1.3.3.22]# firewall-cmd --reload
- success
5. Visit 192.168.57.141:7456 in a browser on Windows and click Cluster -> Add Cluster at the top.
On the form that opens, name the cluster, enter the ZooKeeper IP and port, and select the Kafka version.
Click Save at the bottom.
Back on the home page, the newly added cluster is now listed.
6. To shut down kafka-manager, simply kill the process with the pid shown earlier
[root@localhost kafka-manager-1.3.3.22]# kill 38560
1. Producer stress test: use the bin/kafka-producer-perf-test.sh script, specifying the topic, record size, total number of records, records per second, and producer properties
- [root@localhost kafka_2.11-0.11.0.0]# bin/kafka-producer-perf-test.sh --topic test0 --record-size 100 --num-records 100000 --throughput 1000 --producer-props bootstrap.servers=192.168.57.141:9092
- 5002 records sent, 1000.0 records/sec (0.10 MB/sec), 2.6 ms avg latency, 157.0 max latency.
- 5005 records sent, 1001.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 7.0 max latency.
- 5002 records sent, 1000.4 records/sec (0.10 MB/sec), 0.6 ms avg latency, 4.0 max latency.
- 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 8.0 max latency.
- 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 4.0 max latency.
- 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 9.0 max latency.
- 5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 5.0 max latency.
- 5004 records sent, 1000.4 records/sec (0.10 MB/sec), 1.0 ms avg latency, 68.0 max latency.
- ....
- 100000 records sent, 999.970001 records/sec (0.10 MB/sec), 0.68 ms avg latency, 157.00 ms max latency, 1 ms 50th, 1 ms 95th, 3 ms 99th, 38 ms 99.9th.
The last line is the summary: 100000 records sent in total, at 999.970001 records per second, 0.10 MB/s throughput, 0.68 ms average latency, and 157 ms maximum latency.
2. Consumer stress test: use bin/kafka-consumer-perf-test.sh, specifying the ZooKeeper address, the topic to consume, the fetch size per request, the total number of messages, and the number of threads
- [root@localhost kafka_2.11-0.11.0.0]# bin/kafka-consumer-perf-test.sh --zookeeper 192.168.57.141:2181 --topic test0 --fetch-size 1000 --messages 10000000 --threads 1
-
- start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
- 2020-05-25 19:53:25:047, 2020-05-25 19:53:29:393, 9.5367, 2.1944, 100000, 23009.6641
The output shows the start and end time, the total data consumed (9.5367 MB), the throughput (2.1944 MB/s), the total number of messages consumed (100000), and the messages consumed per second (23009.6641)
1. Install and extract Flume
Download the package, copy it to the CentOS machine, extract it, and create a logs directory for the log files
# mkdir logs
2. In the Flume directory, create a job directory and create the job configuration file inside it
- # mkdir job
- # vim kafka_flume.conf
The configuration is as follows
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- # netcat source reading from localhost:6666
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 6666
-
- # memory channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # Kafka sink
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.kafka.topic = test0 # topic
- a1.sinks.k1.kafka.bootstrap.servers = 192.168.57.141:9092 # Kafka broker address
- a1.sinks.k1.kafka.flumeBatchSize = 20
- a1.sinks.k1.kafka.producer.acks = 1
- a1.sinks.k1.kafka.producer.linger.ms = 1
-
- # bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
3. Install netcat
- # cd /usr/local
- # wget https://ncu.dl.sourceforge.net/project/netcat/netcat/0.7.1/netcat-0.7.1.tar.gz
- # tar -zxvf netcat-0.7.1.tar.gz
- # mv netcat-0.7.1 netcat
- # cd netcat/
- # ./configure
- # make
- # make install
Remove the original symlink
# rm -f /usr/bin/nc
Test netcat
- $ nc -nv -w 1 -z 192.168.57.141 1-500
-
- 192.168.57.141 22 (ssh) open
- 192.168.57.141 111 (sunrpc) open
4. Start kafka-console-consumer, Flume, and netcat, in that order
# /home/szc/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.141:9092 --topic test0
flume:
# ./bin/flume-ng agent -c conf/ -f job/kafka_flume.conf -n a1
Use netcat to connect to localhost:6666 and send data
- # nc localhost 6666
- sads
- OK
- szc
- OK
For each line sent, the Kafka consumer receives one message
- # /home/szc/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.141:9092 --topic test0
- sads
- szc
1. Write the Flume interceptor class in IDEA; first add the dependency to the pom
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.8.0</version>
- </dependency>
Then write a FlumeInterceptor class that implements the Interceptor interface
- public class FlumeInterceptor implements Interceptor {
- private ArrayList<Event> mHeaderEvents = new ArrayList<Event>();
-
-
- public void initialize() {
-
- }
-
-
- public Event intercept(Event event) { // main interception logic
- String body = new String(event.getBody());
- Map<String, String> headers = event.getHeaders();
-
-
- String topic = body.contains("szc") ? "test0" : "test1";
- headers.put("topic", topic); // set the target topic in the headers based on the message body
- return event;
- }
-
-
- public List<Event> intercept(List<Event> list) {
- mHeaderEvents.clear();
-
-
- for (Event event : list) {
- mHeaderEvents.add(intercept(event));
- }
- return mHeaderEvents;
- }
-
-
- public void close() {
-
-
- }
-
-
- public static class Builder implements Interceptor.Builder {
- // builder; the Flume job configuration must reference this class's fully qualified name
-
- public Interceptor build() {
- return new FlumeInterceptor();
- }
-
-
- public void configure(Context context) {
-
-
- }
- }
- }
In the Maven tool window on the right, click the project name -> Lifecycle -> package to build the jar.
When it finishes, find the built jar under the project directory and copy it into the lib folder of the Flume installation.
2. Edit the job configuration file and add the interceptor settings
- a1.sources = r1
-
- ....
-
- # interceptor settings
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = interceptor.FlumeInterceptor$Builder
-
- ....
3. Start Flume first
# ./bin/flume-ng agent -c conf/ -f job/kafka_flume.conf -n a1
Then use nc to connect to port 6666 and send messages
- # nc localhost 6666
- szc
- OK
- szcc
- OK
- 31244h
- OK
Now start two consumers, one subscribed to test0 and one to test1; each receives the corresponding messages
- # /home/szc/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.141:9092 --topic test0
- szc
- szcc
-
- # /home/szc/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.57.141:9092 --topic test1
- 31244h
That covers using Kafka on CentOS 7. If you have questions, feel free to discuss them in the comments.