赞
踩
kafka是由Scala
和Java
编写,是一种高吞吐量的分布式发布订阅消息系统
,它可以处理消费者在网站中的所有动作流数据。
在实际生产应用中,通常会使用kafka作为消息传输的数据管道,rabbitmq作为交易数据作为数据传输管道,主要的取舍因素则是是否存在丢失数据的可能;rabbitmq在金融场景中经常使用,具有较高的严谨性,数据丢失的可能性更小,同事具备更高的实时性;而kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但从严谨性角度来讲,大不如rabbitmq;而且由于kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况;
在安装kafka之前,我们需要确保服务器上有Java环境。同时kafka依赖zookeeper,我们可以自行安装zookeeper,在下载kafka的同时,其也自带有zookeeper,也可以使用kafka自带的zookeeper。
下载kafka地址:https://kafka.apache.org/downloads
选择目录解压kafka,并重命名文件夹
tar -xzvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0.tgz kafka
进入kafka的config目录下,修改server.properties文件
cd config
vim server.properties
需要修改的属性如下:
broker.id=0
port=9092 #端口号
host.name=localhost #单机可直接用localhost,也可填写为服务器地址
log.dirs=/usr/local/kafka/logs #日志存放路径可修改可不修改
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
listeners=PLAINTEXT://0.0.0.0:9092 #0.0.0.0表示绑定所有网卡
advertised.listeners=PLAINTEXT://47.108.248.73:9092 #此处填写公网IP(即服务器地址)
zookeeper.connect=127.0.0.1:2181
创建日志存放目录
mkdir /usr/local/kafka/logs
回到kafka的config目录下,修改zookeeper.properties
dataDir=/usr/local/kafka/data/zookeeper #zookeeper数据目录 (可以修改可以不修改)
clientPort=2181
maxClientCnxns=100
tickTimes=2000
initLimit=10
syncLimit=5
admin.enableServer=false
同样,创建dataDir文件夹
mkdir /usr/local/kafka/data/zookeeper
# consumer group id
group.id=test-consumer-group #该属性可改可不改,在spring中配置kafka的consumer.group-id时需要
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
或
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic demo
其中demo为创建的topic名称。创建了一个名为 demo 的主题,其中包含一个分区和一个副本因子。 创建成功之后会输出:Created topic "demo".
创建主题后,系统会在config / server.properties文件中的"/ tmp / kafka-logs /"中指定的创建主题的日志。每进行一个不同的事件,则需要新建一个topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
或
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo
进入kafka目录,执行以下命令
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
生产者命令行客户端需要两个主要参数:代理列表和主体名称。localhost:9092为代理列表,demo为主体名称
旧版本
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning
若使用旧版本命令启动报错 zookeeper is not a recognized option,则请使用新版本命令启动
新版本
bin/kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic 名称 --from-beginning
kafka所在目录下,执行以下命令
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic demo --time -1
./kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic redis_event --from-beginning
./kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic redis_event #查看最新的数据
搭建集群与单点的差别在于在配置文件上有所变化。在其他服务器中按照以上方式搭建时,修改参数如下
broker.id=0 # 修改每一个节点的以下参数,使其不同
listens: PLAINTEXT://192.168.xx.xx:9092 kafka服务器地址以及启动端口
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181 #将该属性配置为其他节点所在服务器的zookeeper访问地址,即zookeeper集群访问地址
# 分发kafka安装目录给其他集群节点
scp -r /usr/local/kafka/ 192.168.xx.x1:/opt/module
scp -r /usr/local/kafka/ 192.168.xx.x2:/opt/module
# 创建kafka启动脚本 vim kafka-cluster.sh # 添加如下内容 #!/bin/bash case $1 in "start"){ for i in 192.168.xx.x1 192.168.xx.x2 192.168.xx.x3 do echo -------------------------------- $i kafka 启动 --------------------------- ssh $i "source /etc/profile;/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties" done } ;; "stop"){ for i in 192.168.xx.x1 192.168.xx.x2 192.168.xx.x3 do echo -------------------------------- $i kafka 停止 --------------------------- ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh" done } ;; esac
保存退出后,修改执行权限
chmod +x ./kafka-cluster.sh
按照单点的方法将集群中每个zookeeper节点启动即可
找到编写的该脚本的位置,执行以下命令
./kafka-cluster.sh start
cmak(kafka-manager)是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。是使用图像对kafka进行一些简单的集群管理的工具
下载地址:https://github.com/yahoo/CMAK/releases
注意:cmak要求java版本为11
unzip -o cmak-3.0.0.6.zip
mv cmak-3.0.0.6 kafka-manager
进入kafka-manager的bin文件,修改application.conf配置
kafka-manager.zkhosts="localhost:2181"
cmak.zkhosts="localhost:2181"
如果需要对kafka集群进行管理,则将application.conf改为
kafka-manager.zkhosts="192.168.xx.x1:2181,192.168.xx.x2:2181,192.168.xx.x3:2181"
cmak.zkhosts="192.168.xx.x1:2181,192.168.xx.x2:2181,192.168.xx.x3:2181"
nohup bin/cmak >/dev/null 2>&1 &
注*:启动端口号默认为9000,可通过-Dhttp.port在指定端口;-Dconfig.file=conf/application.conf指定配置文件;
nohup bin/cmak -Dconfig.file=conf/application.conf -Dhttp.port=9001 &
查看cmak是否启动
ss -ntl
或
lsof -i:9000
查看linux防火墙是否开放了该端口号,如果没有,则开放该端口,并重新加载防火墙:
firewall-cmd --zone=public --list-ports
firewall-cmd --zone=public --add-port=9000/tcp --permanent
firewall-cmd --reload
访问该地址http://192.168.xx.xxx:9000
能成功访问说明配置成功。点击Cluster的下拉按钮,新建一个Cluster,添加Cluster Name、Cluster Zookeeper Host,选择kafka版本后保存即可
查看创建的Cluster
brokers即kafka集群数,在集群里有多少个kafka服务则有多少个brokers
点击Topic的下拉按钮新建一个Topic,填入topic名称即可。
查看已创建的topic
至此kafka以及管理工具配置完成。
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
spring: kafka: bootstrap-servers: 47.108.248.73:9092 producer: #生产者 # 发生错误后,消息重发的次数。 # 此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能使带有顺序写入的数据打乱顺序 #比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1) retries: 1 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 #16k # 设置生产者内存缓冲区的大小 buffer-memory: 33554432 acks: 1 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: #消费者 # 与kafka config consumer.properties中的group.id属性对应 # 不配置该属性,则消息会被所有的消费者消费,即有多少个消费者,这条消息就会被消费多少次。(多对多,也叫广播模式) # 配置了该属性,则消息只会被这个消费组下的一个消费者所消费(点对点,也叫发布订阅模式) group-id: test-consumer-group # 自动提交的时间间隔 # 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式, # 如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效 auto-commit-interval: 1S enable-auto-commit: false # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理 # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: earliest # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # record 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 # batch 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # time 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 # count 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 # count_time time 和 count 有一个条件满足时提交 # manual 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 # manual_immediate 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种 ack-mode: manual_immediate # 在侦听器容器中运行的线程数 concurrency: 5
生产:
@Autowired
KafkaTemplate kafkaTemplate;
public void sendMessage(){
kafkaTemplate.send("test", "msgContent");
}
消费:
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value().toString());
}
特别注意,生产和消费方法都需要在带有@Component注解的类中实现
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。