当前位置:   article > 正文

本地安装kafka和使用

本地安装kafka和使用

一、下载安装包,我用的是kafka_2.12-2.3.0 百度盘提取https://pan.baidu.com/s/1G8sp9Y5vkIGraZ7vFS1OSw,提取码:15h2。

二、安装

1、解压到:D:\Tools\kafka_2.12-2.3.0 ,其实解压到哪里,依据你的习惯吧,解压位置随便。

2、修改配置文件:D:\Tools\kafka_2.12-2.3.0\config\server.properties 

log.dirs=D:\Tools\kafka_2.12-2.3.0\kafka-logs;

解压之后的server.properties 是这样的,

  我们要加上面两个配置,如果不加上的话,就会出现:Connection to node 0 could not be established. Broker may not be available.的错误。

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092

3、启动

(1)启动zookeeper,这个在kafka包下已经有的了,不需要再安装了。在目录D:\Tools\kafka_2.12-2.3.0\bin\windows下的地址栏中直接输入cmd回车就可进入命令窗口。

输入:zookeeper-server-start.bat ..\..\config\zookeeper.properties

(2)启动kafka,和上面进入某目录命令窗口操作一样。目录:E:\softinstall\kafka_2.12-2.2.1\bin\windows      执行:  kafka-server-start.bat ..\..\config\server.properties

(3)建pp主题,目录:E:\softinstall\kafka_2.12-2.2.1\bin\windows   执行:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pp

(4)建消费者,目录:E:\softinstall\kafka_2.12-2.2.1\bin\windows   执行:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic pp --from-beginning

(5)建生产者,E:\softinstall\kafka_2.12-2.2.1\bin\windows    执行:kafka-console-producer.bat --broker-list localhost:9092 --topic  pp

(6)然后在窗口5生产者中输入hello可以在窗口4消费者中看到hello字符。

在springboot中使用KafkaTemplate。

一,在pom.xml中加入kafka包
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

二、properties文件

# =============== Kafka配置 Begin ===============
# 集群地址列表
spring.kafka.bootstrap-servers=192.168.11.152:9092(我本地的kafka地址来的)
# 连接超时时间
spring.kafka.session.timeout.ms=1000
#请求失败重试次数
spring.kafka.retries=5
#spring.kafka.batch.size=16384
spring.linger.ms=1
spring.kafka.auto-offset-reset=earliest

# 开启手动 ack
spring.kafka.listener.ack-mode=manual

#生产者设置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#重试次数.
spring.kafka.producer.retries=10
#只需要 leader 许可 
spring.kafka.producer.acks=1

spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432


#消费组设置
#指定默认消费者group id
spring.kafka.consumer.group-id=tradeGroup
spring.kafka.consumer.max-poll-records=3000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# =============== kafka配置 End ===============

三、发送消息

@Component
public class KafkaProductService {

    @Autowired 
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public boolean send(UserModel userModel) {
         kafkaTemplate.send(KafkaTopic.KAFKA_TEST, JsonUtil.bean2String(userModel));
         return true;
    }

}

直接使用KafkaTemplate类的send方法进行消息的发送。参数就是topic主题和所发送的消息。

四、接收消息

@Component
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics={KafkaTopic.KAFKA_TEST})
    public void receiverDataListener(ConsumerRecord<String, String> record ,Acknowledgment ack){
        Optional<String> kafkaMessgger=Optional.ofNullable(record.value());
        if (kafkaMessgger.isPresent()) {
            String messger=kafkaMessgger.get();
            try {
                log.info("消費成功",messger);
            } catch (Exception e) {
                log.error("消费失败 offset:{},message:{}",record);
            }
        }
        ack.acknowledge();
    }
}
注解 @KafkaListener(topics={KafkaTopic.KAFKA_TEST})就是消费topic主题是KAFKA_TEST的消息。

ConsumerRecord<String, String> record中的record就是所接收的消息来的。打印出来的信息如下

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/575509
推荐阅读
相关标签
  

闽ICP备14008679号