当前位置:   article > 正文

kafka安装和使用的入门教程_kafka下载安装

kafka下载安装

这篇文章简单介绍如何在ubuntu上安装kafka,并使用kafka完成消息的发送和接收。

一、安装kafka

访问kafka官网Apache Kafka,然后点击快速开始

紧接着,点击Download

最后点击下载链接下载安装包

如果下载缓慢,博主已经把安装包上传到百度网盘:

链接:https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
提取码:3aoh
--来自百度网盘超级会员V3的分享

二、启动kafka

经过上一步下载完成后,按照页面的提示启动kafka

1、通过远程连接工具,如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录

2、切换到usr目录,解压kafka_2.13-3.6.0.tgz

  1. cd /usr
  2. tar -zxzf kafka_2.13-3.6.0.tgz

3、启动zookeeper

修改配置文件confg/zookeeper.properties,修改一下数据目录

dataDir=/usr/local/zookeeper

然后通过以下命令启动kafka自带的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

4、启动kafka

修改配置文件confg/server.properties,修改一下kafka保存日志的目录

log.dirs=/usr/local/kafka/logs

然后新开一个连接窗口,通过以下命令启动kafka

bin/kafka-server-start.sh config/server.properties

三、kafka发送、接收消息

创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

生产消息

往刚刚创建的topic里发送消息,可以一次性发送多条消息,点击Ctrl+C完成发送

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

消费消息

消费最新的消息

新开一个连接窗口,在命令行输入以下命令拉取topic为hello上的消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

消费之前的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello

指定偏移量消费

 指定从第几条消息开始消费,这里--offset参数设置的偏移量是从0开始的。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello

消息的分组消费
每个消费者都可以指定一个消费者组, kafka 中的同一条消息,只能被同一个消费者组下的某一个消费 者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。
通过以下命令在启动消费者时设置分组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello

四、Java中使用kafka

通过maven官网搜索kafka的maven依赖版本

https://central.sonatype.com/search?q=kafkaicon-default.png?t=N7T8https://central.sonatype.com/search?q=kafka然后通过IntelliJ IDEA创建一个maven项目kafka,在pom.xml中添加kafka的依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>kafka</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>org.apache.kafka</groupId>
  16. <artifactId>kafka_2.12</artifactId>
  17. <version>3.6.0</version>
  18. </dependency>
  19. </dependencies>
  20. </project>

创建消息生产者

生产者工厂类
  1. package producer;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.Producer;
  4. import org.apache.kafka.clients.producer.ProducerConfig;
  5. import java.util.Properties;
  6. /**
  7. * 消息生产者工厂类
  8. * @author heyunlin
  9. * @version 1.0
  10. */
  11. public class MessageProducerFactory {
  12. private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";
  13. public static Producer<String, String> getProducer() {
  14. //PART1:设置发送者相关属性
  15. Properties props = new Properties();
  16. // 此处配置的是kafka的端口
  17. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  18. // 配置key的序列化类
  19. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  20. // 配置value的序列化类
  21. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  22. return new KafkaProducer<>(props);
  23. }
  24. }

测试发送消息
  1. package producer;
  2. import org.apache.kafka.clients.producer.Callback;
  3. import org.apache.kafka.clients.producer.Producer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.clients.producer.RecordMetadata;
  6. /**
  7. * @author heyunlin
  8. * @version 1.0
  9. */
  10. public class MessageProducer {
  11. private static final String TOPIC = "hello";
  12. public static void main(String[] args) {
  13. ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");
  14. Producer<String, String> producer = MessageProducerFactory.getProducer();
  15. // 同步发送消息
  16. producer.send(record);
  17. // 异步发送消息
  18. producer.send(record, new Callback() {
  19. @Override
  20. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  21. String topic = recordMetadata.topic();
  22. long offset = recordMetadata.offset();
  23. int partition = recordMetadata.partition();
  24. String message = recordMetadata.toString();
  25. System.out.println("topic = " + topic);
  26. System.out.println("offset = " + offset);
  27. System.out.println("message = " + message);
  28. System.out.println("partition = " + partition);
  29. }
  30. });
  31. // 加上这行代码才会发送消息
  32. producer.close();
  33. }
  34. }

创建消息消费者

消费者工厂类
  1. package consumer;
  2. import org.apache.kafka.clients.consumer.Consumer;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.util.Properties;
  6. /**
  7. * 消息生产者工厂类
  8. * @author heyunlin
  9. * @version 1.0
  10. */
  11. public class MessageConsumerFactory {
  12. private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";
  13. public static Consumer<String, String> getConsumer() {
  14. //PART1:设置发送者相关属性
  15. Properties props = new Properties();
  16. //kafka地址
  17. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  18. //每个消费者要指定一个group
  19. props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");
  20. //key序列化类
  21. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  22. //value序列化类
  23. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  24. return new KafkaConsumer<>(props);
  25. }
  26. }

测试消费消息
  1. package consumer;
  2. import org.apache.kafka.clients.consumer.Consumer;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import java.time.Duration;
  6. import java.util.Collections;
  7. /**
  8. * @author heyunlin
  9. * @version 1.0
  10. */
  11. public class MessageConsumer {
  12. private static final String TOPIC = "hello";
  13. public static void main(String[] args) {
  14. Consumer<String, String> consumer = MessageConsumerFactory.getConsumer();
  15. consumer.subscribe(Collections.singletonList(TOPIC));
  16. while (true) {
  17. ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
  18. for (ConsumerRecord<String, String> record : records) {
  19. System.out.println(record.key() + ": " + record.value());
  20. }
  21. // 提交偏移量,避免消息重复推送
  22. consumer.commitSync(); // 同步提交
  23. // consumer.commitAsync(); // 异步提交
  24. }
  25. }
  26. }

五、springboot整合kafka

开始前的准备工作

然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka,在pom.xml中添加kafka的依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

然后修改application.yml,添加kafka相关配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: 192.168.254.128:9092
  4. producer:
  5. acks: 1
  6. retries: 3
  7. batch-size: 16384
  8. properties:
  9. linger:
  10. ms: 0
  11. buffer-memory: 33554432
  12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. consumer:
  15. group-id: helloGroup
  16. enable-auto-commit: false
  17. auto-commit-interval: 1000
  18. auto-offset-reset: latest
  19. properties:
  20. request:
  21. timeout:
  22. ms: 18000
  23. session:
  24. timeout:
  25. ms: 12000
  26. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  27. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建消息生产者

  1. package com.example.springboot.kafka.producer;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RequestMethod;
  6. import org.springframework.web.bind.annotation.RestController;
  7. /**
  8. * @author heyunlin
  9. * @version 1.0
  10. */
  11. @RestController
  12. @RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
  13. public class KafkaProducer {
  14. private final KafkaTemplate<String, Object> kafkaTemplate;
  15. @Autowired
  16. public KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {
  17. this.kafkaTemplate = kafkaTemplate;
  18. }
  19. @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
  20. public String sendMessage(String message) {
  21. kafkaTemplate.send("hello", message);
  22. return "发送成功~";
  23. }
  24. }

创建消息消费者

  1. package com.example.springboot.kafka.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author heyunlin
  7. * @version 1.0
  8. */
  9. @Component
  10. public class KafkaConsumer {
  11. @KafkaListener(topics = "hello")
  12. public void receiveMessage(ConsumerRecord<String, String> record) {
  13. String topic = record.topic();
  14. long offset = record.offset();
  15. int partition = record.partition();
  16. System.out.println("topic = " + topic);
  17. System.out.println("offset = " + offset);
  18. System.out.println("partition = " + partition);
  19. }
  20. }

然后访问网址http://localhost:8080/producer/sendMessage?message=hello往topic为hello的消息队列发送消息。控制台打印了参数,成功监听到发送的消息。

 

文章涉及的项目已经上传到gitee,按需获取~

Java中操作kafka的基本项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/kafka.git

springboot整合kafka案例项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/springboot-kafka.git

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/419104
推荐阅读
相关标签
  

闽ICP备14008679号