当前位置:   article > 正文

CentOS 搭建Kafka集群_centos8 kafka容器集群部署方案

centos8 kafka容器集群部署方案

        目录

                1、Kafka下载

                2、安装 Zookeeper 

                3、安装 Kafka

                4、spring boot 整合Kafka 

                5、使用 kraft 注册中心


1、Kafka下载

       进行 Kafka官网 下载 kafka

wget https://downloads.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz

2、安装 Zookeeper 

        Zookeeper 集群搭建

        可以不使用 zookeeper 集群,单节点 zookeeper 即可。

        后面会有 kraft 作为注册中的说明,不想使用zookeeper的可以使用 kraft 。

3、安装 Kafka

        使用tar命令解压 kafka_2.12-3.1.0.tgz,通过 -C 指定 kafka 的安装目录。

        在 Kafka 根目录下创建 data 文件夹,存放数据文件

  1. [root@server01 software]# tar -zxvf kafka_2.12-3.1.0.tgz -C /usr/local
  2. [root@server01 software]# cd /usr/local/kafka_2.12-3.1.0/
  3. [root@server01 kafka_2.12-3.1.0]# mkdir data

        修改Kafka 的配置文件 server.properties 

[root@server01 kafka_2.12-3.1.0]# vi config/server.properties 

        每台服务器的 broker.id 必须唯一,host.name=ip 是新增的一行配置,避免本来连接虚拟机中的Kafka服务器报错。

        在配置zookeeper 的连接地址时,后面需要加上 /kafka ,这样Kafka 的相关配置就在zookeeper的 /kafka 节点下。

        

        Kafka 集群的IP分别是192.168.19.11、192.168.19.12、192.168.19.11,对应如下:

        Kafka 01 配置:

        broker.id=1
        host.name=192.168.19.11
        listeners=PLAINTEXT://192.168.19.11:9092
        advertised.listeners=PLAINTEXT://192.168.19.11:9092
        zookeeper.connect=192.168.19.11:2181,192.168.19.12:2181,192.168.19.13:2181/kafka

        Kafka 02 配置:

        broker.id=2
        host.name=192.168.19.12
        listeners=PLAINTEXT://192.168.19.12:9092
        advertised.listeners=PLAINTEXT://192.168.19.12:9092
        zookeeper.connect=192.168.19.11:2181,192.168.19.12:2181,192.168.19.13:2181/kafka

        Kafka 03 配置:

        broker.id=3
        host.name=192.168.19.13
        listeners=PLAINTEXT://192.168.19.13:9092
        advertised.listeners=PLAINTEXT://192.168.19.13:9092
        zookeeper.connect=192.168.19.11:2181,192.168.19.12:2181,192.168.19.13:2181/kafka

        分别启动 Kafka 服务,启动成功后,我们可以使用jps 查看启动情况,也可以通过 Kafka 日志查看 Kafka 是否启动成功,Kafka 日志文件在 Kafka 根目录下 logs/server.log。

[root@server01 kafka_2.12-3.1.0]# sh bin/kafka-server-start.sh -daemon config/server.properties

        使用 PrettyZoo 连接zookeeper,查看 Kafka 注册情况

        PrettyZoo GitHub 地址:GitHub PrettyZoo

        PrettyZoo 下载地址:Releases · vran-dev/PrettyZoo · GitHub

        ids 分别代表启动的三台 Kafka 服务器,topics 是Kafka 集群上的所有 topic。

4、spring boot 整合Kafka 

        新建 Spring Boot 项目

        引入 kafka 整合 spring 的依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.8.2</version>
  5. </dependency>

        application.yml 配置 

  1. server:
  2. port: 10009
  3. spring:
  4. application:
  5. name: spring-boot-kafka
  6. kafka:
  7. bootstrap-servers: 192.168.19.11:9092,192.168.19.12:9092,192.168.19.13:9092
  8. producer:
  9. retries: 3 # 客户端会发送失败重试次数
  10. batch-size: 16384 #16K
  11. buffer-memory: 33554432 #32M
  12. # 指定消息key和消息体的编码方式
  13. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15. consumer:
  16. group-id: consumer-group # 消费者组
  17. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  18. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

       使用 KafkaTemplate 发送一条简单的MQ 消息

  1. @RestController
  2. @RequestMapping("/kafka")
  3. public class KafkaTopicController {
  4. @Resource
  5. private KafkaTemplate<String,String> kafkaTemplate;
  6. @RequestMapping("/sendMessage")
  7. public void sendMessage(){
  8. String topic = "order-topic";
  9. kafkaTemplate.send(topic,"{orderId:1}");
  10. }
  11. }

         使用 @KafkaListener(topics = {"order-topic"}) 配置监听的topic。

  1. @Slf4j
  2. @Component
  3. public class KafkaTopicListener {
  4. @KafkaListener(topics = {"order-topic"})
  5. public void receiveMessage(String message){
  6. log.info("接收到的消息:{}" , message);
  7. }
  8. }

         访问 127.0.0.1:10009/kafka/sendMessage 测试 Kafka 消息是否发送成功。

        在 zookeeper 上,我们可以看到刚刚我们新创建的 order-topic ,Kafka 配置文件中,分区数配置的是1 ,num.partitions=1 ,所以 order-topic 下只有一个分区。

        代码地址:

        GitHub spring-boot/spring-boot-kafka

5、使用 kraft 注册中心

        Kafka决定未来弃用zookeepr已经有很长时间了,从Kafka 2.8开始,官方提出了Kraft模式,使用raft算法。不过目前仍然处于测试阶段,生产上不建议使用
        使用Kraft模式,元数据会存储再controller节点中的KRaft quorum中。相比与zookeeper模式,Kraft模式更加容易扩展。

        KRaft mode in Kafka 3.1 is provided for testing only, NOT for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.

        There may be bugs, including serious ones. You should assume that your data could be lost at any time if you try the preview release of KRaft mode.

        下面,我会根据官方给出的文档,搭建一个Kraft集群。 

         KRaft 相关的文档和配置,都在 kafka_2.12-3.1.0/config/kraft 目录下面

            

        在 kafka_2.12-3.1.0 目录下创建 kraft-combined-logs 文件夹存储数据文件。

启动KRaft集群,需要执行以下操作:

       (1)、修改kraft/server.properties
       (2)、生成一个cluster ID
       (3)、格式化存储目录
       (4)、启动Kafka服务

1、修改kraft/server.properties

node.id=1

controller.quorum.voters=1@192.168.19.11:9093,2@192.168.19.12:9093,3@192.168.19.13:9093

listeners=PLAINTEXT://192.168.19.11:9092,CONTROLLER://192.168.19.11:9093

inter.broker.listener.name=PLAINTEXT

log.dirs=/app/kafka-3.1.0/kraft-combined-logs

2、生成一个cluster ID

[root@server01 kafka_2.12-3.1.0]# ./bin/kafka-storage.sh random-uuid

3、格式化存储目录

[root@server01 kafka_2.12-3.1.0]# ./bin/kafka-storage.sh format -t jkrwjNohQKm2iVf4ttheQQ -c ./config/kraft/server.properties

 4、启动Kafka服务

[root@server01 kafka_2.12-3.1.0]# ./bin/kafka-server-start.sh ./config/kraft/server.properties

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/564867
推荐阅读
相关标签
  

闽ICP备14008679号