当前位置:   article > 正文

Kafka学习笔记_在 kafka ha 中,当 partition 对应的 leader 宕机时需要从 followe

在 kafka ha 中,当 partition 对应的 leader 宕机时需要从 follower 中选举出

Kafka介绍

Kafka是由LinkedIn开发的一个分布式的消息系统,最初是用作LinkedIn的活动流(Activity Stream)和运营数据处理的基础。

活动流数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。

运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。

Kafka使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布

式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。

综上,Kafka是一种分布式的,基于发布/订阅的消息系统,能够高效并实时的吞吐数据,以及通过分布式集群及数据复制冗余机制(副本冗余机制)实现数据的安全

常用Message Queue对比

RabbitMQ

RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP, XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

Redis

Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value 数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。

ZeroMQ

ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty(NIO)作为传输模块)。

ActiveMQ

ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实

现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

适用场景

Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性"“消息传输担保(消息确认机制)”"消息分组"等企业级特性;kafka只能使用作为"常 规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

Website activity tracking
kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等

Metric
Kafka通常被用于可操作的监控数据。这包括从分布式应用程序来的聚合统计用来生产集中的运营数据提要。

Log Aggregatio
kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异
步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等, 这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统

Kafka配置

实现步骤:
1.从官网下载安装包 http://kafka.apache.org/downloads 2.上传到01虚拟机,解压
3.进入安装目录下的config目录4.对server.properties进行配置配置示例:
broker.id=0 log.dirs=/home/software/kafka/kafka-logs
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181

delete.topic.enable=true advertised.host.name=192.168.234.21 advertised.port=9092

5.保存退出后,别忘了在安装目录下创建 kafka-logs目录
6.配置其他两台虚拟机,更改配置文件的broker.id编号(不重复即可) 7.先启动zookeeper集群
8.启动kafka集群进入bin目录
执行:sh kafka-server-start.sh …/config/server.properties

Kafka在Zookeeper下的路径:

Kafka使用

1.创建自定义的topic
在bin目录下执行:
sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 – partitions 1 --topic enbook

注:副本数量要小于等于节点数量

2.查看所有的topic
执行:sh kafka-topics.sh --list --zookeeper hadoop01:2181

3.启动producer
执 行 :sh kafka-console-producer.sh --broker-list hadoop01:9092 --topic enbook

4.启动consumer
执行:[root@hadoop01 bin]# sh kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic enbook --from-beginning
5.可以通过producer和consumer模拟消息的发送和接收

6.删除topic指令:
进入bin目录,执行:sh kafka-topics.sh --delete --zookeeper hadoop01:2181 – topic enbook

可以通过配置 config目录下的 server.properties文件,加入如下的配置:

配置示例:
delete.topic.enable=true

Kafka架构

1.producer:
消息生产者,发布消息到kafka 集群的终端或服务。
2.broker:
kafka 集群中包含的服务器。broker (经纪人,消费转发服务)
3.topic:
每条发布到kafka 集群的消息属于的类别,即kafka 是面向topic 的。
4.partition:
partition 是物理上的概念,每个topic 包含一个或多个partition。kafka 分配的单位是
partition。
5.consumer:
从kafka 集群中消费消息的终端或服务。
6.Consumer group:
high-level consumer API 中,每个consumer 都属于一个consumer group,每条消息只能被consumer group 中的一个 Consumer 消费,但可以被多个consumer group 消费。
即组间数据是共享的,组内数据是竞争的。
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一个角色, producer 和consumer 只跟leader 交互。
9.follower:
replica 中的一个角色,从leader 中复制数据。
10.controller:
kafka 集群中的其中一个服务器,用来进行leader election 以及 各种 failover。
11.zookeeper:
kafka 通过 zookeeper 来存储集群的meta 信息。

Topic与Partition

示意图
在这里插入图片描述
在这里插入图片描述

Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(主题)。
Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.

Topic在逻辑上可以被认为是一个queue,每条消息都必须指定它的Topic,可以简单理解为必须指明把这条 消息放进哪个queue里。

为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对 应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且 分别有13个和19个分区,如下图所示。
在这里插入图片描述

因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比 随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
在这里插入图片描述

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略 删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置
$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时 删除旧数据,配置如下所示。

配置示例:

#The minimum age of a log file to be eligible for deletion log.retention.hours=168
#The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824

#The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
#If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.

log.cleaner.enable=false

Kafka消息流处理

Producer 写入消息序列图
在这里插入图片描述

流程说明:
1.producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
2.producer 将消息发送给该 leader
3.leader 将消息写入本地 log
4.followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5.leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后commit 的 offset) 并向 producer 发送 ACK。

ISR指的是:比如有三个副本 ,编号是① ② ③ ,其中② 是Leader ① ③是Follower。假设在数据同步过程中,①跟上Leader,但是③出现故障或没有及时同步,则① ②是一个ISR,而
③不是ISR成员。后期在Leader选举时,会用到ISR机制。会优先从ISR中选择Leader

kafka HA

概述

同一个 partition 可能会有多个 replica(对应 server.properties 配置中的default.replication.factor=N)。

没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时producer 也不能再将数据存于其上的 patition。

引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从leader 中复制数据。

leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

由写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。
对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。比如 一个分区 有5个副本,挂了4个,剩一个副本,依然可以工作。
注意:kafka的选举不同于zookeeper,用的不是过半选举。当所有 replica 都不工作时,有两种可行的方案:

1.等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
2.选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
kafka 0.8.* 使用第二种方式。此外, kafka 通过 Controller 来选举 leader。

Kafka API使用

旧版代码示意:

public class TestDemo {
@Test
public void producer(){
     Properties props=new Properties(); props.put("serializer.class","kafka.serializer.StringEncoder");                  
     pro ps.put("metadata.broker.list","192.168.234.11:9092"); 
     Producer<Integer,String> producer=new Producer<>(new ProducerConfig(props));
     producer.send(new KeyedMessage<Integer, String>("enbook","Think in java"));
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

新版代码示意:
i

mport java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

public class TestDemo {

    @Test
    public void producer() throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.234.11:9092");
        Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);

        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>("enbook", "" + i);
            kafkaProducer.send(message);
        }
        while (true) ;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

创建Topic代码:

@Test
public void create_topic() {

    ZkUtils zkUtils = ZkUtils.apply("192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181",
            30000, 30000, JaasUtils.isZkSecurityEnabled());
    // 创建一个单分区单副本名为t1的topic
    AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
    zkUtils.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
删除Topic代码:
@Test
public void delete_topic() {
    ZkUtils zkUtils = ZkUtils.apply("192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181",
            30000, 30000, JaasUtils.isZkSecurityEnabled());
    // 删除topic 't1'
    AdminUtils.deleteTopic(zkUtils, "t1");
    zkUtils.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
创建消费者线程并指定消费者组:
@Test
    public void consumer_1() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.234.11:9092");
        props.put("group.id", "consumer-tutorial");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("enbook", "t2"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("c1消费:" + record.offset() + ":" + record.value());
            }
        } catch (Exception e) {
        } finally {
            consumer.close();
        }
    }

    @Test
    public void consumer_2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.234.11:9092");
        props.put("group.id", "consumer-tutorial");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("enbook", "t2"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("c2消费:" + record.offset() + ":" + record.value());
            }
        } catch (Exception e) {

        } finally {
            consumer.close();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

Kafka在Zookeeper的路径

2018年9月30日	14:46

Kafka offset机制

Consumer消费者的offset存储机制
Consumer在从broker读取消息后,可以选择commit,该操作会在Kakfa中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。 通过这一特性可以保证同一消费者从Kafka中不会重复消费数据。

底层实现原理:
执行:sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic enbook --from-beginning – new-consumer

执行:sh kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --list --new-consumer
查询得到的group id

进入kafka-logs目录查看,会发现多个很多目录,这是因为kafka默认会生成50个

consumer_offsets 的目录,用于存储消费者消费的offset位置。
在这里插入图片描述
Kafka会使用下面公式计算该消费者group位移保存在 consumer_offsets的哪个目录上: Math.abs(groupID.hashCode()) % 50

Kafka的索引机制

概述

数据文件的分段与索引

Kafka解决查询效率的手段之一是将数据文件分段,可以配置每个数据文件的最大值,每段放在一个单独的 数据文件里面,数据文件以该段中最小的offset命名。
每个log文件默认是1GB生成一个新的Log文件,比如新的log文件中第一条的消息的offset 16933,则此log 文件的命名为:000000000000000016933.log,此外,每生成一个log文件,就会生成一个对应的索引
(index)文件。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段 中。

数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才 能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文 件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。索引文件中包含若干个索引条目,每个 条目表示数据文件中一条Message的索引——Offset与position(Message在数据文件中的绝对位置)的对应 关系。

稀疏索引+二分查找,可以加快查找速度

在这里插入图片描述

index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的 数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没 有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫 描的范围就很小了。

索引文件被映射到内存中,所以查找的速度还是很快的。

Kafka的消息系统语义

概述

在一个分布式发布订阅消息系统中,组成系统的计算机总会由于各自的故障而不能工作。在Kafka中,一个单独的broker,可能会在生产者发送消息到一个topic的时候宕机,或者出现网络故障,从而导致生产者发送消息失败。根据生产者如何处理这样的失败,产生了不通的语义:

至少一次语义(At least once semantics):

如果生产者收到了Kafka broker的确认(acknowledgement,ack),并且生产者的acks配置项设置为all(或-1),这就意味着消息已经被精确一次写入Kafka topic了。然而,如果生产者接收ack超时或者收到了错误,它就会认为消息没有写入Kafka topic而尝试重新发送消息。如果broker恰好在消息已经成功写入Kafka topic后,发送ack前,出了故障,生产者的重试机制就会导致这条消息被写入Kafka两次,从而导致同样的消息会被消费者消费不止一次。每个人都喜欢一个兴高采烈的给予者,但是这种方式会导致重复的工作和错误的结果。

至多一次语义(At most once semantics):

如果生产者在ack超时或者返回错误的时候不重试发送消息,那么消息有可能最终并没有写入Kafka topic 中,因此也就不会被消费者消费到。但是为了避免重复处理的可能性,我们接受有些消息可能被遗漏处理。

精确一次语义(Exactly once semantics):

即使生产者重试发送消息,也只会让消息被发送给消费者一次。精确一次语义是最令人满意的保证,但也是最难理解的。因为它需要消息系统本身和生产消息的应用程序还有消费消息的应用程序一起合作。比如,在成功消费一条消息后,你又把消费的offset重置到之前的某个offset位置,那么你将收到从那个offset到最新的offset之间的所有消息。这解释了为什么消息系统和客户端程序必须合作来保证精确一次语义。

潜在的问题

假设有一个单进程生产者程序,发送了消息“Hello Kafka“给一个叫做“EoS“的单分区Kafka topic。然后有一个单实例的消费者程序在另一端从topic中拉取消息,然后打印。在没有故障的理想情况下,这能很好的工作,“Hello Kafka“只被写入到EoS topic一次。消费者拉取消息,处理消息,提交偏移量来说明它完成了处理。然后,即使消费者程序出故障重启也不会再收到“Hello Kafka“这条消息了。

然而,我们知道,我们不能总认为一切都是顺利的。在上规模的集群中,即使最不可能发生的故障场景都可能最终发生。比如:

1.broker可能故障:

Kafka是一个高可用、持久化的系统,每一条写入一个分区的消息都会被持久化并且多副本备份(假设有n 个副本)。所以,Kafka可以容忍n-1哥broker故障,意味着一个分区只要至少有一个broker可用,分区就可用。Kafka的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本
(ISR)。

2.producer到broker的RPC调用可能失败:

Kafka的持久性依赖于生产者接收broker的ack。没有接收成功ack不代表生产请求本身失败了。broker可能在写入消息后,发送ack给生产者的时候挂了。甚至broker也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在Kafka分区日志中重复,进而造成消费端多次收到这条消息。

新版本Kafka的幂等性实现

一个幂等性的操作就是一种被执行多次造成的影响和只执行一次造成的影响一样的操作。现在生产者发送的操作是幂等的了。如果出现导致生产者重试的错误,同样的消息,仍由同样的生产者发送多次,将只被写到kafka broker的日志中一次。对于单个分区,幂等生产者不会因为生产者或broker故障而发送多条重复消

息。想要开启这个特性,获得每个分区内的精确一次语义,也就是说没有重复,没有丢失,并且有序的语义,只需要设置producer配置中的”enable.idempotence=true”。

这个特性是怎么实现的呢?在底层,它和TCP的工作原理有点像,每发送到Kafka的消息都将包含一个序列号,broker将使用这个序列号来删除重复的发送。和只能在瞬态内存中的连接中保证不重复的TCP不同,这个序列号被持久化到副本日志,所以,即使分区的leader挂了,其他的broker接管了leader,新leader仍可以判断重新发送的是否重复了。这种机制的开销非常低:每批消息只有几个额外的字段。你将在这篇文章的后面看到,这种特性比非幂等的生产者只增加了可忽略的性能开销。

此外,Kafka除了构建于生产者—>broker的幂等性之外,从broker->消费者的精确一次流处理现在可以通过Apache Kafka的流处理API实现了。
你仅需要设置配置:“processing.guarantee = exact_once”。 这可以保证消费者的所有处理恰好发生一次。

这就是为什么Kafka的Streams API提供的精确一次性保证是迄今为止任何流处理系统提供的最强保证。 它为流处理应用程序提供端到端的一次性保证,从Kafka读取的数据,Streams应用程序物化到Kafka的任何状态,到写回Kafka的最终输出。 仅依靠外部数据系统来实现状态支持的流处理系统对于精确一次的流处理提供了较少的保证。 即使他们使用Kafka作为流处理的源并需要从失败中恢复,他们也只能倒回他们的Kafka偏移量来重建和重新处理消息,但是不能回滚外部系统中的关联状态,导致状态不正确,更新不是幂等的。

通过代码来设置消息系统语义

Producer的至多一次:

  @Test
    public void producer() throws ExecutionException {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.150.137:9092,192.168.150.138:9092");
        props.put("acks", 0);

    Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>("jpbook", "" + i);
        kafkaProducer.send(message);
    }
    while (true) ;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Producer的至少一次:

 @Test
    public void producer() throws ExecutionException {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.150.137:9092,192.168.150.138:9092");
        props.put("acks", all);

    Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<Integer, String> message
                = new ProducerRecord<Integer, String>("jpbook", "" + i);
        kafkaProducer.send(message);
    }
    while (true) ;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Producer的精确一次:

@Test
public void producer() throws ExecutionException {

    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.150.137:9092,192.168.150.138:9092");
    props.put("acks", "all");
    props.put("enable.idempotence", "true");

    Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<Integer, String> message
                = new ProducerRecord<Integer, String>("jpbook", "" + i);
        kafkaProducer.send(message);
    }
    while (true) ;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

Consumer的至多一次:
kafka consumer是默认至多一次,consumer的配置是:
1.设置enable.auto.commit 为 true.
2.设置 auto.commit.interval.ms为一个较小的值.
3.consumer不去执行 consumer.commitSync(), 这样, Kafka 会每隔一段时间自动提交offset。

@Test
public void consumer_2() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.150.137:9092,192.168.150.138:9092");
    props.put("group.id", "g1");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "101");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("enbook", "t2"));
    try {

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records)
                System.out.println("g1组c2消费者,分区编号:" + record.partition() + "offset:" + record.offset() + ":" + record.value());
        }
    } catch (Exception e) {
        // TODO
    } finally {
        consumer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

Consumer的至少一次:
设置enable.auto.commit 为 false 或者
设置enable.auto.commit为 true 并设置auto.commit.interval.ms为一个较大的值. 处理完后consumer调用consumer.commitSync()

  @Test
    public void consumer_2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.150.137:9092,192.168.150.138:9092");
        props.put("group.id", "g1");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("enbook", "t2"));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records)
                //--Process……
                //--处理完成后,用户自己手动提交offset
                consumer.commitAsync();
        }
    } catch (Exception e) {
        // TODO
    } finally {
        consumer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

Consumer的精确一次:

@Test
public void consumer_2() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.150.137:9092,192.168.150.138:9092");
    props.put("group.id", "g1");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "false");
    props.put("processing.guarantee", "exact_once");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("enbook", "t2"));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records)
            //--Process……
            //--处理完成后,用户自己手动提交offset
                consumer.commitAsync();
        }
    } catch (Exception e) {
        // TODO
    } finally {
        consumer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

Kafka参数配置

PropertyDefaultDescription
broker.id每个broker都可以用一个唯一的非负整数id进行标识;这 个id可以作为broker的“名字”,并且它的存在使得broker无须混淆consumers就可以迁移到不同的host/port上。你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
zookeeper.connectnullZooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;为了当某个host宕掉之后你能通过其他ZooKeeper节点进行连接,你可以按照一下方式制定多个hosts:hostname1:port1, hostname2:port2, hostname3:port3.ZooKeeper 允许你增加一个“chroot”路径,将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者 其他应用使用相同ZooKeeper集群时,可以使用这个方式 设置数据存放路径。这种方式的实现可以通过这样设置连 接 字 符 串 格 式 , 如 下 所 示 : hostname1:port1,hostname2:port2, hostname3:port3/chroot/path这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意,在你启动broker之前,你必须创建这个路径, 并且consumers必须使用相同的连接格式。
num.partitions1如果创建topic时没有给出划分partitions个数,这个数字将是topic下partitions数目的默认数值。主题的分区数直接决定了消费的并行度,所以在实际生产中,分区数一般都在几十个或几百个。此外注意:提供分区的副本数,并不能提高并发度,因为无论是生成者还是消费者,都是和副本的Leader交互。
log.segment.bytes102410241024topic partition的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的;这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件。此设置可以由每个topic基础设置时进行覆盖。
log.roll.hours24 * 7即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。这个设置也可以有topic层 面的设置进行覆盖;
log.index.size.max.bytes1010241024每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log segment。
replica.lag.time.max.ms10000如果一个follower在这个时间内没有发送fetch请求, leader将从ISR中移除这个follower
replica.fetch.max.bytes1024*1024备份时每次fetch的最大值
unclean.leader.election.enabletrue指明了是否能够使不在ISR中replicas设置用来作为leader
delete.topic.enablefalse能够删除topic
boostrap.servers用于建立与kafka集群连接的host/port组。数据将会在所 有servers上均衡加载,不管哪些server是指定用于bootstrapping。这个列表仅仅影响初始化的hosts(用于发 现 全 部 的 servers) 。 这 个 列 表 格 式 : host1:port1,host2:port2,…因为这些server仅仅是用于初始化的连接,以发现集群所 有成员关系(可能会动态的变化),这个列表不需要包含所有的servers(你可能想要不止一个server,尽管这样, 可能某个server宕机了)。如果没有server在这个列表出 现,则发送数据会一直失败,直到列表可用。
acks1producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为 常用选项:(1)acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功 接收数据,同时重试配置不会发生作用(因为客户端不知 道是否失败)回馈的offset会总是设置为-1; (2)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此 时leader又挂掉,则消息会丢失。(3)acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢 失数据。这是最强的保证。(4)其他的设置,例如acks=2也是可以的,这将需要给 定的acks数量,但是这种策略一般很少用。
batch.size16384(字节) 即:16kbproducer将试图批处理消息记录,以减少请求次数。这项配置控制默认的批量处理消息字节数。此参数控制的是生成者的批处理大小,此参数越大,会使生成的吞吐量越大,同时也会占用过大的内存空间。注意:调节测试是1024整数倍
linger.ms0Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。官网解释翻译: producer会将request传输之间到达的所有records聚合到一个批请求。通常这个值发生在欠负载情况下,record到 达速度快于发送。但是在某些场景下,client即使在正常负载下也期望减少请求数量。这个设置就是如此,通过人工 添加少量时延,而不是立马发送一个record,producer会等待所给的时延,以让其他records发送出去,这样就会被聚合在一起。这个类似于TCP的Nagle算法。该设置给了batch的时延上限:当我们获得一个partition的batch.size 大小的records,就会立即发送出去,而不管该设置;但是如果对于这个partition没有累积到足够的record,会linger指定的时间等待更多的records出现。该设置的默认 值为0(无时延)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/784951
推荐阅读
相关标签
  

闽ICP备14008679号