赞
踩
点对点模式
消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
点对点模式特点:
发布订阅模式
发布/订阅模式特点:
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
以容错的持久化方式存储数据流
处理数据流
Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到数据库中。
Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最高) |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | 不支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。
PS:Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
服务器概念
一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
broker是**无状态(Sateless)**的,它们是通过ZooKeeper来维护集群状态
一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
在主题中的消息是有结构的,一般一个主题包含某一类消息
一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
生产者负责将数据推送给broker的topic
消费者负责从broker的topic中拉取数据,并自己进行处理
在Kafka集群中,主题被分为多个分区。
生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。
为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。
PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)
Producer(生产者)接口中定义了以下5个事务相关方法:
initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
beginTransaction(开始事务):启动一个Kafka事务。
sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交。
commitTransaction(提交事务):提交事务。
abortTransaction(放弃事务):取消事务
生产者:
// 配置事务的id,开启了事务会默认开启幂等性
props.put("transactional.id", "first-transactional");
消费者:
// 1. 消费者需要设置隔离级别
props.put("isolation.level","read_committed");
// 2. 关闭自动提交
props.put("enable.auto.commit", "false");
代码:
public static void main(String[] args) { Consumer<String, String> consumer = createConsumer(); Producer<String, String> producer = createProducer(); // 初始化事务 producer.initTransactions(); while(true) { try { // 1. 开启事务 producer.beginTransaction(); // 2. 定义Map结构,用于保存分区对应的offset Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>(); // 2. 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord<String, String> record : records) { // 3. 保存偏移量 offsetCommits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); // 4. 进行转换处理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女"; String message = fields[0] + "," + fields[1] + "," + fields[2]; // 5. 生产消息到dwd_user producer.send(new ProducerRecord<>("dwd_user", message)); } // 6. 提交偏移量到事务 producer.sendOffsetsToTransaction(offsetCommits, "ods_user"); // 7. 提交事务 producer.commitTransaction(); } catch (Exception e) { // 8. 放弃事务 producer.abortTransaction(); } } }
生产者分区写入策略:
乱序问题
轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
轮训分区
默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
如果在生产消息时,key为null,则使用轮询算法均衡地分配分区
随机策略(不用)
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
按key分配策略
按key分配策略,有可能会出现「数据倾斜」,例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。
自定义分区策略
Kafka中的Rebalance称之为再均衡,是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。
Rebalance的不良影响
Range范围分配策略
Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
RoundRobin轮询策略
RoundRobinAssignor轮询策略是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
Stricky粘性分配策略
主要目的:
分区分配尽可能均匀。
在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同。
没有发生rebalance时,Striky粘性分配策略和RoundRobin分配策略类似。如果consumer2崩溃了,此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配。
粘性特点:
Striky粘性分配策略,保留rebalance之前的分配结果。这样,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费,例如:之前consumer0、consumer1之前正在消费某几个分区,但由于rebalance发生,导致consumer0、consumer1需要重新消费之前正在处理的分区,导致不必要的系统开销。(例如:某个事务正在进行就必须要取消了)
副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("acks", "all");
ACK=0:
ACK=1:
当生产者的ACK配置为1时,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。
ACK=-1/all
ACK=1:
ACK= -1/all:
指标 | 单分区单副本(ack=0) | 单分区单副本(ack=1) | 单分区单副本(ack=-1/all) |
---|---|---|---|
吞吐量 | 165875.991109/s每秒16.5W条记录 | 93092.533979/s 每秒9.3W条记录 | 73586.766156 /s每秒7.3W调记录 |
吞吐速率 | 158.19 MB/sec | 88.78 MB/sec | 70.18 MB |
平均延迟时间 | 192.43 ms | 346.62 ms | 438.77 ms |
最大延迟时间 | 670.00 ms | 1003.00 ms | 1884.00 ms |
高级API
缺点
低级API
通过使用低级API,我们可以自己来控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在ZK中,使用低级API,我们可以将offset不一定要使用ZK存储,我们可以自己来存储offset。例如:存储在文件、MySQL、或者内存中。但是低级API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。
两种日志清理方式:
日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。
在Kafka的broker或topic配置中
配置项 | 配置值 | 说明 |
---|---|---|
log.cleaner.enable | true(默认) | 开启自动清理日志功能 |
log.cleanup.policy | delete(默认) | 删除日志 |
log.cleanup.policy | compaction | 压缩日志 |
log.cleanup.policy | delete,compact | 同时支持删除、压缩 |
日志删除是以段(segment日志)为单位来进行定期清理的。
定时删除
Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件。
这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。
日志分段的保留策略有3种
指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:
优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。
删除日志分段时:
日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。
注意:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。
Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。
生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。
为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s。
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default
对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default
使用以下命令,删除Kafka的Quota配置
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/userGets")
public Object gets() {
// send 第一个参数为topic的名称,第二个参数为我们要发送的信息
kafkaTemplate.send("topic.quick.default","1231235");
return "发送成功";
}
@KafkaListener(topics = {"topic1"})
public void onMessage(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
@KafkaListener(topics = {"topic2"})
public void getMessage(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。