赞
踩
什么是kafka
kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点
kafka各属性介绍
Producer(生产者) Topic(标签) Consumer(消费者) Broker(容器)之间关系
举个列子:生产者(producer)生产一个萝卜,兔子(Consumer)吃一个萝卜,假设兔子(Consumer)吃的着急噎住了(宕机),生产者(producer)还在生产鸡蛋,兔子(Consumer)来不及吃了,此时萝卜丢掉了(消息丢失)。生产者(producer)每次生产100个萝卜,兔子(Consumer)每次只能消化10个,一段时间兔子(Consumer)撑着了消化不良(消息堵塞),兔子(Consumer)拒绝继续吃,此时萝卜丢失(数据丢失),这个时候在兔子(Consumer)和生产者(producer)中间放个麻袋(Broker)存放萝卜,兔子(Consumer)吃完就去麻袋(Broker)里拿,这样萝卜永远都在麻袋(Broker)里不会丢失。
Topic
Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)。
Producer
发布消息的对象称之为主题生产者(Kafka topic producer)
生产者往某个Topic上发布消息。生产者也负责选择发布到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。开发者负责如何选择分区的算法。
Consumer
订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
每个Consumer实例都属于一个消费者组(consumer group),多个Consumer实例可以存在于不同的进程或机器上(Consumer实例可类比于java类的实例对象),一个消息记录只会发送给有对应主题订阅的消费者组中的一个Consumer实例!
在一个消费者组中,每个分区至多只能发送到同一消费者的一个实例上,但一个消费者实例可以消费多个分区,因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息,所以分区(partition)数必须大于等于消费者组中的实例数量。下图中,具有2个server的kafka集群,拥有同一个topic的4个分区,并对接2个消费者组,如果A或B组中Consumer都是同一消费者的实例,则轮询均衡消费,若同组都是不同的消费者实例,则相当于广播消息
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
partition(分区)
partition(分区)是kafka的一个核心概念,kafka将1个topic分成了一个或多个分区,每个分区在物理上对应一个目录,分区目录下存储的是该分区的日志段(segment),包括日志的数据文件和两个索引文件。然后每个分区又对应一个或多个副本,由一个ISR列表来维护。 注意:分区数可以大于节点数,但是副本数不能大于节点数,因为副本需要分不到不同的节点上,才能达到备份的目的。
生产者写入partition
生产者在向某个主题发送消息时,会根据分配策略将消息发送到对应的分区,比如可以指定某个partiton by的key值,通过对该key值进行哈希决定写入哪个partition,这种策略要注意可能会出现的热点数据问题;如果不指定分区的key值,则默认以轮询的方式均匀的写入到各个分区。
kafka保证同一个分区内的数据是有序的,我们也可以认为一个分区就是一个有序的消息队列。
消费者与partition
每个主题的某一个分区只能被同一个消费组下的其中一个消费者消费,因此我们可以说分区是消费并行度的基本单位。从消费者的角度讲,我们订阅消费了一个主题,也就订阅了该主题的所有分区。
扩容
消息队列满了,其实就是麻袋满了,”萝卜“ 放不下了,那赶紧多放几个麻袋,其实就是kafka的扩容。
防止数据丢失
producr端可以通过设置request.required.acks参数来保证数据不丢失,
1)asks = 0时,只要消息发送成功就会发送下一条数据,吞吐量最高,但这种情况即使数据丢失我们也无法知道;
2)asks = 1时,消息发送成功,并且leader接收成功后才会发送下一条数据,这种情况如果leader刚接收到数据,还没有同步到follower时,假如leader节点挂掉也会导致数据的丢失;
3)asks = -1时,消息发送成功,要等待leader把消息同步到follower之后才会发送下一条数据,吞吐量最低,但最可靠。但是会产生一个数据重复,当Leader在同步的过程中挂掉了,没有给到应答给生产者,这时候生产者就会认为数据没有传给Leader,但其实这个时候已经是有一份数据,再来一份数据就重复了;
数据一致性
ack机制
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据
何时发送ack?
确保有follower与leader同步完成,leader再发送ack,这样才能保证leader挂掉之后,能在follower中选举出新的leader
isr机制
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 lead
kafka四个核心
.\bin\windowska-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test\kaf
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
.\bin\windows\kafka-producer-perf-test.bat --topic test --num-records 100000 --record-size 1000 --throughput 2000 --producer-props bootstrap.servers=192.168.0.243:9092
.\bin\windows\kafka-consumer-perf-test.bat --broker-list 192.168.0.243:9092 --topic test --fetch-size 1048576 --messages 5000000
每秒9.3W条记录 | |
吞吐速率 | 每秒约89MB数据 |
平均延迟时间 | 346.62 ms |
最大延迟时间 | 1003.00 ms |
data.consumed.in.MB 共计消费的数据 | 4768.3716MB |
MB.sec 每秒消费的数量 | 每秒445MB |
data.consumed.in.nMsg 共计消费的数量 | 5000000 |
nMsg.sec 每秒的数量 | 每秒46.7W条 |
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new KafkaProducer<>(props);
- for(int i = 0; i < 100; i++)
- producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
- producer.close();
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "localhost:9092");
- props.setProperty("group.id", "test");
- props.setProperty("enable.auto.commit", "true");
- props.setProperty("auto.commit.interval.ms", "1000");
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList("foo", "bar"));
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- props.put("security.protocol", "SSL");
- props.put("ssl.truststore.password", "test1234");
- props.put("ssl.truststore.location","/Users/weiwei/Downloads/client.truststore.jks");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。