当前位置:   article > 正文

kafka 3.2.0 linux部署并测试。_kafka-console-producer.sh

kafka-console-producer.sh

前言

kafka是一个优秀的消息队列框架,现在基于linux安装写个文档记录下。


提示:以下是本篇文章正文内容,下面案例可供参考

一、环境准备

1.下载linux 安装包。
https://kafka.apache.org/downloads
在这里插入图片描述
注意下载二进制包,如果不是下载二进制包,就要自己编译了,这里不需要这么麻烦,就直接下载了。官网推荐下载2.13版本,就下载这个版本就行了。
2. 注意这个版本已经内嵌了zookeeper了,不需要再去安装zookeeper了。
3. 安装了jdk1.8以及以上版本,并且配置了环境变量这里就不展开了,各位自己找文档配。

二、安装步骤

1.上传二进制文件到linux并且解压。

tar -xzf kafka_2.13-3.2.0.tgz
  • 1

2.修改kafka配置

#broker实例标识,集群时要保证唯一
broker.id=1

# kafka存放数据的目录
log.dirs=/tmp/kafka-logs

# 注册中心zookeeper的地址
zookeeper.connect=localhost:2181

# 访问IP,需要保证服务能够通信
listeners=PLAINTEXT://192.168.31.96:9092

##超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168

# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
delete.topic.enable=true

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3.启动zookeeper

cd kafka_2.13-3.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
  • 1
  • 2

4. 启动kafka

bin/kafka-server-start.sh config/server.properties
  • 1

5. 测试是否启动成功

5.1 先看看进程在不在,

ps -ef|grep kafka
  • 1

在这里插入图片描述

5.2 看看zookeeper进程是不是在。

在这里插入图片描述

5.3 新建topic

listeners=PLAINTEXT://192.168.31.96:9092
ip修改为上面kafka的地址

 bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 192.168.31.96:9092
  • 1

5.4 开启生产者端。

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 192.168.31.96:9092
  • 1

5.5 另外打开一个终端, 开启监听端

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 192.168.31.96:9092
  • 1

5.6 在生产端输入消息,然后再5.5的消费端能消费到数据,就证明了kafka安装成功了。

在这里插入图片描述

三、springboot集成

3.1 引入pom

      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4

3.2 application.properties,增加配置kafka

这里需要注意key不要写错,我当时就是spring.kafka.bootstrap-servers
写成了kafka.bootstrap-servers,导致一直连的是localhost:9092一直测试半天连接不上。

spring.kafka.bootstrap-servers=192.168.31.96:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.listener.ack-mode=manual_immediate
spring.listener.missing-topics-fatal=false
  • 1
  • 2
  • 3
  • 4
  • 5

3.3 新增control测试

@RestController
public class KafkaControl {

    @Autowired
    private KafkaTemplate kafkaTemplate;



    @RequestMapping(value = "/hello3",method = RequestMethod.GET)
    public String index() {
        kafkaTemplate.send("quickstart-events","hillll");
        return "200";
    }




}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

这里往刚刚的虚拟机topic发送消息,如果里面的消费者端收到消息则证明连接成功了。
在这里插入图片描述

3.4 发送成功了,看下虚拟机的情况。

在这里插入图片描述

可以看到已经发送成功了,测试完毕。

总结

  1. 踩了些坑,总结了一些经验,刚开始不知道要检查是否启动成功了,只查看了进程,但是其实是启动失败的。用springboot连接的时候死活连接不上,
    所以一定要先在linux上测试。
  2. 这个版本的安装资料比较少,网上的资料的启动方式都是错误的,参考了官网才知道新的kafka启动方式和zookeeper启动方式,官网才是好的学习方法。
  3. 附带linux开端口的方法,适用于centos7版本
    开启端口命令
    firewall-cmd --zone=public --add-port=9092/tcp --permanent
    重启防火墙
    systemctl restart firewalld.service
    查看开放的端口
    #查看已开启的端口
    firewall-cmd --list-ports
    本地测试连接虚拟机的时候需要开放端口才能连接上虚拟机,这里也算是补充一个小知识点。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/78567
推荐阅读
相关标签
  

闽ICP备14008679号