赞
踩
前言:Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。Apache Kafka与传统消息系统相比,有以下不同:
在安装kafka之前必须先安装JDK和zookeeper
Centos7中安装Java环境,在安装前先查看有无安装过Java环境。直接运行java 命令,会提示命令未找到,则是没有安装过java环境。
(1) 查看yum源的java包
yum list java*
(2) 安装jdk1.8
yum -y install java-1.8.0-openjdk
安装完成后会在最下面显示完成
(3) 查看版本,检测安装是否成功
java -version
若出现上述提示表示JDK已经成功安装啦!
# 解压文件
tar ‐zxvf apache‐zookeeper‐3.5.8‐bin.tar.gz# 复制一份配置文件, 方便修改
cp conf/zoo_sample.cfg conf/zoo.cfg# 启动
bin/zkServer.sh start# 连接控制台
bin/zkCli.sh # 查看zk的根目录相关节点
ls /
其实上面不用安装Zookeeper也行,Kafak自带Zookeeper
(1) 解压 kafka,进入kakfa目录
tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0
(2) 修改 kafka-server 的配置文件
vim config/server.config
修改其中的:
- # broker.id属性在kafka集群中必须要是唯一
- broker.id=0
-
- # kafka的消息存储文件,别放默认的temp目录下,它默认空间不够
- log.dir=/usr/local/data/kafka‐logs
-
- # kafka 连接 zookeeper 的地址
- zookeeper.connect=localhost:2181
-
- #阿里云内网的ip地址和9092端口
- listeners=PLAINTEXT://xxx.xx.xx:9092
-
- # kafka部署的机器ip和提供服务的端口号,阿里云外网地址
- advertised.listeners=PLAINTEXT://xx.xx.xx:9092
别光用这个指定 listeners=PLAINTEXT://xxx.xxx.xx.xx:9092,会出现kafka主机名解析hostname异常,解析成主机名SpringBoot启动会报错
DEBUG [org.apache.kafka.clients.NetworkClient] - Error connecting to node 1 at hs:9092: java.io.IOException: Can't resolve address: hs:9092
原因:Windows 上运行的kafka拿到的host是机器名而不是IP地址所以会导致报错,hs是 Kafka 实例所在Linux服务器的主机名。解决方案就是用advertised.listeners=PLAINTEXT://xx.xx.xx:9092。
(3) 首先启动Zookeeper
如果未启动zookeeper 需要先启动zookeeper,如果没有自己安装的zk可以使用Kafka自带的:
启动ZK:使用Kafka安装包中的脚本启动单节点 Zookeeper 实例(加守护进程启动)
- #在bin的上一级目录执行命令
- bin/zookeeper-server-start.sh config/zookeeper.properties
(4)启动kakfa
kafka如果直接启动会出现问题,就是日志信息会打印在控制台台。然后关闭窗口,kafka随之关闭,可以后台启动命令解决这个问题。
- # 后台不挂断启动Kakfa服务 , 运行的日志打印在 logs 目录里的server.log 里
- nohup bin/kafka-server-start.sh config/server.properties &
-
- #这时候不知道是否启动成功,我们可以查看日志,kafka目录下
- tail -500f logs/server.log
nohup这里若不指定输出日志文件,缺省地会输出到nohup.out文件中;想要不输出日志可以
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & ,>/dev/null 2>&1表示不输出正常和错误日志。
(5)关闭kakfa
- # 停止kafka (先进入kafka目录)
- bin/kafka‐server‐stop.sh
(6)确认Linux 防火墙 kafka和Zookeeper的端口开启
如果是阿里云服务器,还得添加安全组入方向策略,因为它默认只开放远程连接的22端口
- #创建topic
-
- bin/kafka-topics.sh --create --zookeeper localhost:2100 --replication-factor 1 --partitions 1 --topic test02
-
- #查看topic
-
- bin/kafka-topics.sh --list --zookeeper localhost:2100
-
- #生产者生产消息
-
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test02
-
- 写一条this is test的信息
-
- #消费者消费消息
-
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning
可以看到消费信息
搭建kafka 和 zookeeper 环境 并且启动它们
- <!--Kakfa-->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
-
- <!--Json工具-->
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
-
- <!--SpringMVC-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <!--LomBok使用@Data 注解-->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.4</version>
- <scope>provided</scope>
- </dependency>
- spring:
- kafka:
- bootstrap-servers: xx.xxx.xx.x:9092 #bootstrap-servers:连接kafka的地址,多个地址用逗号分隔 (指定kafka server的地址,集群配多个,中间,逗号隔开)
- consumer:
- group-id: myGroup
- enable-auto-commit: true
- auto-commit-interval: 100ms
- properties:
- session.timeout.ms: 15000
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- auto-offset-reset: earliest
- producer:
- retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
- batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
- buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
- key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
- value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类
- package com.hs.demo.kakfa;
-
- import lombok.Data;
-
- import java.util.Date;
-
- //@Data : 注解在类上, 为类提供读写属性, 此外还提供了 equals()、hashCode()、toString() 方法
- @Data
- public class Message {
-
- private Long id;
-
- private String msg;
-
- private Date sendTime;
- }
主要利用 KafkaTemplate 来发送消息 ,将消息封装成Message 并且进行转化成Json串 发送到Kafka中
- package com.hs.demo.kakfa;
-
- import com.google.gson.Gson;
- import com.google.gson.GsonBuilder;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import java.util.Date;
-
- //@Slf4j : 注解在类上, 为类提供一个属性名为 log 的 log4j 的日志对象
- @Component
- @Slf4j
- public class KafkaProducer {
-
- private final KafkaTemplate<String, String> kafkaTemplate;
-
- //构造器方式注入 kafkaTemplate
- public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
- this.kafkaTemplate = kafkaTemplate;
- }
-
- private Gson gson = new GsonBuilder().create();
-
- public void send(String msg) {
- Message message = new Message();
-
- message.setId(System.currentTimeMillis());
- message.setMsg(msg);
- message.setSendTime(new Date());
- log.info("【生产者发送消息:{}】", gson.toJson(message));
- //对 topic = hello2 的发送消息
- kafkaTemplate.send("hello2",gson.toJson(message));
- }
-
- }
在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。kafka的消息在消费者接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。
- package com.hs.demo.kakfa;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- import java.util.Optional;
-
- @Component
- @Slf4j
- public class KafkaConsumer {
-
- @KafkaListener(topics = {"hello2"})
- public void listen(ConsumerRecord<?, ?> record) {
-
- Optional.ofNullable(record.value())
- .ifPresent(message -> {
- log.info("【+++++++++++++++++ record = {} 】", record);
- log.info("【消费者接收message : {}】", message);
- });
-
- // Optional msg = Optional.ofNullable(record.value());
- // if (msg.isPresent())
- // {
- // log.info("消费者消费消息:{}", msg.get());
- // }
-
- }
-
- }
提供一个 Http接口调用 KafkaProdcuer 去发送消息
- package com.hs.demo.kakfa;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @Slf4j
- public class TestController {
-
- @Autowired
- private KafkaSender kafkaSender;
-
- @GetMapping("sendMessage/{msg}")
- public void sendMessage(@PathVariable("msg") String msg){
- kafkaSender.send(msg);
- }
- }
KafkaMessageListenerContainer中有 consumer group = myGroup 有一个 监听 hello2-0 topic 的 消费者
(1)调用Http接口
http://localhost:8080/sendMessage/KafkaTestMsg
(2)输出结果
参考链接:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。