赞
踩
一、下载安装包,我用的是kafka_2.12-2.3.0 百度盘提取https://pan.baidu.com/s/1G8sp9Y5vkIGraZ7vFS1OSw,提取码:15h2。
二、安装
1、解压到:D:\Tools\kafka_2.12-2.3.0 ,其实解压到哪里,依据你的习惯吧,解压位置随便。
2、修改配置文件:D:\Tools\kafka_2.12-2.3.0\config\server.properties
log.dirs=D:\Tools\kafka_2.12-2.3.0\kafka-logs;
解压之后的server.properties 是这样的,
我们要加上面两个配置,如果不加上的话,就会出现:Connection to node 0 could not be established. Broker may not be available.的错误。
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
3、启动
(1)启动zookeeper,这个在kafka包下已经有的了,不需要再安装了。在目录D:\Tools\kafka_2.12-2.3.0\bin\windows下的地址栏中直接输入cmd回车就可进入命令窗口。
输入:zookeeper-server-start.bat ..\..\config\zookeeper.properties
(2)启动kafka,和上面进入某目录命令窗口操作一样。目录:E:\softinstall\kafka_2.12-2.2.1\bin\windows 执行: kafka-server-start.bat ..\..\config\server.properties
(3)建pp主题,目录:E:\softinstall\kafka_2.12-2.2.1\bin\windows 执行:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pp
(4)建消费者,目录:E:\softinstall\kafka_2.12-2.2.1\bin\windows 执行:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic pp --from-beginning
(5)建生产者,E:\softinstall\kafka_2.12-2.2.1\bin\windows 执行:kafka-console-producer.bat --broker-list localhost:9092 --topic pp
(6)然后在窗口5生产者中输入hello可以在窗口4消费者中看到hello字符。
在springboot中使用KafkaTemplate。
一,在pom.xml中加入kafka包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二、properties文件
# =============== Kafka配置 Begin ===============
# 集群地址列表
spring.kafka.bootstrap-servers=192.168.11.152:9092(我本地的kafka地址来的)
# 连接超时时间
spring.kafka.session.timeout.ms=1000
#请求失败重试次数
spring.kafka.retries=5
#spring.kafka.batch.size=16384
spring.linger.ms=1
spring.kafka.auto-offset-reset=earliest
# 开启手动 ack
spring.kafka.listener.ack-mode=manual
#生产者设置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#重试次数.
spring.kafka.producer.retries=10
#只需要 leader 许可
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
#消费组设置
#指定默认消费者group id
spring.kafka.consumer.group-id=tradeGroup
spring.kafka.consumer.max-poll-records=3000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# =============== kafka配置 End ===============
三、发送消息
@Component
public class KafkaProductService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public boolean send(UserModel userModel) {
kafkaTemplate.send(KafkaTopic.KAFKA_TEST, JsonUtil.bean2String(userModel));
return true;
}
}
直接使用KafkaTemplate类的send方法进行消息的发送。参数就是topic主题和所发送的消息。
四、接收消息
@Component
@Slf4j
public class KafkaConsumerService {
@KafkaListener(topics={KafkaTopic.KAFKA_TEST})
public void receiverDataListener(ConsumerRecord<String, String> record ,Acknowledgment ack){
Optional<String> kafkaMessgger=Optional.ofNullable(record.value());
if (kafkaMessgger.isPresent()) {
String messger=kafkaMessgger.get();
try {
log.info("消費成功",messger);
} catch (Exception e) {
log.error("消费失败 offset:{},message:{}",record);
}
}
ack.acknowledge();
}
}
注解 @KafkaListener(topics={KafkaTopic.KAFKA_TEST})就是消费topic主题是KAFKA_TEST的消息。
ConsumerRecord<String, String> record中的record就是所接收的消息来的。打印出来的信息如下
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。