当前位置:   article > 正文

kafka部分资料整理_kafka资料

kafka资料

kafka

简介

kafka是由ScalaJava编写,是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

kafka与rabbitMQ的实际场景选择

在实际生产应用中,通常会使用kafka作为消息传输的数据管道,rabbitmq作为交易数据作为数据传输管道,主要的取舍因素则是是否存在丢失数据的可能;rabbitmq在金融场景中经常使用,具有较高的严谨性,数据丢失的可能性更小,同事具备更高的实时性;而kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但从严谨性角度来讲,大不如rabbitmq;而且由于kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况;

搭建单点kafka

环境准备

在安装kafka之前,我们需要确保服务器上有Java环境。同时kafka依赖zookeeper,我们可以自行安装zookeeper,在下载kafka的同时,其也自带有zookeeper,也可以使用kafka自带的zookeeper。

下载kafka

下载kafka地址:https://kafka.apache.org/downloads

选择目录解压kafka,并重命名文件夹

tar -xzvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0.tgz kafka
  • 1
  • 2

配置kafka

server.properties

进入kafka的config目录下,修改server.properties文件

cd config
vim server.properties
  • 1
  • 2

需要修改的属性如下:

broker.id=0 
port=9092 #端口号 
host.name=localhost #单机可直接用localhost,也可填写为服务器地址
log.dirs=/usr/local/kafka/logs #日志存放路径可修改可不修改
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181 
listeners=PLAINTEXT://0.0.0.0:9092 #0.0.0.0表示绑定所有网卡
advertised.listeners=PLAINTEXT://47.108.248.73:9092 #此处填写公网IP(即服务器地址)
zookeeper.connect=127.0.0.1:2181
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建日志存放目录

mkdir /usr/local/kafka/logs
  • 1
zookeeper.properties

回到kafka的config目录下,修改zookeeper.properties

dataDir=/usr/local/kafka/data/zookeeper  #zookeeper数据目录  (可以修改可以不修改)
clientPort=2181
maxClientCnxns=100
tickTimes=2000
initLimit=10
syncLimit=5
admin.enableServer=false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

同样,创建dataDir文件夹

mkdir /usr/local/kafka/data/zookeeper
  • 1
consumer.properties
# consumer group id
group.id=test-consumer-group #该属性可改可不改,在spring中配置kafka的consumer.group-id时需要
  • 1
  • 2

启动

启动zookeeper
bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties
  • 1
启动kafka
bin/kafka-server-start.sh -daemon  config/server.properties
  • 1

Kafka topic

创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
或
bin/kafka-topics.sh --create --zookeeper localhost:2181  --topic demo
  • 1
  • 2
  • 3

其中demo为创建的topic名称。创建了一个名为 demo 的主题,其中包含一个分区和一个副本因子。 创建成功之后会输出:Created topic "demo".创建主题后,系统会在config / server.properties文件中的"/ tmp / kafka-logs /"中指定的创建主题的日志。每进行一个不同的事件,则需要新建一个topic

查询topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
或
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
  • 1
  • 2
  • 3
删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo
  • 1

Kafka 生产/消费

启动生产者

进入kafka目录,执行以下命令

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
  • 1

生产者命令行客户端需要两个主要参数:代理列表和主体名称。localhost:9092为代理列表,demo为主体名称

启动消费者

旧版本

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning
  • 1

若使用旧版本命令启动报错 zookeeper is not a recognized option,则请使用新版本命令启动

新版本

bin/kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic 名称 --from-beginning
  • 1
查看kafka生产最大位置偏移量

kafka所在目录下,执行以下命令

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic demo --time -1
  • 1
消费消息查看
./kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic redis_event --from-beginning 
./kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic redis_event #查看最新的数据
  • 1
  • 2

搭建kafka集群

搭建集群与单点的差别在于在配置文件上有所变化。在其他服务器中按照以上方式搭建时,修改参数如下

server.properties

broker.id=0 # 修改每一个节点的以下参数,使其不同
listens: PLAINTEXT://192.168.xx.xx:9092 kafka服务器地址以及启动端口
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181 #将该属性配置为其他节点所在服务器的zookeeper访问地址,即zookeeper集群访问地址
  • 1
  • 2
  • 3

分发kafka安装目录

# 分发kafka安装目录给其他集群节点
scp -r /usr/local/kafka/ 192.168.xx.x1:/opt/module
scp -r /usr/local/kafka/ 192.168.xx.x2:/opt/module
  • 1
  • 2
  • 3

编写kafka集群操作脚本

# 创建kafka启动脚本
vim kafka-cluster.sh
# 添加如下内容
#!/bin/bash
case $1 in
"start"){
	for i in 192.168.xx.x1 192.168.xx.x2 192.168.xx.x3
	do 
		 echo -------------------------------- $i kafka 启动 ---------------------------
		ssh $i "source /etc/profile;/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties"
	done
}
;;
"stop"){
	for i in 192.168.xx.x1 192.168.xx.x2 192.168.xx.x3
	do
		echo -------------------------------- $i kafka 停止 ---------------------------
		ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh"
	done
}
;;
esac
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

保存退出后,修改执行权限

chmod +x ./kafka-cluster.sh
  • 1
启动zookeeper集群

按照单点的方法将集群中每个zookeeper节点启动即可

启动kafka集群

找到编写的该脚本的位置,执行以下命令

./kafka-cluster.sh start
  • 1

安装cmak

cmak(kafka-manager)是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。是使用图像对kafka进行一些简单的集群管理的工具

下载地址:https://github.com/yahoo/CMAK/releases

注意:cmak要求java版本为11

解压cmak

unzip -o cmak-3.0.0.6.zip
mv cmak-3.0.0.6 kafka-manager
  • 1
  • 2

进入kafka-manager的bin文件,修改application.conf配置

kafka-manager.zkhosts="localhost:2181"
cmak.zkhosts="localhost:2181"
  • 1
  • 2

如果需要对kafka集群进行管理,则将application.conf改为

kafka-manager.zkhosts="192.168.xx.x1:2181,192.168.xx.x2:2181,192.168.xx.x3:2181"
cmak.zkhosts="192.168.xx.x1:2181,192.168.xx.x2:2181,192.168.xx.x3:2181"
  • 1
  • 2

启动cmak

nohup bin/cmak >/dev/null 2>&1 &
  • 1

注*:启动端口号默认为9000,可通过-Dhttp.port在指定端口;-Dconfig.file=conf/application.conf指定配置文件;

nohup bin/cmak -Dconfig.file=conf/application.conf -Dhttp.port=9001 &
  • 1

查看cmak是否启动

ss -ntl
或
lsof -i:9000
  • 1
  • 2
  • 3

查看linux防火墙是否开放了该端口号,如果没有,则开放该端口,并重新加载防火墙:

firewall-cmd --zone=public --list-ports

firewall-cmd --zone=public --add-port=9000/tcp --permanent

firewall-cmd --reload
  • 1
  • 2
  • 3
  • 4
  • 5

访问该地址http://192.168.xx.xxx:9000

在这里插入图片描述

新建Cluster

能成功访问说明配置成功。点击Cluster的下拉按钮,新建一个Cluster,添加Cluster Name、Cluster Zookeeper Host,选择kafka版本后保存即可
在这里插入图片描述
在这里插入图片描述
查看创建的Cluster

brokers即kafka集群数,在集群里有多少个kafka服务则有多少个brokers

新建topic

点击Topic的下拉按钮新建一个Topic,填入topic名称即可。

在这里插入图片描述

查看已创建的topic
在这里插入图片描述

至此kafka以及管理工具配置完成。

springboot整合kafka

  1. 在项目中引入kafka依赖
<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${kafka.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 在application.yml中配置kafka相关参数
spring:
    kafka:
        bootstrap-servers: 47.108.248.73:9092
        producer: #生产者
          # 发生错误后,消息重发的次数。
          # 此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能使带有顺序写入的数据打乱顺序
          #比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)
          retries: 1
          #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
          batch-size: 16384 #16k
          # 设置生产者内存缓冲区的大小
          buffer-memory: 33554432
          acks: 1
          # 键的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer: #消费者
          # 与kafka config consumer.properties中的group.id属性对应
          # 不配置该属性,则消息会被所有的消费者消费,即有多少个消费者,这条消息就会被消费多少次。(多对多,也叫广播模式)
          # 配置了该属性,则消息只会被这个消费组下的一个消费者所消费(点对点,也叫发布订阅模式)
          group-id: test-consumer-group 
          # 自动提交的时间间隔
          # 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,
          # 如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效
          auto-commit-interval: 1S
          enable-auto-commit: false
          # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理
          # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
          # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
          auto-offset-reset: earliest
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          # record 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
          # batch 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
          # time 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
          # count 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
          # count_time time 和 count 有一个条件满足时提交
          # manual 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
          # manual_immediate 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
          ack-mode: manual_immediate
          # 在侦听器容器中运行的线程数
          concurrency: 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  1. 代码层:

生产:

@Autowired
KafkaTemplate kafkaTemplate;
public void sendMessage(){
    kafkaTemplate.send("test", "msgContent");
}
  • 1
  • 2
  • 3
  • 4
  • 5

消费:

@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
    System.out.println(record.value().toString());
}
  • 1
  • 2
  • 3
  • 4

特别注意,生产和消费方法都需要在带有@Component注解的类中实现

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/142387
推荐阅读
相关标签
  

闽ICP备14008679号