当前位置:   article > 正文

Kafka消息队列_kafka topic下的组

kafka topic下的组

1.Kafka 的基础架构

  1. 为了方便扩展,并提高吞吐量,一个 topic 分为多个 parition
  2. 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
  3. 为了提高可用性,topic可以划分成多个partition,每个partition可以存在于不同的broker上面,每个partition存放一部分数据。这是天然的分布式消息队列。

下面以一个Kafka集群中2个Broker举例,创建1个topic包含3个Partition,2 Replication;数据Producer流动如图所示:
在这里插入图片描述
1). Producer:消息生产者,就是向 kafka borker 发送消息的客户端
2). Consumer: 消息消费者,向 kafka 取消息的客户端
3). Consumer Group:消费者组,由多个 consumer 组成,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费,消费者组内互不影响
4). Broker:一台 Kafka 服务器就是一个 broker,一个集群多个 broker 组成
5). Topic:可以理解一个队列,生产者消费者面向的都是一个 topic
6). Partition:为实现扩展性,一个非常大的 topic 可以分布到多个 broker上,一个 topic 可以分成多个 partition,每个 partition 是一个有序队列

Kafka 的启动需要依赖 Zookeeper,查看 Zookeper 的启动状态:

bin/zkServer.sh status

2.Kafka 命令行操作

1.启动 Kafka:

bin/kafka-server-start.sh -daemon config/server.properties

2.查看 kafka 里的 topic 的数量

bin/kafka-topics.sh --zookeeper zk地址 --list

因为kafka 是注册进 Zookeeper 的,查询 topic 和创建 topic 都需要指定 zookeeper

3.创建一个 topic

bin/kafka-topics.sh --zookeeper zk地址 --create --topic first --partition 3 --replication-factor 2

4.查看一个 topic 的描述

bin/kafka-topics.sh --zookeeper zk地址 --describe --topic first

5.发送消息

bin/kafka-console-producer.sh --topic first --broker-list kafka地址

6.消费消息

bin/kafka-console-consumer.sh --topic first --bootstrap-server kafka地址

7.修改分区的数量 (分区的数量只能增加)

待填充…

3.Kafka 的文件存储机制

在这里插入图片描述

4.数据的可靠性保证

1). 何时发送 ack?

确保有 follower 和 leader 同步完成,leader 再发送 ack,这样才能保证 leader 挂掉之后,能在 follower 中选举出新的 leader

2). 多少个 follower 同步完成之后发送 ack ?

半数以上的 follower 同步完成

Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟权衡,选择一下的配置:

ack:

0: Producer 不等待 broker 的 ack,这一操作提供一个最低的延迟,broker 一接收到还没有完全写入磁盘就已经返回 ack ,当 broker 故障时就有可能发生 丢失数据

1: Producer 等待 broker 的 ack,partition 的 leader 罗盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么就会发生 丢失数据

-1: Producer 等待 broker 的 ack,partition 的 leader 和 follower 全部罗盘成功后才返回 ack ,但是如果在 follower 同步完成之后, broker 发送 ack 之前,leader 发生故障,会造成 数据重复

5.Kafka 的 ISR

Leader 维护了一个动态的 ISR,意为和 leader 保持同步的 follower 集合,当 ISR 中的 follower 完成数据的同步之后, leader 就会给 follower 发送 ack,如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replia.lag.time.max.ms 参数设定。

Leader 发生了故障,就会从 ISR 中选举新的 leader

ack应答机制:对于某些不太重要的数据,对数据的可靠性要求不高,能够容忍数据的少量丢失,所以没有必要等待 ISR 中的 follower 全部接收成功

6.Kafka 的 LEO 和 HW

LEO:每个副本的最后一个 offset
HW:所有副本中最小的 LEO

在这里插入图片描述

7.Exactly Once 含义

对于某些比较重要的消息,我们需要保证 exactly once 语义,保证每条消息被发送一次,且仅被发送一次

在 0.11 版本之后, kafka 引入了幂等性机制,配合 acks=-1时的 at least once 语义,实现 producer 到 broker 的 exactly once

使用时:只需将 enable.idempotence 属性设置为 true,kafka 自动将 acks 属性设为 -1

8.Kafka是如何实现幂等性的?

kakfa 是给消息加上id,才保证幂等性的

首先是存在多个 producer 给一个 kafka 发送消息,每个 producer 有一个 PID,每条消息到达 kafka 后,有一个 sequence number (这个 sequence number 会先缓存)。如果 PID+sequence number 在缓存内,这条消息就视为重复,会被丢失

9.Kafka 消费者

consumer 采用 pull (拉)模式从 broker 中读取数据

因为 push(推)很难适应消费速率不同的消费者,因为消息发送的速率是由 broker 决定

pull 的不足之处是,如果 kafka 没有数据,消费者可能会小茹循环中,一直返回空数据,针对这一点,kafka 的消费者在消费数据是会传入一个时长参数 timeout,如果当前没有数据可供消费, consumer 会等待一段时间再返回,这段时间即为 timeout

9.1分区分配策略

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,必然会涉及到 partition 的分配问题

Kafka 有两种分配策略,一种是 roundrin,一个是 range

策略详讲先欠着…忘了

10.Kafka 高效读写数据

1). 顺序写磁盘

kafka 的 producer 生产数据,要写入到 log 文件中,写的过程一直追加到末端,为顺序写,顺序写之所以这么快,是因为省去了大量磁头寻址的时间

2). 零拷贝技术

省去文件拷贝到用户应用层那一段时间

11. Zookeeper 在 Kafka 的作用

kafka 集群中有一个 broker 会被选举成 KafkaController,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等供

12. Kafka API

12.1 Producer Api

kafka 的 producer 发小消息采用的是异步发送的方式,在消息发送的过程中,涉及两个线程: main 线程 和 sender 线程,以及一个 线程共享变量—RecordAccumulator

main 线程将消息发送给 RecordAccumulator ,Sender 线程不断从 RecordAccumulator 中拉取消息到 Kafka broker

在这里插入图片描述
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据

linger.ms:如果数据迟迟未到达 batch.size,sender 等待 linger.time 之后才会发送数据

Talking is cheap,show my code:

public class CustomeProducer {

    public static void main(String[] args) {

        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.9:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);


        List<String> interceptors=new ArrayList<String>();
        interceptors.add("com..interceptor.CouterInterceptor");
        interceptors.add("com..kafka.interceptor.TimeInterceptor");
        //指定拦截器(拦截器自己定义的)
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);



        //1.创建一个生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        //2.调用send方法
        for (int i=0;i<100;i++){
           producer.send(new ProducerRecord<String, String>("first", i + "", "message_" + i));
        }

        //3.关闭生产者
        producer.close();


    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

12.2 Consumer Api

consumer消费数据时的可靠性是很容日保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题

consumer 在消费过程中可能会出现宕机等故障, consumer 恢复后,需要重故障前的位置继续消费, 所以 consumer 需要实时记录自己消费到了哪个 offset

offset 的维护是 consumer 消费数据时必须考虑的问题

4.2.1 手动提交 offset

(采用方式)

  • KafkaConsumer: 创建一个消费者对象,消费数据
  • ConsumerConfig: 所需的一系列配置参数
  • ConsumerRecord: 每条数据都要封装成一个 ConsumerRecord 对象

4.2.2 自动提交 offset

为了使我们专注自己业务逻辑,kafka 提供自动提交 offset 的功能

enable.auto.commit:是否开启自动提交 offset 功能

auto.commit.interval.ms: 自动提交 offset 的时间间隔

Talking is cheap,show my code:

public class CustomConsumer {

    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.9:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //消费者组的 id 只要group.id相同,就属于同一个消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"1205");
        //自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


        //1.创建一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //指定消费哪个主题
        consumer.subscribe(Arrays.asList("first"));
        //2.调用消费者的 poll(轮询) 方法
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record:records){
                System.out.println("topic="+ record.topic()+ "  offset="+record.offset()+" value="+record.value());

            }

            //同步提交offset
           // consumer.commitAsync();
            //异步提交offset
            consumer.commitSync();
            /**
             * 两者的区别就是   同步提交是有失败重试机制 ,异步提交没有
             * 一般用异步提交的方式,这样响应会快点
             */
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

13 自定义 Interceptor

13.1 拦截器原理

对于 Producer 而言,interceptor 使得用户在消息发送钱以及 Producer 回调逻辑前有机会对消息做一些定制化需求

定义的方法包括:

1). configure (configs)

获取配置信息和初始化数据时调用

2). onsend (ProducerRecord)

该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中,Producer 确保在消息被序列化以及计算分区前调用该方法,用户可以对消息做任何处理,但是最好保证不要修改消息所属 topic 和分区

3). onAcknowledgement (RecordMetadata ,Excepetion)

该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者消息在发送过程中失败调用, 通常在 Producer 回调之前

4). close
关闭 Interceptor,主要用于执行一些资源清理工作

下面来写两个 Kafka Interceprot

计数拦截器:

/**
 * @author jarvis
 * @date 2020/8/17 0017 9:10
 *
 * Kafka拦截器
 * 统计它发送消息到kafka 成功、失败的次数
 */
public class CouterInterceptor  implements ProducerInterceptor<String,String> {

    private long successNum=0L;
    private long errorNum=0L;

    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if(e==null){
            successNum++;
        }else {
            errorNum++;
        }

    }

    //这个方法,是在 producer生产完消息过后,调用的那个close方法才调用
    public void close() {
        System.out.println("successNum"+successNum);
        System.out.println("errorNum"+errorNum);
    }

    public void configure(Map<String, ?> map) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

给消息内容加上时间拦截器:

public class TimeInterceptor implements ProducerInterceptor<String,String> {

    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<String, String>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis()+ record.value(),record.headers());
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/532112
推荐阅读
相关标签
  

闽ICP备14008679号