当前位置:   article > 正文

Kafka 开飙了!5分钟,带你体验一把“速度与激情”

Kafka 开飙了!5分钟,带你体验一把“速度与激情”

前置条件:你的电脑已经安装 Docker

主要内容:

  1. 使用 Docker 安装
  2. 使用命令行测试消息队列的功能
  3. zookeeper和kafka可视化管理工具
  4. Java 程序中简单使用Kafka

使用 Docker 安装搭建Kafka环境

单机版

下面使用的单机版的Kafka 来作为演示,推荐先搭建单机版的Kafka来学习。

以下使用 Docker 搭建Kafka基本环境来自开源项目:github.com/simplesteph… 。当然,你也可以按照官方提供的来:github.com/wurstmeiste… 。

新建一个名为
zk-single-kafka-single.yml 的文件,文件内容如下:

  1. version: '2.1'
  2. services:
  3. zoo1:
  4. image: zookeeper:3.4.9
  5. hostname: zoo1
  6. ports:
  7. - "2181:2181"
  8. environment:
  9. ZOO_MY_ID: 1
  10. ZOO_PORT: 2181
  11. ZOO_SERVERS: server.1=zoo1:2888:3888
  12. volumes:
  13. - ./zk-single-kafka-single/zoo1/data:/data
  14. - ./zk-single-kafka-single/zoo1/datalog:/datalog
  15. kafka1:
  16. image: confluentinc/cp-kafka:5.3.1
  17. hostname: kafka1
  18. ports:
  19. - "9092:9092"
  20. environment:
  21. KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
  22. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
  23. KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
  24. KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
  25. KAFKA_BROKER_ID: 1
  26. KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
  27. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  28. volumes:
  29. - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
  30. depends_on:
  31. - zoo1

运行以下命令即可完成环境搭建(会自动下载并运行一个 zookeeper 和 kafka )

docker-compose -f zk-single-kafka-single.yml up

如果需要停止Kafka相关容器的话,运行以下命令即可:

docker-compose -f zk-single-kafka-single.yml down

集群版

以下使用 Docker 搭建Kafka基本环境来自开源项目:github.com/simplesteph… 。

新建一个名为
zk-single-kafka-multiple.yml 的文件,文件内容如下:

  1. version: '2.1'
  2. services:
  3. zoo1:
  4. image: zookeeper:3.4.9
  5. hostname: zoo1
  6. ports:
  7. - "2181:2181"
  8. environment:
  9. ZOO_MY_ID: 1
  10. ZOO_PORT: 2181
  11. ZOO_SERVERS: server.1=zoo1:2888:3888
  12. volumes:
  13. - ./zk-single-kafka-multiple/zoo1/data:/data
  14. - ./zk-single-kafka-multiple/zoo1/datalog:/datalog
  15. kafka1:
  16. image: confluentinc/cp-kafka:5.4.0
  17. hostname: kafka1
  18. ports:
  19. - "9092:9092"
  20. environment:
  21. KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
  22. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
  23. KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
  24. KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
  25. KAFKA_BROKER_ID: 1
  26. KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
  27. volumes:
  28. - ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data
  29. depends_on:
  30. - zoo1
  31. kafka2:
  32. image: confluentinc/cp-kafka:5.4.0
  33. hostname: kafka2
  34. ports:
  35. - "9093:9093"
  36. environment:
  37. KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
  38. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
  39. KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
  40. KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
  41. KAFKA_BROKER_ID: 2
  42. KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
  43. volumes:
  44. - ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data
  45. depends_on:
  46. - zoo1
  47. kafka3:
  48. image: confluentinc/cp-kafka:5.4.0
  49. hostname: kafka3
  50. ports:
  51. - "9094:9094"
  52. environment:
  53. KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
  54. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
  55. KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
  56. KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
  57. KAFKA_BROKER_ID: 3
  58. KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
  59. volumes:
  60. - ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data
  61. depends_on:
  62. - zoo1

运行以下命令即可完成 1个节点 Zookeeper+3个节点的 Kafka 的环境搭建。

docker-compose -f zk-single-kafka-multiple.yml up

如果需要停止Kafka相关容器的话,运行以下命令即可:

docker-compose -f zk-single-kafka-multiple.yml down

使用命令行测试消息的生产和消费

一般情况下我们很少会用到 Kafka 的命令行操作。

1.进入 Kafka container 内部执行 Kafka 官方自带了一些命令

docker exec -ti docker_kafka1_1 bash

2.列出所有 Topic

root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181

3.创建一个 Topic

  1. root@kafka1:/# kafka-topics --create --topic test --partitions 3 --zookeeper zoo1:2181 --replication-factor 1
  2. Created topic test.

我们创建了一个名为 test 的 Topic, partition 数为 3, replica 数为 1。

4.消费者订阅主题

  1. root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
  2. send hello from console -producer

我们订阅了 名为 test 的 Topic。

5.生产者向 Topic 发送消息

  1. root@kafka1:/# kafka-console-producer --broker-list localhost:9092 --topic test
  2. >send hello from console -producer
  3. >

我们使用 kafka-console-producer 命令向名为 test 的 Topic 发送了一条消息,消息内容为:“send hello from console -producer”

这个时候,你会发现消费者成功接收到了消息:

  1. root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
  2. send hello from console -producer

IDEA相关插件推荐

Zoolytic-Zookeeper tool

这是一款 IDEA 提供的 Zookeeper 可视化工具插件,非常好用! 我们可以通过它:

  1. 可视化ZkNodes节点信息
  2. ZkNodes节点管理-添加/删除
  3. 编辑zkNodes数据
  4. ......实际使用效果如下:

 

使用方法:

  1. 打开工具:View->Tool windows->Zoolytic;
  2. 点击 “+” 号后在弹出框数据:“127.0.0.1:2181” 连接 zookeeper;
  3. 连接之后点击新创建的连接然后点击“+”号旁边的刷新按钮即可!

Kafkalytic

IDEA 提供的 Kafka 可视化管理插件。这个插件为我们提供了下面这写功能:

  1. 多个集群支持
  2. 主题管理:创建/删除/更改分区
  3. 使用正则表达式搜索主题
  4. 发布字符串/字节序列化的消息
  5. 使用不同的策略消费消息

实际使用效果如下:

使用方法:

  1. 打开工具:View->Tool windows->kafkalytic;
  2. 点击 “+” 号后在弹出框数据:“127.0.0.1:9092” 连接;

Java 程序中简单使用Kafka

代码地址:github.com/Snailclimb/…

Step 1:新建一个Maven项目

Step2: pom.xml 中添加相关依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.2.0</version>
  5. </dependency>

Step 3:初始化消费者和生产者

KafkaConstants常量类中定义了Kafka一些常用配置常量。

  1. public class KafkaConstants {
  2. public static final String BROKER_LIST = "localhost:9092";
  3. public static final String CLIENT_ID = "client1";
  4. public static String GROUP_ID_CONFIG="consumerGroup1";
  5. private KafkaConstants() {
  6. }
  7. }

ProducerCreator 中有一个 createProducer() 方法方法用于返回一个 KafkaProducer对象

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.Producer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. /**
  7. * @author shuang.kou
  8. */
  9. public class ProducerCreator {
  10. public static Producer<String, String> createProducer() {
  11. Properties properties = new Properties();
  12. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
  13. properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
  14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  16. return new KafkaProducer<>(properties);
  17. }
  18. }

ConsumerCreator 中有一个createConsumer() 方法方法用于返回一个 KafkaConsumer 对象

  1. import org.apache.kafka.clients.consumer.Consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import java.util.Properties;
  6. public class ConsumerCreator {
  7. public static Consumer<String, String> createConsumer() {
  8. Properties properties = new Properties();
  9. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
  11. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  12. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  13. return new KafkaConsumer<>(properties);
  14. }
  15. }

Step 4:发送和消费消息

生产者发送消息:

  1. private static final String TOPIC = "test-topic";
  2. Producer<String, String> producer = ProducerCreator.createProducer();
  3. ProducerRecord<String, String> record =
  4. new ProducerRecord<>(TOPIC, "hello, Kafka!");
  5. try {
  6. //send message
  7. RecordMetadata metadata = producer.send(record).get();
  8. System.out.println("Record sent to partition " + metadata.partition()
  9. + " with offset " + metadata.offset());
  10. } catch (ExecutionException | InterruptedException e) {
  11. System.out.println("Error in sending record");
  12. e.printStackTrace();
  13. }
  14. producer.close();

消费者消费消息:

  1. Consumer<String, String> consumer = ConsumerCreator.createConsumer();
  2. // 循环消费消息
  3. while (true) {
  4. //subscribe topic and consume message
  5. consumer.subscribe(Collections.singletonList(TOPIC));
  6. ConsumerRecords<String, String> consumerRecords =
  7. consumer.poll(Duration.ofMillis(1000));
  8. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  9. System.out.println("Consumer consume message:" + consumerRecord.value());
  10. }
  11. }

Step 5:测试

运行程序控制台打印出:

  1. Record sent to partition 0 with offset 20
  2. Consumer consume message:hello, Kafka!

开源项目推荐

作者的其他开源项目推荐:

  1. JavaGuide:【Java学习+面试指南】 一份涵盖大部分Java程序员所需要掌握的核心知识。
  2. springboot-guide : 适合新手入门以及有经验的开发人员查阅的 Spring Boot 教程(业余时间维护中,欢迎一起维护)。
  3. programmer-advancement : 我觉得技术人员应该有的一些好习惯!
  4. spring-security-jwt-guide :从零入门 !Spring Security With JWT(含权限验证)后端部分代码。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/757386
推荐阅读
相关标签
  

闽ICP备14008679号