赞
踩
一、Kafka的Producer小案例
假设我们现在有一个电商系统,凡是能登录系统的用户都是会员,会员的价值体现在,消费了多少钱,就会累计相应的积分。积分可以兑换礼品包,优惠券···等等。
又到了我们的画图时间????。首先我们得先来一个订单系统,那这个订单系统中肯定就会有数据日志产生,它现在就是把这些日志写到Kafka里面,日志我们使用json的方式记录。图中的statement表示订单状态,此时是已支付。
此时担任我们消费者的肯定就是会员系统了,它要对这个id为1的会员进行积分累计。当然必须要考虑到的情况是,这个会员有可能也会进行退款操作,那相应的积分也会减少。statement此时为cancel取消
我们上一讲中的设置参数中,提到我们可以给每一个消息设置一个key,也可以不指定,这个key跟我们要把这个消息发送到哪个主题的哪个分区是有关系的。比如我们现在有一个主题叫 tellYourDream,主题下面有两个分区,两个分区分别存在两个副本(此时我们不关注follower,因为它的数据是同步leader的)
- Topic:tellYourDream
- p0:leader partition <- follower partition
- p1:leader partition <- follower partition
如果是不指定key的时候,发送的一条消息会以轮询的方式发送到分区里面。也就是比如说,我第一条消息是one,那这个one就发送到了p0里面,第二条是two,就发送到了p1里面,之后的three就是p0,four就是p1···依次类推。
如果指定key,比如我的key为message1,Kafka就会取得这个key的hash值,取到的数字再对我们的分区数取模,然后根据取模的值来决定哪个分区(例如我们现在是p0,p1两个分区,取模的值就只会是0,1),取模为0,就发送到p0,取模为1,就发送到p1,这样的做法可以保证key相同的消息一定会被发送到同一个分区(也可以使用这个特性来规定某些消息一定会发送到指定的分区)。这个做法和MapReduce的shuffle是不是又类似了,所以这些大数据框架,真的互通点很多。
对于我们刚刚提到的会员系统,如果此时用户下单时的消息发送到了p0,而退款的消息发送到了p1,难免有时会发生消费者先消费到p1中的消息的情况,此时用户的积分还没有增加,就已经扣除1000了,显示就会出现问题。所以为了保证同一个用户的消息发送到同一个分区中,我们需要将其指定key。
代码部分
因为在 Kafka的生产者原理及重要参数说明 中我们已经把下面的prop.put的所有配置都已经解释过了,所以这次就直接ctrl+c,ctrl+v上来。其实就是把那时候的创建生产者的代码抽取出来成为一个createProducer()方法而已。
- public class OrderProducer {
- public static KafkaProducer<String, String> createProducer() {
- Properties props = new Properties();
- props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("buffer.memory", 33554432);
- props.put("compression.type", "lz4");
- props.put("batch.size", 32768);
- props.put("linger.ms", 100);
- props.put("retries", 10);//5 10
- props.put("retry.backoff.ms", 300);
- props.put("request.required.acks", "1");
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
- return producer;
- }
这里就是一段生产JSON格式的消息代码而已,也抽取成一个方法。
- public static JSONObject createRecord() {
- JSONObject order=new JSONObject();
- order.put("userId", 12344);
- order.put("amount", 100.0);
- order.put("statement", "pay");
- return order;
- }
这里就是直接创建生产者和消息,此时key使用userId或者订单id都行,问题不大。
- public static void main(String[] args) throws Exception {
- KafkaProducer<String, String> producer = createProducer();
- JSONObject order=createRecord();
- ProducerRecord<String, String> record = new ProducerRecord<>(
- "tellYourDream",order.getString("userId") ,order.toString());
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if(exception == null) {
- System.out.println("消息发送成功");
- } else {
- //进行处理
- }
- }
- });
- Thread.sleep(10000);
- producer.close();
- }
- }
此时如果进行过重试机制后,消息还存在异常的话,公司比较严谨的项目都会有备用链路,比如把数据存到MySQL,Redis···等来保证消息不会丢失。
补充:自定义分区(可自行了解)
因为其实Kafka自身提供的机制已经基本满足生产环境中的使用了,所以这块就不展开详细的说明了。此外还有自定义序列化,自定义拦截器,这些在工作当中使用得频率不高,如果用到大概可以进行百度自行学习。
例如,通话记录中,给客服打电话的记录要存到一个分区中,其余的记录均分的分布到剩余的分区中。我们就这个案例来进行演示,要自定义的情况就要实现Partition接口,然后实现3个方法,说是实现3个,其实主要也就实现partition()这个方法而已。
- package com.bonc.rdpe.kafka110.partitioner;
- import java.util.List;import java.util.Map;
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import org.apache.kafka.common.PartitionInfo;
- /**
- * @Title PhonenumPartitioner.java
- * @Description 自定义分区器
- * @Date 2018-06-25 14:58:14
- */
- public class PhonenumPartitioner implements Partitioner{
- @Override
- public void configure(Map<String, ?> configs) {
- // TODO nothing
- }
-
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 得到 topic 的 partitions 信息
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- // 模拟某客服
- if(key.toString().equals("10000") || key.toString().equals("11111")) {
- // 放到最后一个分区中
- return numPartitions - 1;
- }
- String phoneNum = key.toString();
- return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1);
- }
-
-
- @Override
- public void close() {
- // TODO nothing
- }
使用自定义分区器
- package com.bonc.rdpe.kafka110.producer;
- import java.util.Properties;
- import java.util.Random;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
-
-
-
- /**
- * @Title PartitionerProducer.java
- * @Description 测试自定义分区器
- * @Date 2018-06-25 15:10:04
- */public class PartitionerProducer {
- private static final String[] PHONE_NUMS = new String[]{
- "10000", "10000", "11111", "13700000003", "13700000004",
- "10000", "15500000006", "11111", "15500000008",
- "17600000009", "10000", "17600000011"
- };
-
-
-
-
- public static void main(String[] args) throws Exception {
-
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
- // 设置分区器
- props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- int count = 0;
- int length = PHONE_NUMS.length;
-
- while(count < 10) {
- Random rand = new Random();
- String phoneNum = PHONE_NUMS[rand.nextInt(length)];
- ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
- RecordMetadata metadata = producer.send(record).get();
- String result = "phonenum [" + record.value() + "] has been sent to partition " + metadata.partition();
- System.out.println(result);
- Thread.sleep(500);
- count++;
- }
- producer.close();
- }
- }
自定义分区结果:
二、Kafka消费者原理解析
1.offset 偏移量
此时再次请出我们的kafka集群,有多个消费者同时去消费集群中的信息
如果程序一直在稳定执行,那我们的整个流程是不会出现啥问题的,可是现在如果程序停止执行了呢?有可能是程序出现了bug,也有可能是因为我们进行修改手动停止了程序。那下一次恢复的时候,消费者又该从哪个地方开始消费?
- Topic:tellYourDream ConsumerA
- tellYourDream:p0(10000)
- tellYourDream:p1(10001)
offset就类似于数组下标的那种理解类似,比如数组的下标为什么要从0开始,基于数组的内存模型。就是所处数组位置离首地址的距离而定。array[0]就是偏移为0的位置,也就是首地址。array[k]也就是偏移为k的位置。kafka中的offset也是同样的理解,这个偏移量其实就是记录一个位置而使用的。用来标识消费者这次消费到了这个位置。
在kafka里面,kafka是不帮忙维护这个offset偏移量的,这个offset需要consumer自行维护。kafka提供了两个关于offset的参数,一个是enable_auto_commit,当这个参数设置为true的时候,每次重启kafka都会把所有的数据重新消费一遍。再一个是auto_commit_interval_ms,这个是每次提交offset的一个时间间隔。
这个offset的存储位置在0.8版本(再次划重点,0.8之前的kafka尽量不要使用)之前,是存放在zookeeper里面的。这个设计明显是存在问题的,整个kafka集群有众多的topic,而系统中又有成千上万的消费者去消费它们,如果offset存放在zookeeper上,消费者每次都要提交给zookeeper这个值,这样zookeeper能顶得住吗?如果这时候觉得没啥问题的同学,那你就是没认真去读 插曲:Kafka的集群部署实践及运维相关 中的 3.4—消费信息 啦,赶快去复习一下????。
在0.8版本之后,kafka就把这个offset存在了内部的一个主题里面,这个主题的名字叫做 consumer_offset。这个内部主题默认有50个分区,我们知道,消费者组是有它们的一个group.id的。提交过去的时候,key是group.id+topic+分区号(这是为了保证Kakfa集群中同分区的数据偏移量都提交到consumer_offset的同一个分区下)。这句话有点绕口,不过请务必读懂。
value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。
2.Coordinator
每个consumer group都会选择一个broker作为自己的coordinator,负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance,
根据内部的一个选择机制,会挑选一个对应的Broker,Kafka会把各个消费组均匀分配给各个Broker作为coordinator来进行管理,consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。
2.1 如何选择哪台是coordinator?
首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。比如说:groupId,“membership-consumer-group” -> hash值(数字)-> 对50取模(结果只能是0~49,又是以往的那个套路) -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区(这里consumer_offset的分区的副本数量默认来说1,只有一个leader),然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。
其实简单点解释,就是找到consumer_offsets中编号和它对应的一个分区而已。取模后是2,那就找consumer_offsets那50个分区中的第二个分区,也就是p1。取模后是10,那就找consumer_offsets那50个分区中的第十个分区,也就是p9.
2.2 coordinator完成了什么工作
然后这个coordinator会选出一个leader consumer(谁先注册上来,谁就是leader),这时候coordinator也会把整个Topic的情况汇报给leader consumer,,由leader consumer来制定消费方案。之后会发送一个SyncGroup请求把消费方案返回给coordinator。
用一小段话再总结一遍吧:
首先有一个消费者组,这个消费者组会有一个它们的group.id号,根据这个可以计算出哪一个broker作为它们的coodinator,确定了coordinator之后,所有的consumer都会发送一个join group请求注册。之后coordinator就会默认把第一个注册上来的consumer选择成为leader consumer,把整个Topic的情况汇报给leader consumer。之后leader consumer就会根据负载均衡的思路制定消费方案,返回给coordinator,coordinator拿到方案之后再下发给所有的consumer,完成流程。
consumer都会向coordinator发送心跳,可以认为consumer是从节点,coordinator是主节点。当有consumer长时间不再和coordinator保持联系,就会重新把分配给这个consumer的任务重新执行一遍。如果断掉的是leader consumer,就会重新选举新的leader,再执行刚刚提到的步骤。
2.3 分区方案的负载均衡
如果临时有consumer加入或退出,leader consumer就需要重新制定消费方案。
比如我们消费的一个主题有12个分区:
p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假设我们的消费者组里面有三个消费者
2.3.1 range策略
range策略就是按照partiton的序号范围
- p0~3 consumer1
- p4~7 consumer2
- p8~11 consumer3
2.3.2.round-robin策略
- consumer1:0,3,6,9
- consumer2:1,4,7,10
- consumer3:2,5,8,11
但是前面的这两个方案有个问题:
假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3
这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。
2.3.3.sticky策略
最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer
的分区还是属于他们,
然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略
- consumer1:0-3
- consumer2: 4-7
- consumer3: 8-11
假设consumer3挂了
- consumer1:0-3,+8,9
- consumer2: 4-7,+10,11
2.3.4 Rebalance分代机制
在rebalance的时候,可能你本来消费了partition3的数据,结果有些数据消费了还没提交offset,结果此时rebalance,把partition3分配给了另外一个consumer了,此时你如果提交partition3的数据的offset,能行吗?必然不行,所以每次rebalance会触发一次consumer group generation,分代,每次分代会加1,然后你提交上一个分代的offset是不行的,那个partiton可能已经不属于你了,大家全部按照新的partiton分配方案重新消费数据。
以上就是比较重要的事情了,之后到了轻松愉快的代码时间。
三、消费者代码部分
其实和生产者不能说它们一模一样可是结构完全就是一样的,所以会比生产者的时候更加简短点。因为已经知道有这些东西了,很多东西通过搜索引擎就不难解决了。
- public class ConsumerDemo {
- private static ExecutorService threadPool = Executors.newFixedThreadPool(20);
-
- public static void main(String[] args) throws Exception {
- KafkaConsumer<String, String> consumer = createConsumer();
-
- //指定消费的主题
- consumer.subscribe(Arrays.asList("order-topic"));
- try {
- while(true) {
- //这里设置的是一个超时时间
- ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
- //对消费到的数据进行业务处理
- for(ConsumerRecord<String, String> record : records) {
- JSONObject order = JSONObject.parseObject(record.value());
- threadPool.submit(new CreditManageTask(order));
- }
- }
- } catch(Exception e) {
- e.printStackTrace();
- consumer.close();
- }
- }
-
- private static KafkaConsumer<String, String> createConsumer() {
- //设置参数的环节
- Properties props = new Properties();
- props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
- props.put("group.id", "test-group");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("heartbeat.interval.ms", 1000); // 这个尽量时间可以短一点
- props.put("session.timeout.ms", 10 * 1000); // 如果说kafka broker在10秒内感知不到一个consumer心跳
- props.put("max.poll.interval.ms", 30 * 1000); // 如果30秒才去执行下一次poll
- // 就会认为那个consumer挂了,此时会触发rebalance
- // 如果说某个consumer挂了,kafka broker感知到了,会触发一个rebalance的操作,就是分配他的分区
- // 给其他的cosumer来消费,其他的consumer如果要感知到rebalance重新分配分区,就需要通过心跳来感知
- // 心跳的间隔一般不要太长,1000,500
- props.put("fetch.max.bytes", 10485760);
- props.put("max.poll.records", 500); // 如果说你的消费的吞吐量特别大,此时可以适当提高一些
- props.put("connection.max.idle.ms", -1); // 不要去回收那个socket连接
- // 开启自动提交,他只会每隔一段时间去提交一次offset
- // 如果你每次要重启一下consumer的话,他一定会把一些数据重新消费一遍
- props.put("enable.auto.commit", "true");
- // 每次自动提交offset的一个时间间隔
- props.put("auto.commit.ineterval.ms", "1000");
- // 每次重启都是从最早的offset开始读取,不是接着上一次
- props.put("auto.offset.reset", "earliest");
-
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- return consumer;
- }
-
- static class CreditManageTask implements Runnable {
- private JSONObject order;
- public CreditManageTask(JSONObject order) {
- this.order = order;
- }
- @Override
- public void run() {
- System.out.println("对订单进行积分的维护......" + order.toJSONString());
- // 就可以做一系列的数据库的增删改查的事务操作
- }
- }
- }
3.1 消费者的核心参数
3.1.1 【heartbeat.interval.ms】
consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作
3.1.2 【session.timeout.ms】
kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒
3.1.3 【max.poll.interval.ms】
如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了
3.1.4【fetch.max.bytes】
获取一条消息最大的字节数,一般建议设置大一些
3.1.5 【max.poll.records】
一次poll返回消息的最大条数,默认是500条
3.1.6 【connection.max.idle.ms】
consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收
3.1.7 【auto.offset.reset】
- earliest:
- 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- topicA -> partition0:1000
- partitino1:2000
- latest:
- 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费
- none:
- topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
注:我们生产里面一般设置的是latest
3.1.8 【enable.auto.commit】
这个就是开启自动提交唯一
3.1.9 【auto.commit.ineterval.ms】
这个指的是多久条件一次偏移量
四、加餐时间:补充第一篇没提到的内容
日志二分查找
其实这也可以被称作稀松索引。也是一个类似跳表的结构。打开某主题下的分区,我们能看到这样的一些文件
- 00000000000000000000.index(偏移量的索引)
- 00000000000000000000.log(日志文件)
- 00000000000000000000.timeindex(时间的索引)
日志段文件,.log文件会对应一个.index和.timeindex两个索引文件。kafka在写入日志文件的时候,同时会写索引文件,就是.index和.timeindex,一个是位移索引,一个是时间戳索引。
默认情况下,有个参数log.index.interval.bytes限定了在日志文件写入多少数据,就要在索引文件写一条索引,默认是4KB,写4kb的数据然后在索引里写一条索引,所以索引本身是稀疏格式的索引,不是每条数据对应一条索引的。而且索引文件里的数据是按照位移和时间戳升序排序的,所以kafka在查找索引的时候,会用二分查找,时间复杂度是O(logN),找到索引,就可以在.log文件里定位到数据了。
上面的0,2039···这些代表的是物理位置。为什么稀松索引会比直接一条条读取速度快,它不是每一条数据都会记录,是相隔几条数据的记录方式,但是就比如现在要消费偏移量为7的数据,就直接先看这个稀松索引上的记录,找到一个6时,7比6大,然后直接看后面的数据,找到8,8比7大,再看回来,确定7就是在6~8之间,而6的物理位置在9807,8的物理位置在12345,直接从它们中间去找。就提升了查找物理位置的速度。就类似于普通情况下的二分查找。
ISR机制
光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。
ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。
如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有一个follower,其次就是在一条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺
那什么情况下副本会被踢出出ISR呢,如果一个副本超过10s没有去和leader同步数据的话,那么它就会被踢出ISR列表。但是这个问题如果解决了(网络抖动或者full gc···等),follower再次和leader同步了,leader会有一个判断,如果数据差异小就会让follower重新加入,那么怎么才算差异大,怎么才算小呢,咱们留到源码时说明。
finally
这次的篇幅非常非常长,而且需要理解的地方也不少,后面其实本来在kafka的内核里还有个HW&LEO原理的,可自己都懒得继续写了hhh。下次源码篇的时候咱们再聊吧。
————————————————
出处:https://blog.csdn.net/qq_33440092/article/details/103864064
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。