当前位置:   article > 正文

kafka简单搭建和基本使用介绍_kafka-map

kafka-map

使用场景 处理大规模的消息,大数据,事件采集,日志收集等,不过使用延迟消息比较麻烦对比其他的消息队列的话。高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition。每个消费组 对分区进行消费

- 可扩展性:kafka集群支持热扩展

- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

- 高并发:支持数千个客户端同时读写

基本概念

1、消费者:(Consumer):主动从Broker拉数据,从而消费这些已发布的消息

2、生产者:(Producer)  :向broker发布消息的应用程序

3、AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于kafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

4、话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名;

5、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

特征

  1. kafka支持消息持久化
  2. 消费端是主动拉取数据,消费状态和订阅关系由客户端负责维护
  3. 消息消费完后,不会立即删除,会保留历史消息
  4. topic中所有的分区的消费顺序是随机的但是,在单个分区内的消费顺序是固定的

使用kafka中如何保证消息的有序性

目前的做法都是保证发送的消息发送到同一个partition,这里顺序消费又涉及到consumer单线程和多线程消费的情况。

在单线程的情况下

同一个topic,同一个分区:

Kafka的消息在分区内是严格有序的。也就是说我们可以把同一笔订单的所有消息,按照生成的顺序一个个发送到同一个topic的同一个分区。那么consumer就能顺序的消费到同一笔订单的消息。但是这种情况下提升不了系统的吞吐量,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

在多线程的情况下

写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性

要保证顺序其实就是在消费者获取到消息的时候在进行一次类似kafka的分区操作,发送到不同的队列,然后我们开启多个线程去消费对应队列里面的数据。

 

使用kafka中如何保证不丢数据

Producer

使用异步发送或者同步发送 获取消息发送结果获取,加入消息发送失败重试机制

1)同步模式:有3种状态保证消息被安全生产,但是在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。

确认机制设置为-1,也就是让消息写入leader和所有的副本。

2)异步模式:当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立即丢弃掉。如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,这样也能保证数据不会丢失。

broker端

acks设置为0:broker接收消息立即返回,当消息还没写入磁盘宕机时,容易丢失数据

acks设置为1:等待broker的ack,如果leader落盘了就返回ack,如果follower同步完成前leader挂了就会丢失未同步的数据

acks设置为-1:等待所有leader和follower都落盘后返回ack,如果follower已同步,但是broker返回ack前leader挂了,则会重复发送消息。

Consumer端

可以优化成手动提交offset。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。可以通过enable.auto.commit设置为false,关闭闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。
 

使用kafka中如何保证不重复消费

生产者端的话 

幂等可以保证单个生产者向同一个集群中同一个topic投递的数据发送的消息,不会丢失,而且不会重复。

为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number

Kafka 从 0.11.0 开始可以配置

enable.idempotence=true

主要是消费端的幂等

消费者端的话

首先Kafka Broker上存储的消息,都有一个Offset标记。

然后kafka的消费者是通过offSet标记来维护当前已经消费的数据,

每消费一批数据,Kafka Broker就会更新OffSet的值,避免重复消费。

默认情况下,消息消费完以后,会自动提交Offset的值,避免重复消费。

Kafka消费端的自动提交逻辑有一个默认的5秒间隔,也就是说在5秒之后的下一次向Broker拉取消息的时候提交。

所以在Consumer消费的过程中,应用程序被强制kill掉或者宕机,可能会导致Offset没提交,从而产生重复提交的问题。

除此之外,还有另外一种情况也会出现重复消费。

在Kafka里面有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。

Consumer端会从分配的Partition里面去消费消息,如果Consumer在默认的5分钟内没办法处理完这一批消息。

就会触发Kafka的Rebalance机制,从而导致Offset自动提交失败。

而在重新Rebalance之后,Consumer还是会从之前没提交的Offset位置开始消费,也会导致消息重复消费的问题。
 

重复消费实现可以自己做幂等操作,建立去重表等。

使用docker安装一下kafka 因为kafka需要依赖zookeeper

使用docker-compose运行一个只有一个ZooKeeper node和一个Kafka broker的开发环境

使用一下网上的yml文件

  1. version: '2'
  2. services:
  3. zoo1:
  4. image: wurstmeister/zookeeper
  5. restart: unless-stopped
  6. hostname: zoo1
  7. ports:
  8. - "2181:2181"
  9. container_name: zookeeper
  10. # kafka version: 1.1.0
  11. # scala version: 2.12
  12. kafka1:
  13. image: wurstmeister/kafka
  14. ports:
  15. - "9092:9092"
  16. environment:
  17. KAFKA_ADVERTISED_HOST_NAME: 10.225.137.105 //ip地址
  18. KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
  19. KAFKA_BROKER_ID: 1
  20. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  21. #KAFKA_CREATE_TOPICS: "test1_topic,test2_topic"
  22. depends_on:
  23. - zoo1
  24. container_name: kafka

启动容器
在docker-compose.yml所在的目录执行以下命令:

docker-compose up -d

docker ps -a 可以查看到容器已经启动

 再安装一下kafka的map管理web页面

docker run -d -p 8089:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin     --name kafka-map dushixiang/kafka-map:latest

kafka-map: 一个美观简洁且强大的kafka web管理工具。

进入kafka容器

docker exec -it kafka /bin/bash

创建topic

$KAFKA_HOME/bin/kafka-topics.sh --create --topic ly --zookeeper zoo1:2181 --replication-factor 1 --partitions 1

在map页面查看一下

 进入kafka容器进行手动脚本发送消息测试

  1. $KAFKA_HOME/bin/kafka-console-producer.sh --topic=ly --broker-list kafka1:9092
  2. 1
  3. 1111
  4. 2222
  5. 3333

查看一下map页面topic的消息是否发送成功

 手动脚本消费一下从offset等于0开始消费

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic ly

  按offset消费

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --partition 0 --offset 3 --topic ly

 kafka消费过的数据不会立马删除

kafka删除数据有两种方式
  • 按照时间,超过一段时间后删除过期消息
  • 按照消息大小,消息数量超过一定大小后删除最旧的数据 kafka删除数据的最小单位:segment

下面使用代码进行简单的kafka使用

php版 使用compose 安装

  1. # 安装 kafka-php
  2. composer require nmred/kafka-php

pruducer.php

  1. <?php
  2. /*
  3. * @Author: yue
  4. * @Date: 2022/7/1 9:52
  5. * @LastEditTime: 2022/7/1 9:52
  6. * @LastEditors: yue
  7. * @Description: 生产者
  8. */
  9. require_once '../vendor/autoload.php';
  10. /* 创建一个配置实例 */
  11. $config = \Kafka\ProducerConfig::getInstance();
  12. /* Topic的元信息刷新的间隔 */
  13. $config->setMetadataRefreshIntervalMs(10000);
  14. /* 设置broker的地址 */
  15. $config->setMetadataBrokerList('10.225.137.105:9092');
  16. /* 设置broker的代理版本 */
  17. $config->setBrokerVersion('1.0.0');
  18. /* 只需要leader确认消息 */
  19. $config->setRequiredAck(1);
  20. /* 选择异步 */
  21. $config->setIsAsyn(false);
  22. /*500毫秒发送消息 */
  23. $config->setProduceInterval(500);
  24. /* 创建一个生产者实例 */
  25. $producer = new \Kafka\Producer();
  26. for ($i = 0; $i < 100; $i++) {
  27. $producer->send([
  28. [
  29. 'topic' => 'code_demo',
  30. 'value' => 'test' . $i,
  31. ],
  32. ]);
  33. }

进入kafka-map查看一下是否发送成功

 consumer.php

  1. <?php
  2. /*
  3. * @Author: yue
  4. * @Date: 2022/7/1 10:06
  5. * @LastEditTime: 2022/7/1 10:06
  6. * @LastEditors: yue
  7. * @Description: 消费者
  8. */
  9. require_once '../vendor/autoload.php';
  10. $config = \Kafka\ConsumerConfig::getInstance();
  11. $config->setMetadataRefreshIntervalMs(10000);
  12. $config->setMetadataBrokerList('10.225.137.105:9092');
  13. $config->setGroupId('test');
  14. $config->setBrokerVersion('1.0.0');
  15. $config->setTopics(['code_demo']);
  16. $consumer = new \Kafka\Consumer();
  17. $consumer->start(function ($topic, $part, $message) {
  18. var_dump($topic,$part,$message);
  19. });

 可以看到已经消费成功了

go版

用kafka-map让分区扩个容 增加到2个分区

producer.go  使用 sarama第三方库

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. )
  6. func main() {
  7. config := sarama.NewConfig()
  8. config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
  9. config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
  10. config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
  11. // 连接kafka
  12. client, err := sarama.NewSyncProducer([]string{"10.225.137.105:9092"}, config)
  13. if err != nil {
  14. fmt.Println("producer closed, err:", err)
  15. return
  16. }
  17. defer client.Close()
  18. for i := 100; i < 110; i++ {
  19. //创建消息
  20. var s string
  21. msg := &sarama.ProducerMessage{}
  22. msg.Topic = "code_demo"
  23. s = "test" +fmt.Sprintf("%d",i) + "go"
  24. msg.Value = sarama.StringEncoder(s)
  25. //发送消息
  26. pid, offset, err := client.SendMessage(msg)
  27. if err != nil {
  28. fmt.Println("send message failed,", err)
  29. return
  30. }
  31. fmt.Printf("pid:%v offset:%v\n", pid, offset)
  32. }
  33. }

执行一下

可以看到有些进入分区0 分区1 

进入kafka-map查看一下是否发送成功

分区0

分区1

 

 

发送消息成功

  consumer.go  需要遍历所有分区 每个分区创建一个消费者实现消费  

 

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. "sync"
  6. )
  7. var wg sync.WaitGroup
  8. func main() {
  9. consumer, err := sarama.NewConsumer([]string{"10.225.137.105:9092"}, nil)
  10. if err != nil {
  11. fmt.Println("consumer connect err:", err)
  12. return
  13. }
  14. defer consumer.Close()
  15. //获取 kafka 主题
  16. partitions, err := consumer.Partitions("code_demo")
  17. if err != nil {
  18. fmt.Println("get partitions failed, err:", err)
  19. return
  20. }
  21. for _, p := range partitions {
  22. //sarama.OffsetNewest:从当前的偏移量开始消费,sarama.OffsetOldest:从最老的偏移量开始消费
  23. partitionConsumer, err := consumer.ConsumePartition("code_demo", p, sarama.OffsetNewest)
  24. if err != nil {
  25. fmt.Println("partitionConsumer err:", err)
  26. continue
  27. }
  28. wg.Add(1)
  29. go func() {
  30. for m := range partitionConsumer.Messages() {
  31. fmt.Printf("key: %s, message: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
  32. }
  33. wg.Done()
  34. }()
  35. }
  36. wg.Wait()
  37. }

 消费成功

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号