当前位置:   article > 正文

Kafka快速入门+SpringBoot简单的秒杀案例

Kafka快速入门+SpringBoot简单的秒杀案例

1. 主题相关

1.1 创建主题

kafka-topics.sh --create --bootstrap-server [服务器地址] --replication-factor [副本数] --partitions [分区数] --topic [主题名]

liber@liber-VMware-Virtual-Platform:/home/zookeeper$ docker-compose exec kafka /bin/bash #进入kafka容器
bash-5.1# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic liber #创建一个主题名叫liber
Created topic liber. 

注:具有 1 个副本和 3 个分区     

        在 Kafka 中,分区是主题的子集,每个主题可以分为多个分区。每个分区都是一个独立的日志序列,可以被存储在集群中的不同服务器上。

       每个分区有一个领导者副本,负责处理所有读取和写入请求。领导者副本将写入的数据同步到其他副本。除了领导者副本外,其他副本称为追随者副本。它们从领导者那里复制数据,并不直接处理客户端的读写请求。

1.2 查询主题

kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic [主题名]

bash-5.1# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic liber
Topic: liber    TopicId: tTzq8pWZTIekVoXT35QPWg PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: liber    Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: liber    Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: liber    Partition: 2    Leader: 3       Replicas: 3     Isr: 3
注:如果省略 --topic 参数,则列出所有主题的详细信息。

1.3 修改主题 

kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic [主题名] --partitions [新的分区数]

bash-5.1# kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic liber --partitions 5
bash-5.1# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic liber
Topic: liber    TopicId: tTzq8pWZTIekVoXT35QPWg PartitionCount: 5       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: liber    Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: liber    Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: liber    Partition: 2    Leader: 3       Replicas: 3     Isr: 3
        Topic: liber    Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: liber    Partition: 4    Leader: 2       Replicas: 2     Isr: 2

注:修改liber的分区数到 5

1.4 删除主题

kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic [主题名]

bash-5.1# kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic liber
bash-5.1# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic liber
Error while executing topic command : Topic 'liber' does not exist as expected
[2024-07-22 02:16:33,325] ERROR java.lang.IllegalArgumentException: Topic 'liber' does not exist as expected
        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:542)
        at kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:317)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
 

2. 生产者

       在 Apache Kafka中,生产者(Producer)是负责将数据发送到指定Kafka主题(Topics)的客户端应用程序。生产者可以灵活地发送消息到一个或多个Kafka主题,支持各种发布模式和消息确认机制,以确保消息的可靠性和持久性。

       在 Apache Kafka 的上下文中,broker地址列表指 Kafka 集群中一组或多组 broker(服务器)的地址。这些地址用于初始化生产者(producers)、消费者(consumers)、以及其他客户端连接到Kafka集群的过程。

kafka-console-producer.sh --broker-list [broker地址列表] --topic [主题名]

bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic liber
>This is my first event
>This is my second event

注:Ctrl-C停止生产者客户端。

3. 消费者

       在 Apache Kafka中,消费者(Consumer)是从Kafka主题(Topics)中读取数据的客户端应用。消费者可以独立使用,或者作为一个消费者群组(Consumer Group)的一部分来运行。使用消费者群组可以有效地在多个消费者实例间分配主题的分区(Partitions),从而提升数据处理的并行性和效率。

kafka-console-consumer.sh --bootstrap-server [broker地址列表] --topic [主题名] [其他可选参数]
  • --from-beginning:如果加上这个参数,消费者将从主题的开始读取所有消息,而不是只读取新消息。
  • --group:指定消费者群组的ID,用于在多个消费者间共享主题的分区。

bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic liber --from-beginning
This is my first event
This is my second event

注:Ctrl-C停止消费者客户端。

4. 消费者组

 4.2 隐式创建组

kafka-console-consumer.sh --bootstrap-server [broker地址列表] --topic [主题名] --group [新的或现有的消费者组ID]

        消费者组的创建是隐式进行的,当一个或多个消费者客户端连接到 Kafka 并订阅主题时自动完成的。每个消费者在连接时会指定一个组ID,这个组ID在所有消费者中应该是一致的,以表示他们属于同一个消费者组。

bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic liber --group example_group #创建名为example_group的用户组
注:Ctrl-C停止等待。

 4.1 查询消费组(所有)

kafka-consumer-groups.sh --bootstrap-server [broker地址列表] --list

bash-5.1# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
example_group
KMOffsetCache-cmak

4.2 查询消费组(精确) 

kafka-consumer-groups.sh --bootstrap-server [broker地址列表] --describe --group [消费者组名]

bash-5.1# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group example_group

Consumer group 'example_group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
example_group   liber           0          1               1               0               -               -               -
example_group   liber           1          0               0               0               -               -               -
example_group   liber           2          1               1               0               -               -               -

bash-5.1# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group KMOffsetCache-cmak

GROUP              TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                        HOST            CLIENT-ID
KMOffsetCache-cmak __consumer_offsets 22         -               0               -               consumer-KMOffsetCache-cmak-1-3829d91b-249f-491c-8d69-446462d60d61 /192.168.186.77 consumer-KMOffsetCache-cmak-1
KMOffsetCache-cmak __consumer_offsets 30         -               0               -               consumer-KMOffsetCache-cmak-1-3829d91b-249f-491c-8d69-446462d60d61 /192.168.186.77 consumer-KMOffsetCache-cmak-1
KMOffsetCache-cmak __consumer_offsets 25         -               0               -               consumer-KMOffsetCache-cmak-1-3829d91b-249f-491c-8d69-446462d60d61 /192.168.186.77 consumer-KMOffsetCache-cmak-1
KMOffsetCache-cmak __consumer_offsets 35         -               0               -               consumer-KMOffsetCache-cmak-1-3829d91b-249f-491c-8d69-446462d60d61 /192.168.186.77 consumer-KMOffsetCache-cmak-1
KMOffsetCache-cmak __consumer_offsets 37         -               0               -               consumer-KMOffsetCache-cmak-1-3829d91b-249f-491c-8d69-446462d60d61 /192.168.186.77 consumer-KMOffsetCache-cmak-1
KMOffsetCache-cmak __consumer_offsets 38         -               0               -               consumer-KMOffsetCache-cmak-1-3829d91b-249f-491c-8d69-446462d60d61 /192.168.186.77 consumer-KMOffsetCache-cmak-1

4.3 删除消费组

kafka-consumer-groups.sh --bootstrap-server [broker地址列表] --delete --group [消费者组名]

bash-5.1# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group example_group
Deletion of requested consumer groups ('example_group') was successful.

5. 部分配置(参考)

  1. # Kafka Broker 的基本设置
  2. broker.id=1
  3. # 每个 Kafka broker 需要一个唯一的 ID。在 Kafka 集群中,每个节点都必须有不同的 ID。
  4. port=9092
  5. # Kafka 服务端监听的端口,客户端通过此端口与 Kafka 通信。
  6. num.network.threads=3
  7. # 处理网络请求的线程数,比如接受连接、接受请求、发送响应。调整此值以匹配你的服务器的网络I/O性能。
  8. num.io.threads=8
  9. # 服务器用于读写操作的线程数。这应该与你的磁盘数量相匹配,以平衡磁盘I/O负载。
  10. socket.send.buffer.bytes=102400
  11. socket.receive.buffer.bytes=102400
  12. # Socket 发送和接收缓冲区的大小。增加这些值可以提高网络性能,但会增加内存消耗。
  13. log.dirs=/tmp/kafka-logs
  14. # Kafka 存储消息和日志的目录。可以指定多个目录,Kafka 会平衡跨这些目录的数据。
  15. num.partitions=1
  16. # Kafka 创建新主题时默认的分区数。分区是并行处理的基础,更多的分区意味着更高的并发。
  17. # 数据保留策略
  18. log.retention.hours=168
  19. # Kafka 日志文件保留的最长时间,单位为小时。超过这个时间的日志文件将被自动删除。
  20. log.segment.bytes=1073741824
  21. # Kafka 日志段的大小。当日志文件达到这个大小时,会新建一个日志文件。
  22. log.retention.check.interval.ms=300000
  23. # Kafka 检查日志文件是否需要删除的频率,单位为毫秒。
  24. # 副本和同步
  25. default.replication.factor=1
  26. # 主题的默认副本数。副本数决定了数据的冗余程度和可用性。
  27. min.insync.replicas=1
  28. # 在认为生产请求成功之前,必须有这么多副本同步了数据。
  29. # ZooKeeper 配置
  30. zookeeper.connect=localhost:2181
  31. # Kafka 使用 ZooKeeper 来维护集群状态,如存储所有broker、主题等信息。此项配置ZooKeeper服务的连接信息。
  32. zookeeper.connection.timeout.ms=6000
  33. # 连接到 ZooKeeper 的超时时间,单位为毫秒。
  34. # 日志压缩和清理
  35. log.cleanup.policy=delete
  36. # 日志的清理策略。"delete" 根据时间或文件大小删除日志;"compact" 根据键合并日志。
  37. # 安全性设置
  38. listeners=PLAINTEXT://:9092
  39. # 定义 Kafka 服务的监听地址,支持 PLAINTEXT、SSL 等多种协议。
  40. # 高级SSL和SASL配置
  41. # ssl.keystore.location=/path/to/keystore.jks
  42. # ssl.keystore.password=your-keystore-pass
  43. # ssl.key.password=your-key-pass
  44. # sasl.enabled.mechanisms=PLAIN
  45. # sasl.mechanism.inter.broker.protocol=PLAIN
  46. # 配置 SSL 和 SASL,用于安全的客户端和 broker 之间的通信。

 参考文档

6. 简单案例(秒杀)

6.1 创建主题

bash-5.1# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic product
Created topic product.
bash-5.1#  kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic product
Topic: product  TopicId: JdkFmgvOQlKBCCsCVDTo1Q PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: product  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: product  Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: product  Partition: 2    Leader: 3       Replicas: 3     Isr: 3

6.2 项目结构

6.3 Maven依赖 

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>3.3.2</version>
  5. <relativePath/> <!-- lookup parent from repository -->
  6. </parent>
  7. <dependencies>
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-data-jpa</artifactId>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-web</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework.kafka</groupId>
  18. <artifactId>spring-kafka</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>com.mysql</groupId>
  22. <artifactId>mysql-connector-j</artifactId>
  23. <version>8.3.0</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.projectlombok</groupId>
  27. <artifactId>lombok</artifactId>
  28. <optional>true</optional>
  29. </dependency>
  30. </dependencies>

6.4 数据库操作

  1. create database orders;
  2. use orders;
  3. CREATE TABLE products
  4. (
  5. product_id BIGINT AUTO_INCREMENT PRIMARY KEY,
  6. product_name VARCHAR(255) NOT NULL,
  7. price DECIMAL(10, 2) NOT NULL,
  8. stock INT NOT NULL,
  9. description TEXT,
  10. version INT NOT NULL DEFAULT 0
  11. );
  12. INSERT INTO products (products.product_id,product_name, price, stock, description)
  13. VALUES (1,'大白菜', 5.99, 200, '新鲜的大白菜,来自农民的直供'),
  14. (2,'红富士苹果', 3.50, 150, '甜美多汁的红富士苹果,一箱包含20个'),
  15. (3,'五花肉', 45.00, 100, '优质五花肉,适合各种烹饪方式'),
  16. (4,'东北大米', 60.00, 300, '东北粳米,粒粒香甜,适合日常食用'),
  17. (5,'速溶咖啡', 70.00, 80, '进口速溶咖啡,简单快捷,口味纯正');

6.5 application.yml

  1. spring:
  2. application:
  3. name: spring_kafka
  4. datasource:
  5. url: jdbc:mysql://localhost:3306/orders?useSSL=false&serverTimezone=UTC
  6. username: root
  7. password: 123456
  8. driver-class-name: com.mysql.cj.jdbc.Driver
  9. jpa:
  10. hibernate:
  11. ddl-auto: update
  12. show-sql: true
  13. open-in-view: false
  14. kafka:
  15. consumer:
  16. bootstrap-servers: 192.168.186.77:9092,192.168.186.18:9092,192.168.186.216:9092
  17. group-id: secKill-group
  18. auto-offset-reset: earliest
  19. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  21. producer:
  22. bootstrap-servers: 192.168.186.77:9092,192.168.186.18:9092,192.168.186.216:9092
  23. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  24. value-serializer: org.apache.kafka.common.serialization.StringSerializer

 6.6 SpringKafkaApplication.java

  1. package org.example;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class SpringKafkaApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(SpringKafkaApplication.class, args);
  8. }
  9. }

6.7 Product.java

  1. package org.example.entity;
  2. import jakarta.persistence.*;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import java.math.BigDecimal;
  6. @Getter
  7. @Setter
  8. @Entity
  9. @Table(name = "products")
  10. public class Product {
  11. @Id
  12. @Column(name = "product_id", nullable = false)
  13. private Long id;
  14. @Column(name = "product_name", nullable = false)
  15. private String productName;
  16. @Column(name = "price", nullable = false, precision = 10, scale = 2)
  17. private BigDecimal price;
  18. @Column(name = "stock", nullable = false)
  19. private Integer stock;
  20. @Lob
  21. @Column(name = "description")
  22. private String description;
  23. @Version
  24. private int version; // 乐观锁字段
  25. }

6.8 ProductRepository.java

  1. package org.example.repository;
  2. import org.example.entity.Product;
  3. import org.springframework.data.jpa.repository.JpaRepository;
  4. import org.springframework.stereotype.Repository;
  5. @Repository
  6. public interface ProductRepository extends JpaRepository<Product,Long> {
  7. }

6.9 ProductService.java

  1. package org.example.service;
  2. import org.example.entity.Product;
  3. import org.example.repository.ProductRepository;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import org.springframework.transaction.annotation.Transactional;
  7. @Service
  8. public class ProductService {
  9. @Autowired
  10. private ProductRepository productRepository;
  11. @Transactional
  12. //检查是否还有库存
  13. public boolean attemptPurchase(Long productId, int quantity) {
  14. Product product = productRepository.findById(productId).orElse(null);
  15. if (product != null && product.getStock() >= quantity) {
  16. product.setStock(product.getStock() - quantity);
  17. productRepository.save(product);
  18. return true;
  19. }
  20. return false;
  21. }
  22. //获取全部产品
  23. public Product getProduct(Long productId) {
  24. return productRepository.findById(productId).orElse(null);
  25. }
  26. }

6.10 KafkaMessageService.java

  1. package org.example.service;
  2. import org.example.entity.Product;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class KafkaMessageService {
  9. @Autowired
  10. private KafkaTemplate<String, String> kafkaTemplate;
  11. @Autowired
  12. private ProductService productService;
  13. // 将秒杀请求发送到 Kafka
  14. public Object sendKill(String topic, String productId) {
  15. kafkaTemplate.send(topic, productId);
  16. Product product = productService.getProduct(Long.valueOf(productId));
  17. return product;
  18. }
  19. @KafkaListener(topics = "product", groupId = "secKill-group")
  20. public void receiveKillRequest(String productId) {
  21. boolean success = productService.attemptPurchase(Long.parseLong(productId), 1);
  22. if (success) {
  23. System.out.println("秒杀成功!剩余库存:"+productService.getProduct(Long.valueOf(productId)).getStock());
  24. } else {
  25. System.out.println("秒杀失败!库存不足...")
  26. ;
  27. }
  28. }
  29. }

6.11 killController.java

  1. package org.example.controller;
  2. import org.example.service.KafkaMessageService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.http.ResponseEntity;
  5. import org.springframework.web.bind.annotation.*;
  6. @RestController
  7. @RequestMapping("/kill")
  8. public class killController {
  9. @Autowired
  10. private KafkaMessageService kafkaMessageService;
  11. @GetMapping("/{productId}")
  12. public ResponseEntity<?> initiateSeckill(@PathVariable String productId) {
  13. Object o = kafkaMessageService.sendKill("product", productId);
  14. return ResponseEntity.ok().body(o);
  15. }
  16. }

6.12 项目测试

 6.12.1 网页预览

6.12.2 模拟秒杀

6.12.3 秒杀结果 

7. 总结 

        通过命令行实现kafka的快速入门,并实现简单的秒杀案例,仅供学习参考。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/889622
推荐阅读
相关标签
  

闽ICP备14008679号