赞
踩
Kafka
是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper
协调的分布式日志系统(也可以当做 MQ
系统),常见可以用于 web/nginx
日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。
Kafka
能确保从生产者传到消费者的记录都是有序的。Kafka
支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。Kafka
本质是 支持分布式的消息系统/消息中间件
。分析 Kafka
的应用场景等同于分析 消息中级件
的应用场景。通常,使用 消息系统
的 发布/订阅模型 功能来连接 生产者
和 消费者
。实现以下三大功能:
具体应用场景有:
需要说明一下, 为了支持 Kafka
的集群功能, Zookeeper
必须使用集群模式部署。
本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南
下载链接:Kafka Downloads
下载页面中包含两种下载方式
- : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译
[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka
配置第一个 Kafka
实例
# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9091端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9091
# 相当于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多个用,分隔 /kafka指定在zk上的目录
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
配置第二个 Kafka
实例
# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9092端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
注:两个客户端的listeners中的port不能一样
# 启动服务 -daemon 表示后台启动 $KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties # 查看服务 jps -l 43330 org.apache.zookeeper.server.quorum.QuorumPeerMain 14356 org.elasticsearch.bootstrap.Elasticsearch 14583 org.logstash.Logstash 45976 kafka.Kafka # kafka服务进程 netstat -anlpt | grep 9091 tcp6 0 0 :::9091 :::* LISTEN 45976/java tcp6 0 0 192.168.18.128:9091 192.168.18.128:49356 TIME_WAIT - # 关闭服务 $KAFKA_HOME/bin/kafka-server-stop.sh
#两条命令效果一样
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu
在kafka1上创建一个topic,会自动同步到其他客户端
--create
表示创建操作--zookeeper
指定了 Kafka 连接的 ZooKeeper--partitions
表示每个主题4个分区--replication-factor
表示创建每个分区创建2个副本(副本因子)--topic
表示主题名称。
注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.
# 查看topic列表 #两条命令效果一样
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka
__consumer_offsets
topic-demo
yumu
# 查看topic详细信息 #两条命令效果一样
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu
Topic: yumu PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: yumu Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: yumu Partition: 1 Leader: 1 Replicas: 2,1 Isr: 1,2
# 窗口1,启动生产者,向yumu主题发送消息 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu # 窗口2,启动消费者,订阅yumu主题 bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu # 窗口3,启动消费者,订阅yumu主题 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu =====结果===== # 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu >hello, kafka! >once again. > # 消费者1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu hello, kafka! once again. # 消费者2 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu hello, kafka! once again. # 查看所有消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning # 删除topic bin/kafka-topics.sh --delete --bootstrap-server localhost:9091 --topic yumu
1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:
[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:235)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)
原因:
kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件
cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A
第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)
推荐阅读:
下一篇:Kafka消息系统原理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。