赞
踩
前面我们使用基于console的生产者和消费者对topic实现了数据的生产和消费,,这个基于控制台的生产者和消费者主要是让我们做测试用的。
在实际工作中,我们有时候需要将生产者和消费者功能集成到我们已有的系统中,此时就需要写代码实现生产者和消费者的逻辑了。
在这我们使用java代码来实现生产者和消费者的功能。
先创建maven项目,db_kafka
添加kafka的maven依赖。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
package com.imooc.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
* 需求:Java代码实现生产者代码
*/
public class ProducerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
//指定key-value数据的序列化格式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
//指定topic
String topic = "hello";
//创建kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//向topic中生产数据
producer.send(new ProducerRecord<String, String>(topic, "hello kafka"));
//关闭链接
producer.close();
}
}
等一会我们把消费者代码实现好了以后一起验证。
package com.imooc.kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* 需求:Java代码实现消费者代码
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
//指定消费者组
prop.put("group.id", "con-1");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
Collection<String> topics = new ArrayList<String>();
topics.add("hello");
//订阅指定的topic
consumer.subscribe(topics);
while(true) {
//消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错】
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : poll) {
System.out.println(consumerRecord);
}
}
}
}
1、关闭kafka服务器的防火墙
2、配置windows的hosts文件 添加kafka节点的hostname和ip的映射关系。
[如果我们的hosts文件中没有对kafka节点的 hostnam和ip的映射关系做配置,在这经过多次尝试连接不上就会报错]
发现没有消费到数据,这个topic中是有数据的,为什么之前的数据没有消费出来呢?不要着急,先带着这个问题往下面看
此时回到kafka的消费者端就可以看到消费出来的数据了。
ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, CreateTime = 1591687499753, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
所以这个时候我们发现,新产生的数据我们是可以消费到的,但是之前的数据我们就无法消费了,那下面我们来分析一下这个问题。
//==================================================
//开启自动提交offset功能,默认就是开启的
prop.put("enable.auto.commit","true");
//自动提交offset的时间间隔,单位是毫秒
prop.put("auto.commit.interval.ms","5000");
/*
注意:正常情况下,kafka消费数据的流程是这样的
先根据group.id指定的消费者组到kafka中查找之前保存的offset信息
如果查找到了,说明之前使用这个消费者组消费过数据,则根据之前保存的offset继续进行消费
如果没查找到(说明第一次消费),或者查找到了,但是查找到的那个offset对应的数据已经不存在了
这个时候消费者该如何消费数据?
(因为kafka默认只会保存7天的数据,超过时间数据会被删除)
此时会根据auto.offset.reset的值执行不同的消费逻辑
这个参数的值有三种:[earliest,latest,none]
earliest:表示从最早的数据开始消费(从头消费)
latest【默认】:表示从最新的数据开始消费
none:如果根据指定的group.id没有找到之前消费的offset信息,就会抛异常
解释:【查找到了,但是查找到的那个offset对应的数据已经不存在了】
假设你第一天使用一个消费者去消费了一条数据,然后就把消费者停掉了,
等了7天之后,你又使用这个消费者去消费数据
这个时候,这个消费者启动的时候会到kafka里面查询它之前保存的offset信息
但是那个offset对应的数据已经被删了,所以此时再根据这个offset去消费是消费不到数据的
总结,一般在实时计算的场景下,这个参数的值建议设置为latest,消费最新的数据
这个参数只有在消费者第一次消费数据,或者之前保存的offset信息已过期的情况下才会生效
*/
prop.put("auto.offset.reset","latest");
//==================================================
此时我们来验证一下,
先启动一次生产者
再启动一次消费者,看看消费者能不能消费到这条数据,如果能消费到,就说明此时是根据上次保存的offset信息进行消费了。
结果发现是可以消费到的。
注意:消费者消费到数据之后,不要立刻关闭程序,要至少等5秒,因为自动提交offset的时机是5秒提交一次。
ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, CreateTime = 1591687894952, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
将auto.offset.reset置为earliest,修改一下group.id的值,相当于使用一个新的消费者,验证一下,看是否能把这个topic中的所有数据都取出来,因为新的消费者第一次肯定是获取不到offset信息的,所以就会根据auto.offset.reset的值来消费数据。
prop.put("group.id", "con-2");
prop.put("auto.offset.reset","earliest");
结果发现确实把之前的所有数据都消费过来了.
ConsumerRecord(topic = hello, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1591672800863, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hehe)
ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, CreateTime = 1591687499753, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, CreateTime = 1591687864482, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
此时,关闭消费者(需要等待5秒,这样才会提交offset),再重新启动,发现没有消费到数据,说明此时就根据上次保存的offset来消费数据了,因为没有新数据产生,所以就消费不到了。
最后来处理一下程序输出的日志警告信息,这里其实示因为缺少依赖日志依赖
在pom文件中添加log4j的依赖,然后将log4j.properties添加到resources目录中。
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>
kafka0.9版本以前,消费者的offset信息保存在zookeeper中。
从kafka0.9开始,使用了新的消费API,消费者的信息会保存在kafka里面的__consumer_offsets这个topic中。
因为频繁操作zookeeper性能不高,所以kafka在自己的topic中负责维护消费者的offset信息。
如何查询保存在kafka中的Consumer的offset信息呢?
使用kafka-consumer-groups.sh这个脚本可以查看
查看目前所有的consumer group
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
con-1
con-2
具体查看某一个consumer group的信息。
GROUP:当前消费者组,通过group.id指定的值
TOPIC:当前消费的topic
PARTITION:消费的分区
CURRENT-OFFSET:消费者消费到这个分区的offset
LOG-END-OFFSET:当前分区中数据的最大offset
LAG:当前分区未消费数据量
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group con-1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
con-1 hello 4 1 1 0 - - -
con-1 hello 2 1 1 0 - - -
con-1 hello 3 1 1 0 - - -
con-1 hello 0 0 0 0 - - -
con-1 hello 1 0 0 0 - - -
此时再执行一次生产者代码,生产一条数据,重新查看一下这个消费者的offset情况。
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group con-1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
con-1 hello 4 1 2 1 - - -
con-1 hello 2 1 1 0 - - -
con-1 hello 3 1 1 0 - - -
con-1 hello 0 0 0 0 - - -
con-1 hello 1 0 0 0 - - -
当一个消费者消费一个partition时候,消费的数据顺序和此partition数据的生产顺序是一致的。
当一个消费者消费多个partition时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition中的数据。
总之:如果一个消费者消费多个partiton,只能保证消费的数据顺序在一个partition内是有序的
也就是说消费kafka中的数据只能保证消费partition内的数据是有序的,多个partition之间是无序的。
kafka可以实现以下三种语义,这三种语义是针对消费者而言的:
至少一次:at-least-once
这种语义有可能会对数据重复处理
实现至少一次消费语义的消费者也很简单。
1: 设置enable.auto.commit为false,禁用自动提交offset
2: 消息处理完之后手动调用consumer.commitSync()提交offset
这种方式是在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,
有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次。
至多一次:at-most-once
这种语义有可能会丢失数据
至多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1: enable.auto.commit设置为true。
2: auto.commit.interval.ms设置为一个较低的时间范围。
由于上面的配置,此时kafka会有一个独立的线程负责按照指定间隔提交offset。
消费者的offset已经提交,但是消息还在处理中(还没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的offset处消费,导致上次没有被成功处理的消息就丢失了。
仅一次:exactly-once
这种语义可以保证数据只被消费处理一次。
实现仅一次语义的思路如下:
1: 将enable.auto.commit设置为false,禁用自动提交offset
2: 使用consumer.seek(topicPartition,offset)来指定offset
3: 在处理消息的时候,要同时保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果,这个时候相当于自己保存offset信息了,把offset和具体的数据绑定到一块,数据真正处理成功的时候才会保存offset信息。
这样就可以保证数据仅被处理一次了。
写到这里也结束了,在文章最后放上一个小小的福利,以下为小编自己在学习过程中整理出的一个关于 java开发 的学习思路及方向。从事互联网开发,最主要的是要学好技术,而学习技术是一条慢长而艰苦的道路,不能靠一时激情,也不是熬几天几夜就能学好的,必须养成平时努力学习的习惯,更加需要准确的学习方向达到有效的学习效果。
由于内容较多就只放上一个大概的大纲,需要更及详细的学习思维导图的 点击我的Gitee获取。
还有 高级java全套视频教程 java进阶架构师 视频+资料+代码+面试题!
全方面的java进阶实践技术资料,并且还有技术大牛一起讨论交流解决问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。