赞
踩
在一台服务器上,已经安装好的kafka中,到config配置目录下多创建两个server.properties文件,一共启动2个ekafka的服务。
上一篇博客中介绍到将kafka下载解压缩后,直接启动就完事了,但是需要注意的是配置文件中有个listeners
和advertised.listeners
的存在。
其实如果启动一个kafka服务,只是一个单机使用的,且不涉及到外网访问,那么什么都没必要去设置。但是如果是集群之类的,那么就需要做一些相应的配置了。
listeners:其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。一般公司内网的kafka集群只需要配置这个就行了。
listeners=PLAINTEXT://内网ip:9092
advertised.listeners: Advertise的含义表示宣称的、公布的,就是组监听器是 Broker 用于对外发布的。涉及到外网kafka集群时,还需要配置这个。只要是涉及到外网访问的,必须要设置这个,否则Consumer或Producer连接不到kafka的服务
advertised.listeners=PLAINTEXT://外网ip:9092
我这里涉及到外网访问,需要把 advertised.listeners配置一下。
broker.id=0
# 指定监听所有的ip, 监听所有ip的9092端口
listeners=PLAINTEXT://0.0.0.0:9092
# 注册到zookeeper的ip和端口
advertised.listeners=PLAINTEXT://xxx.xxx.xx.xx:9092
log.dir=/env/kafka/data/kafka-logs
broker.id=1
# 指定监听所有的ip, 监听所有ip的9093端口
listeners=PLAINTEXT://0.0.0.0:9093
# 注册到zookeeper的ip和端口
advertised.listeners=PLAINTEXT://xxx.xxx.xx.xx:9093
log.dir=/env/kafka/data/kafka-logs-1
注意:以上listeners写的是 0.0.0.0
,那么必须要把 advertised.listeners
配置一下,advertised.listeners是对外的地址和端口,是要注册到Zookeeper上面的,如果没有配置advertised.listeners,那么advertised.listeners的值就是 listeners的值,Zookeeper就没法知道broker的具体地址是什么了。
按照0.0.0.0配置之后,其他的所有ip都能访问到此broker的端口上。
在启动kafka服务器前,我们要先确保Zookeeper已经启动了,如果没有启动,请先启动Zookeeper的服务
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
然后进入到kafka安装目录的bin目录下,执行命令启动2台服务器。守护进程方式启动
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
启动完之后,可以到zookeeper的服务中看一下有没有注册好broker们。
可以看到已经注册了两个broker,id分别是 0 和 1
创建一个topic主题,我们相当于有两个broker,然后创建2个分区,2个副本
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic my-topic
创建完毕后,查看一下topic的详细信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic
那么副本是一个什么样的概念呢?
副本是为了主题中的分区创建多个备份,这些备份在kafka集群的多个broker中,会有一个副本作为leader,其他的副本都是follower,注意:在一个broker节点服务器中,同一个topic中的 Partition,不可能存在Leader和Follower都在一个broker服务器中。因为其他的备份都是被保存在其他的broker服务器上的
下图,可以看到,两个broker的数据文件中,都存在my-topic-0
和my-topic-1
。这就是数据同步,副本的体现。
一个kafka集群中包括:
创建一个生产者,这个生产者是向一个kafka集群中发送消息
./kafka-console-producer.sh --broker-list xxx.xxx.xx.xx:9092,xxx.xxx.xx.xx:9093 --topic my-topic
创建消费者,监听kafka集群中 my-topic
的主题
./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xx:9092,xxx.xxx.xx.xx:9093 --from-beginning --topic my-topic
然后在消费者中,发送一些信息,完全没问题,可以接收到
那么假如是两个消费者且在同一个消费组中,消费同一个topic的partition中的消息呢?
./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xx:9092,xxx.xxx.xx.xx:9093 --from-beginning --consumer-property group.id=group1 --topic my-topic
第一个消费者正常消费消息。
第二个消费者,无法正常消费消息。
继续生产消息,发现只有一个消费者能够正常接收到消息。
这种其实也就是前一篇博客上说的一种消息,单播消息。在一个消费者组中,消费同一个topic的partition时,只有一个消费者能消费到这个消息。
每个broker中有多个partition。一个partition只能被一个消费者组里的某一个消费者消费,从而保证消费顺序。一个消费者可以消费多个partition。
消费组中的消费者数量不能比topic中的partition数量多,否则多出来的消费者消费不到消息。
如果生产者发送消息到kafka服务器后没有接收到kafka服务器返回的ack码,生产者会被阻塞,阻塞3s的时间,如果还没有收到 ack,会进行重试,重试3次。
异步发送,生产者发送消息后就可以执行后面的业务,broker在收到消息后异步调用生产者的callback回调方法。异步发送可能会由于网络不稳定造成消息丢失。
在同步发消息的场景下:生产者将消息发送到broker后,ack会有3种不同的选择:
ack=0 : 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容 易丢失数据
ack=1(默认) : 需要等待leader成功将数据写入本地的log,但是不需要等待所有follower是否写入成功。就可以继续发送下一条消息。这种情况,如果follower没有备份成功,此时leader又挂掉了,那么消息会丢失。
ack=-1或者all,需要等待 min.insync.replicas(默认值1,推荐配置>=2),这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。 一般除非是金融级别的系统,或者是跟钱有关系的场景会使用。
当ack=0时,kafka集群中不需要任何broker收到消息,立即返回ack给生产者,效率最高,最容易丢失数据
ack=1(默认),leader完成存盘(将数据存入本地)后,返回ack给生产者,性能和安全是均衡的
ack=-1或all,当一定数量的副本(注意包括leader)完成了存盘操作时,返回ack,效率最低安全性能最高。
在生产者的配置中,可以配置生产者多少时间内如果没有接收到ack,会进行重试机制,重新发送一次信息,也可以配置重试的次数。
生产者会有一个缓冲区(默认32m),会把消息放入到缓冲区中。然后生产者中有一个本地线程,从缓冲区中一次拉取16k的消息(如果数据没有达到16k,10ms后也会将数据发送
),然后发送给kafka服务器。
生产者在发送消息的时候怎么知道自己的消息要发到哪个分区(多分区场景下)里呢?有几种方式:
无论是手动提交还是自动提交,都需要把所属的消费者+消费的主题+消费的某个分区以及消费的偏移量offset提交给_consumer_offsets
主题中
自动提交:自动提交就是当消费者将消息从kafka中间件poll下来之后,直接去提交一个offset到kafka的_consumer_offsets
的50分区中的某个分区中。假如消费者还没来得及消费,然后出现突然宕机,,但是offset已经提交了,那么这个消息就消费不到了。
poll() 是消费者的一个拉取消息的长轮询
手动提交:可以在消费消息后再去提交offset
手动提交分为同步提交和异步提交
同步提交 : 同步提交offset,当前的线程会被阻塞,直到offset提交成功
在消费完消息后调用同步提交方法,当集群返回ack前一直进行阻塞,返回ack后标识提交成功。再执行之后的逻辑
consumer.commitSync(); // 会进行线程阻塞
异步提交 : 当前的线程不会阻塞,可以继续处理后续的程序逻辑’
消息消费完之后,不需要等待ack,直接执行之后的逻辑,然后设置一个回调方法,供集群调用
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if(e !=null) {
// 提交出现错误了,及时处理
}
}
});
消费者和broker建立一个长连接,poll消息,可以设置一次poll多少条数据。假如300条数据。
max.poll.records=300
可以设置如果两次poll消息的时间超过了某个时间间隔,那么kafka认为其消费能力过弱,将其踢出消费组,将分区分配给其他的消费者
max.poll.interval.ms=300000
可以设置长轮询
的时间,假如是1000ms。那么如果一次poll到300条数据,就开始进行消费。
如果一次没有poll到300条,且现在时间不超过1000ms,那么继续poll,要么拉取到300条数据,要么到1000ms时间就开始消费。
也就是有两种情况,一种是在规定时间内poll到了指定的数据条数,一种是没在指定时间内poll到指定条数,然后都开始进行消费数据。
其他配置,健康检查
消费者需要每隔一段时间都去给kafka发送心跳,如果在一段时间内kafka没有收到消费者的心跳,那么会把该消费者踢出消费组,进行rebalance,将分区分配给其他消费者。
session.timeout.ms=3000
只是作为了解,后面使用springboot,都是注解声明式的。不用硬编码。
消费者可以指定消费主题下的哪个分区中的消息
java实现
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)))
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
一个场景:比如我们需要消费2个小时内的消息。
这种场景不太常见,遇见了查阅下资料即可。
当消费组是一个新创建的消费组,或者指定了offset的消费方式后,这个offset不存在该如何消费?
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。