赞
踩
1. 在使用kafka之前,你需要弄清楚的一些概念
名词个人理解topic对发送到kafka中的message分类的一个标签,message1属于callLog类,message2属于runningLong类,callLog跟runningLog就是两个topic
partition消息存储分区的概念,同一个topic下的数据会分布在不同partition中,每个partition中使用segment文件存储数据,每个partition中的数据是有序的,但是不同的partiton之间数据是无序的,若想保证消费消息的顺序,partition的数量设置为1
offsetOffset记录着下一条将要发送给Consumer的消息的序号,当前partiton中存了3条消息,消息1和2 已经被消费过了,所以offset就是2
borker可以简单理解为kafka集群,一个broker就是单节点kafka,消息都是先发送到broker中的partiton存储,再被消费者消费
所以针对上面的表格,一张关系图说明下
一个topic中多条message分别均匀存储在broker不同的partition中,同时该topic下的消息可以被多个消费者组中的消费者同时消费
下面,使用kafka原生的API进行消息的发送和消费,windows上如何安装kafka本文就不涉及了,可以自行百度。
2. 命令行启动kafka
kafka依赖zookeeper,现在下载的kafka中自带zookeeper,先启动zookeeper,再启动kafka-server。 windows下kafka相关的bat命令都在bin\windows里面
启动zookeeper命令 zookeeper启动默认端口 2181
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
启动kafka-server命令 默认端口 9092
.\bin\windows\kafka-server-start.bat .\config\server.properties
还需要创建一个topic,方便后面编码时使用该topic,创建topic时可以指定partition个数
创建topic的命令
.\bin\windows\kafka-topics.bat --create --zookeeper zookeeper的地址 --replication-factor 1 --partitions partitions的个数 --topic topic的名字
创建成功如下图
准备工作都做好了,下面就是编码实现消息的发送和消费了。
3. idea使用原生API发送和消费消息
创建一个springboot项目,结构目录如下
kafka发送消息有三种方式
同步发送消息,不关注返回结果(发出去后就不管了)
同步发送消息,关注返回结果
异步发送消息,由回调函数判断是否发送成功
下面我们就来一个个分析下到底怎么使用kafka
第一种方式:同步发送消息,不关注返回结果(发出去后就不管了)
MyProducer类
package com.study.kafkastudy.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
public class MyProducer {
private static KafkaProducer producer;
static{
//设置kafka启动需要的参数
Properties properties = new Properties();
//(kafka集群地址)broker地址
properties.put("bootstrap.servers","localhost:9092");
//key的序列化方式
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//value的序列化方式
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
}
//发送消息不关注返回结果
public static void sendMessage(){
//ProducerRecord的三个参数,topic,发送的key,发送的value
ProducerRecord record =
new ProducerRecord<>("user-info-topic","name","路飞");
producer.send(record);
//关闭producer
producer.close();
}
public static void main(String[] args) {
sendMessage();
}
}
复制代码
MyConsumer类
MyConsumer类我写了一个while死循环,一直占用线程监听user-info-topic这个topic是否有新消息
package com.study.kafkastudy.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer {
private static KafkaConsumer consumer;
private static Properties properties;
static {
//设置kafka启动需要的参数
properties = new Properties();
//(kafka集群地址)broker地址
properties.put("bootstrap.servers","localhost:9092");
//key的反序列化方式
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//value的反序列化方式
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//消费组的id,属于哪个消费组
properties.put("group.id","consumerGroup1");
}
public static void getMessage(){
//是否自动提交
properties.put("enable.auto.commit",true);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("user-info-topic"));
while (true){
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.println(String.format("消费到的结果key:%s,value:%s,partition:%s",
record.key(),
record.value(),
record.partition()));
}
}
}
public static void main(String[] args) {
getMessage();
}
}
复制代码
先依次启动zookeeper,kafka-server,myconsumer,最后启动myproduce发送消息。我们发送“路飞”,观察消费者能否收到消息
从图中可以看出已经可以消费到消息了
第二种方式:同步发送消息,关注返回结果
我们修改一下Myproducer类,在其中增加一个sendMessageWithCareResult方法,在main方法中调用sendMessageWithCareResult方法
public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {
//ProducerRecord的三个参数,topic,发送的key,发送的value
ProducerRecord record =
new ProducerRecord<>("user-info-topic","name","路飞");
//send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
Future sendResult = producer.send(record);
RecordMetadata recordMetadata = sendResult.get();
//打印下发送消息的topic,partition,offset
System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset()));
producer.close();
}
复制代码
发送结果:
第三种方式:异步发送,使用回调函数监控发送结果
异步发送不用阻塞当前线程,所以速度上比同步要快很多
kafka要求异步发送消息需要在send时指定一个callBack回调对象,我们新建一个类去实现CallBack接口,实现其中的onCompletio方法,这个方法有两个参数,一个是发送消息的元信息,一个是发送出现的异常,有异常就说明发送有问题,我们再新建一个sendMessageCallBack方法
public static void sendMessageCallBack(){
ProducerRecord record =
new ProducerRecord<>("user-info-topic","name","路飞");
producer.send(record,new MyCallBack());
producer.close();
}
private static class MyCallBack implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("进入异步发送回调函数");
if(exception != null){
System.out.println("出现异常:"+exception.getMessage());
}
System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
metadata.topic(),
metadata.partition(),
metadata.offset()));
}
}
复制代码
结果:
现在使用kafka原生API发送消息基本都已经结束了,还有一些高级一点的用法像自定义消息的分区策略,我会抽时间更新在文章下面,大家敬请期待 2019/11/29 21:33 =============================分割线========================================
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。