赞
踩
创建Topic->向Topic发送消息->接收Topic消息,所有消息命令都在kafka文件的bin目录下
1.创建Topic
和Topic有关的指令都是用bin目录下的kafka-Topics.sh
bin/kafka-topics.sh --create --bootstrap-server 192.168.108.129:9092 --replication-factor 2 --partitions 2 --topic hello.kafka
2.展示Topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.108.129:9092
3.删除Topic
bin/kafka-topics.sh --delete --bootstrap-server 192.168.108.129:9092 --topic hello.kafka
4.Topic指令参数说明
参数 | 说明 |
---|---|
create | 创建Topic |
delete | 删除Topic |
alter | 修改Topic,修改副本数和分区数 |
list | 列出可用Topic |
topic | 指定Topic名称 |
bootstrap-server | 指定连接的Kafka服务器,默认端口9092,该参数必须 |
replication-factor | 指定每个分区副本数 |
partitions | 指定分区数 |
zookeeper | 指定zookeeper,该参数已经过时,当指定该参数时,bootstrap-server不是必须的 |
消息生产者是通过bin目录下kafka-console-producer.sh指令
bin/kafka-console-producer.sh --broker-list 192.168.108.129:9092 --topic hello.kafka
>hello world #发送消息hello world
>this is the first message #发送消息this is the first message
>
指令向hello.kafka发送两条消息hello world和 this is the first message。参数说明
参数 | 说明 |
---|---|
broker-list | 指定broker集群,多个broker用逗号分隔 |
topic | 指定发送消息的Topic |
消息消费者指令通过bin目录下kafka-console-consumer.sh实现
bin/kafka-console-consumer.sh --bootstrap-server 192.168.108.129:9092 --topic hello.kafka --from-beginning
this is the first message #读取hello.kafka中的数据
hello world
通过该指令读取到了之前发送给Kafka的两条消息。参数说明
参数 | 说明 |
---|---|
bootstrap-server | 指定连接的服务器 |
topic | 读取消息的Topic |
from-beginning | 从头开始读取消息,即读取所有消息。若不配参数则只读取实时消息 |
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.108.128:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
创建生产者首先创建Properties对象进行属性设置,在创建生产者对象时bootstrap.servers、key.serializer、value.serializer这三个属性是必须要设置的。
bootstrap.servers
指定broker地址清单,由host:port组成,多个broker用逗号隔开。列表中不需要包含所有的broker。
key.serializer
broker接收的消息是键和值的字节数组。所以需要为键和值设置一个序列化器,key.serializer设置的是键的序列化器。Kafka自带的序列化器有ByteArraySerializer、DoubleSerializer、FloatSerializer、StringSerializer等,在org.apache.kafka.common.serialization包中。
value.serializer
同样的向broker发送的消息(也就是值)同样需要设置序列化器将内容序列化为字节数组。可以使用与key.serializer一样的序列化器。
消息发送三种方式:发送并忘记(fire-and-forget)、同步发送、异步发送
1.发送并忘记
发送并忘记模式将消息发送给服务器,不关心消息是否正常到达。
ProducerRecord<String, String> record = new ProducerRecord<>("hello.kafka", "hello", "this is the first message-fire and forget");
producer.send(record);
发送消息首先创建ProducerRecord对象,ProducerRecord是个泛型类,泛型对应了发送消息的键和值类型,在代码中发送的消息都是字符串类型,所以泛型选用String,键和值序列化器使用StringSerializer。ProducerRecord有多个构造函数,代码中构造函数参数对应Topic名、键、值。
调用KafkaProducer的send()方法,将消息发送到broker。send方法返回一个Future对象,由于我们不关心是否发送成功,因此调用send方法后便不再做操作。
2.同步发送
同步发送在调用send()方法发送消息后,再调用get()方法等待,发送成功便会返回RecordMetadata元数据对象,对象中包含发送消息的元数据信息。
ProducerRecord<String, String> record = new ProducerRecord<>("hello.kafka", "hello", "this is the first message-sync");
RecordMetadata recordMetadata = producer.send(record).get();
log.info("写入Topic: " + recordMetadata.topic() + " 偏移量: " + recordMetadata.offset() + " " + " 分区: " + recordMetadata.partition());
控制台显示消息
10:19:59.852 [main] INFO com.sk.demo1.KafkaTest - 写入Topic: hello.kafka 偏移量: 7 分区: 1
3.异步发送
ProducerRecord<String, String> record
= new ProducerRecord<>("hello.kafka", "hello", "this is the first message-async");
producer.send(record, (metadata, exception) -> {
if (Objects.nonNull(exception)){
exception.printStackTrace();
}
});
在异步发送消息时,为了使消息发送失败对异常正常处理,生产者使用了回调,在send()方法中重写org.apache.kafka.clients.producer.Callback 的 onCompletion() 方法来对异常进行处理。
在创建消息时可以设置消息的键,设置的键有两个作用:
相同键的消息被写入到同一个分区,也就是说从一个分区读消息时,相同键的消息将会被全部读出,这样可以在读写消息时进行过滤操作。
如果不设置键,并使用默认分区器,分区器将会使用轮询的方式将消息写如各个分区中。
如果设置了键,并使用默认分区器,Kafka会对键进行散列,再根据散列值将消息映射到对应分区,确保相同键在相同分区当中,注:在进行映射时会使用所有分区,而不是可用分区,当分区不可用时,映射会发生错误。
这样使用键和分区的机制保证了他们对应关系的不变,但是主题增加了新的分区,这样的关系将会被打乱,无法再保证映射关系。因此创建主题时就需要把分区规划好。
Kafka消费者从属于消费者组,一个消费者组由多个消费者组成,一个消费者组里的消费者订阅同一个主题,每个消费者接收主题的一部分分区消息.消费者组里的消费者会将主题的每个分区消息都读取到,但是消费者数超过主题分区数时,就会有一部分消费者被闲置,不接受任何消息.因此消费者数不能超过主题分区数.
当消费者组里的一个消费者宕机,它将离开群组,原本它读取的分区所有权交给另一个消费者,这样的行为称为再均衡.再均衡为消费者组带来了高可用性和伸缩性,但是也有缺点:
消费者通过向被指派为群组协调器的broker发送心跳来维持他们和群组的从属关系以及各自分区所有权.
创建消费者对象的过程与创建生产者类似
Properties prop = new Properties();
prop.put("bootstrap.servers","192.168.108.128:9092,192.168.108.129:9092");
prop.put("group.id", "hello");
prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("hello.kafka"));
首先设置bootstrap.servers,指定Kafka集群。与创建生产者相对应的,设置键和值的反序列化,将消息的字节数组转成Java对象。group.id属性不是必须的,他指定当前消费者属于哪个消费者组。最后通过消费者对象consumer订阅主题。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息,设置阻塞时间
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
String key = record.key();
String value = record.value();
long offset = record.offset();
log.info("topic: " + topic + " partition: " + partition + " offset: " + offset + " key: " + key + " value: " + value);
}
}
} catch (Exception e) {
} finally {
consumer.close();
}
当订阅了主题之后,消费者通过poll方法对Kafka进行轮训,poll()方法的参数是超时时间,控制poll()的阻塞时间,poll()方法返回ConsumerRecords对象,该对象记录了返回的消息,是一个记录列表,poll()方法读取的是生产者写入Kafka但是还没有被消费者读取的记录。运行上面代码后控制台打印日志。
10:47:41.985 [main] INFO com.sk.demo1.KafkaTest - topic: hello.kafka partition: 1 offset: 12 key: hello value: this is the first message-sync
10:47:41.993 [main] INFO com.sk.demo1.KafkaTest - topic: hello.kafka partition: 1 offset: 13 key: hello-hello value: this is the first message-async
成功读取到生产者发送的消息。
在Kafka中消费者可以追踪消息所在分区的位置,并且在处理消息后会更新分区当前位置,这样的操作叫做提交。
自动提交
enable.auto.commit设为true之后,消费者会将poll()到的信息的最大偏移量每隔5s自动提交一次,提交间隔由auto.commit.interval.ms设置,默认5s。虽然自动提交的方式最简单,但是如果3s内的消息发送给消费者已经消费,如果此时发生再均衡,重组后的消费者重新读取到的最大偏移量为3s前的偏移量,这3s内的消息将再一次被消费者处理,造成数据重复处理。
虽然可以将间隔设置小一点,但是这样始终不能解决极端情况。
手动提交
同步提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息,设置阻塞时间
for (ConsumerRecord<String, String> record : records) {
log.info("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset() + " key: " + record.key() + " value: " + record.value());
try {
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用consumer.commitSync()来提交由poll()返回的最新偏移量,在broker对提交请求做出相应之前,应用会一直被阻塞,如果需要提高吞吐量,可以选用异步提交的方式。
异步提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息,设置阻塞时间
for (ConsumerRecord<String, String> record : records) {
log.info("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset() + " key: " + record.key() + " value: " + record.value());
try {
consumer.commitAsync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
broker不可用
控制台报错
WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.108.129:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
解决方案
错误原因是重启服务器或者关闭里Kafka运行界面,导致Kafka未启动,重启Kafka服务即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。