赞
踩
kafka 是最初由 linkedin 公司开发的,使用 scala 语言编写,kafka 是一个分布式,分区的,多副本的,
多订阅者的日志系统(分布式 MQ 系统),可以用于搜索日志,监控日志,访问日志等。
kafka 目前支持多种客户端的语言:java、python、c++、php 等
apache kafka 是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将
消息从一个端点传递到另一个端点,kafka 适合离线和在线消息消费。kafka 消息保留在磁盘上,并在
集群内复制以防止数据丢失。kafka 构建在 zookeeper 同步服务之上。它与 apache 和 spark 非常好的
集成,应用于实时流式数据分析。
RabbitMQ
Redis
ZeroMQ
ActiveMQ
可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka 消息传递系统轻松缩放,无需停机。
耐用性:kafka 使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是
持久的。
性能:kafka 对于发布和定于消息都具有高吞吐量。即使存储了许多 TB 的消息,他也爆出稳
定的性能。
kafka 非常快:保证零停机和零数据丢失。
kafka 通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息, 以产生操作的数据
集中反馈
kafka 可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器。
流式处理框架(spark,storm,flflink)重主题中读取数据,对齐进行处理,并将处理后的数据写入新
的主题,供 用户和应用程序使用,kafka 的强耐久性在流处理的上下文中也非常的有用。
生产者API
允许应用程序发布记录流至一个或者多个 kafka 的主题(topics)。
消费者API
允许应用程序订阅一个或者多个主题,并处理这些主题接收到的记录流。
StreamAPI
允许应用程序充当流处理器(stream processor),从一个或者多个主题获取输入流,并生产一个输出
流到一个或 者多个主题,能够有效的变化输入流为输出流。
ConnectorAPI
允许构建和运行可重用的生产者或者消费者,能够把 kafka 主题连接到现有的应用程序或数据系统。
例如:一个连 接到关系数据库的连接器可能会获取每个表的变化。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mMeWVYHN-1656401089030)(…/…/…/AppData/Roaming/Typora/typora-user-images/image-20220623165612253.png)]
说明:kafka 支持消息持久化,消费端为拉模型来拉取数据,消费状态和订阅关系有客户端负责维护,消
息消费完 后,不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJbeuDdy-1656401089031)(…/…/…/AppData/Roaming/Typora/typora-user-images/image-20220623165825047.png)]
一个典型的 kafka 集群中包含若干个 Producer,若干个 Broker,若干个 Consumer,以及一个
zookeeper 集群; kafka 通过 zookeeper 管理集群配置,选举 leader,以及在 Consumer Group
发生变化时进行 Rebalance(负载均 衡);Producer 使用 push 模式将消息发布到 Broker;
Consumer 使用 pull 模式从 Broker 中订阅并消费消息。
Broker:kafka 集群中包含一个或者多个服务实例,这种服务实例被称为 Broker
Topic:每条发布到 kafka 集群的消息都有一个类别,这个类别就叫做 Topic
Partition:Partition 是一个物理上的概念,每个 Topic 包含一个或者多个 Partition
Producer:负责发布消息到 kafka 的 Broker 中。
Consumer:消息消费者,向 kafka 的 broker 中读取消息的客户端
Consumer Group:每一个 Consumer 属于一个特定的 Consumer Group(可以为每个 Consumer
指定 groupName)
kafka 将消息以 topic 为单位进行归类
topic 特指 kafka 处理的消息源(feeds of messages)的不同分类。
topic 是一种分类或者发布的一些列记录的名义上的名字。kafka 主题始终是支持多用户订阅的;
也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。
在 kafka 集群中,可以有无数的主题。
生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。
Partitions**:分区数**
Partitions:分区数:控制 topic 将分片成多少个 log,可以显示指定,如果不指定则会使用
一个 broker 服务下,是否可以创建多个分区?
可以的,broker 数与分区数没有关系; 在
kafka 中,每一个分区会有一个编号:编号从 0 开始
某一个分区的数据是有序的
说明-数据是有序 如何保证一个主题下的数据是有序的?(生产是什么样的顺序,那么消费的时
候也是什么样的顺序)
topic 的 Partition 数量在创建 topic 时配置。
Partition 数量决定了每个 Consumer group 中并发消费者的最大数量。
Consumer group A 有两个消费者来读取 4 个 partition 中数据;Consumer group B 有四个
消费者来读取 4 个 partition 中的数据
kafka 分区副本数(kafka Partition Replicas)
副本数(replication-factor)
副本数(replication-factor):控制消息保存在几个 broker(服务器)上,一般情况下等于broker 的个数
一个 broker 服务下,是否可以创建多个副本因子?
不可以;创建主题时,副本因子应该小于等于可用的 broker 数。
副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本;主副本叫做 leader,
从副本叫做 follower(在有多个副本的情况下,kafka 会为同一个分区下的分区,设定角色
关系:一个 leader 和 N 个 follower),处于同步状态的副本叫做 in-sync
replicas(ISR);follower 通过拉的方式从 leader 同步数据。消费 者和生产者都是从 leader 读写
数据,不与 follower 交互。
副本因子的作用:让 kafka 读取数据和写入数据时的可靠性。
副本因子是包含本身|同一个副本因子不能放在同一个 Broker 中。
如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个钟,选择一个
leader,但不会在其 他的 broker 中,另启动一个副本(因为在另一台启动的话,存在数据传
递,只要在机器之间有数据传递,就 会长时间占用网络 IO,kafka 是一个高吞吐量的消息系
统,这个情况不允许发生)所以不会在零个 broker 中启 动。
如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。
lsr 表示:当前可用的副本
任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为
offffset(偏移量),
offffset 是一个 long 类型数字,它唯一标识了一条消息,消费者通过(offffset,partition,topic)跟
踪记录
消费组: 由一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次
某一个主题下的分区数,对于消费组来说,应该小于等于该主题下的分区数。如下所示:
如:某一个主题有 4 个分区,那么消费组中的消费者应该小于 4,而且最好与分区数成整数倍
1 2 4
同一个分区下的数据,在同一时刻,不能同一个消费组的不同消费者消费
总结:分区数越多,同一时间可以有越多的消费者来进行消费,消费数据的速度就会越快,提高消费的性能
192.168.100.100 node01
192.168.100.110 node02
192.168.100.120 node03
安装jdk、安装zookeeper
http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
将下载好的安装包上传到 node01 服务器的/export/softwares 路径下,然后进行解压
node01 执行以下命令进行解压安装包
cd /export/softwares
tar -zxvf kafka_2.11-0.10.0.0.tgz -C /export/servers/
node01 执行以下命令进入到 kafka 的配置文件目录,修改配置文件
cd /export/servers/kafka_2.11-0.10.0.0/config
vim server.properties
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node01:2181,node02:2181,node03:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node01
node01 执行以下命令创建数据文件存放目录
mkdir -p /export/servers/kafka_2.11-0.10.0.0/logs
node01 执行以下命令,将 node01 服务器的 kafka 安装包发送到 node02 和 node03 服务器上面去
cd /export/servers/
scp -r kafka_2.11-0.10.0.0/ node02:$PWD
scp -r kafka_2.11-0.10.0.0/ node03:$PWD
node02 与 node03 服务器修改 kafka 配置文件
node02 使用以下命令修改 kafka 配置文件
cd /export/servers/kafka_2.11-0.10.0.0/con·fig
vim server.properties
broker.id=1 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node01:2181,node02:2181,node03:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node02
node03 使用以下命令修改 kafka 配置文件
cd /export/servers/kafka_2.11-0.10.0.0/config
vim server.properties
broker.id=2 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node01:2181,node02:2181,node03:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node03
注意事项: 在kafka启动前,一定要让zookeeper启动起来。
node01 服务器执行以下命令来启动 kafka 集群
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-start.sh config/server.properties
node01 执行以下命令将 kafka 进程启动在后台
cd /export/servers/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
node01 执行以下命令便可以停止 kakfa 进程
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh
创建了一个名字为 test 的主题, 有三个分区,有两个副本
node01 执行以下命令来创建 topic
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --parrtitions 3 --topic test
查看 kafka 当中存在的主题
node01 使用以下命令来查看 kafka 当中存在的 topic 主题
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 --topic test
模拟生产者来生产数据
node01 服务器执行以下命令来模拟生产者进行生产数据
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092
node2服务器执行一下命令来模拟消费者进行消费数据
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
node01 执行以下命令运行 describe 查看 topic 的相关信息
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
任意 kafka 服务器执行以下命令可以增加 topic 分区数
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
增加配置
动态修改kafka的配置
任意kafka服务器执行以下命令可以增加topic分区数
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
删除配置
动态删除kafka集群配置
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
删除topic
目前删除 topic 在默认情况下知识打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要在
server.properties 中配置:
delete.topic.enable=true
然后执行以下命令进行删除 topic
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
数据从生产-消费-提交 offffset 过程,分以下几个阶段来进行
生产者是一个向kafka Cluster发布记录的客户端;生产者是线程安全的,跨线程共享单个生产者实例通常比具有多个实例更快。
生产者要进行生产数据到kafka Cluster中,必要条件有以下三个:
# 1. 地址
bootstrap.servers=node01:9092
# 2. 序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
流程描述:
1、总体流程:
Producer 连接任意活着的 Broker,请求指定 Topic,Partion 的 Leader 元数据信息,然后直接与对应的 Broker 直接连接,发布数据
2、开放分区接口(生产者数据分发策略):
2.1、用户可以指定分区函数,使得消息可以根据 key,发送到指定的 Partition 中。
2.2、kafka 在数据生产的时候,有一个数据分发策略。默认的情况使用 DefaultPartitioner.class 类。
这个类中就定义数据分发的策略。
2.3、如果是用户制定了 partition,生产就不会调用 DefaultPartitioner.partition()方法
2.4、当用户指定 key,使用 hash 算法。如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定 值,这种 hash 取模就没有意义
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
2.5 、 当用既没有指定 partition 也没有 key。
2.6、数据分发策略的时候,可以指定数据发往哪个 partition。当 ProducerRecord 的构造参数中有 partition 的时 候,就可以发送到对应 partition 上。
生产者数据分发策略有如下四种:(总的来说就是调用了一个方法,参数不同而已)
// 可根据主题和内容发送
public ProduceRecord(String topic,V value)
// 可根据主题,key、内容发送
public ProducerRecord(String topic, K key V value)
// 根据主题、分区、key、内容发送
public ProducerRecord(String topic,Integer partition , K key , V value)
// 根据主题、分区、时间戳、key、内容发送
public ProducerRecord(String topic,Integer partition,Locng timestamp,K key , V value)
a、可根据主题和内容发送
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题和内容发送
producer.send(new ProducerRecord<String, String>("my-topic","具体的数据"));
b、根据主题,key、内容发送
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题、key、内容发送
producer.send(new ProducerRecord<String, String>("my-topic","key","具体的数据"));
c、根据主题、分区、key、内容发送
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题、分区、key、内容发送
producer.send(new ProducerRecord<String, String>("my-topic",1,"key","具体的数据"));
d、根据主题、分区、时间戳、key,内容发送
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题、分区、时间戳、key、内容发送
producer.send(new ProducerRecord<String, String>("my-topic",1,12L,"key","具体的数据"));
消费者是一个从 kafka Cluster 中消费数据的一个客户端;该客户端可以处理 kafka brokers 中的故障问题,并且可以适应在集群内的迁移的 topic 分区;该客户端还允许消费者组使用消费者组来进行负载均衡。
消费者维持一个 TCP 的长连接来获取数据,使用后未能正常关闭这些消费者问题会出现,因此消费者不是线程安全的 。
消费者要从 kafka Cluster 进行消费数据,必要条件有以下四个
# 1. 地址
bootstrap.servers=node01:9092
# 2. 序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# 3. 主题topic,需要制定具体的某个topic(order)即可.
# 4. 消费组 group.id=test
/** * 消费订单数据--- javaben.tojson */ public class OrderConsumer { public static void main(String[] args) { // 1\连接集群 Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-01:9092"); props.put("group.id", "test"); //以下两行代码 ---消费者自动提交 offset 值 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String> (props); // 2、发送数据 发送数据需要,订阅下要消费的 topic。 order kafkaConsumer.subscribe(Arrays.asList("order")); while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100); // jdk queue offer插入、poll 获取元素。 blockingqueue put 插入原生, take 获取元素 for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value()); } } } }
如果 Consumer 在获取数据后,需要加入处理,数据完毕后才确认 offffset,需要程序来
控制 offffset 的确认? 关闭自动提交确认选项
props.put("enable.auto.commit", "false");
手动提交 offffset 值
kafkaConsumer.commitSync();
完整代码如下所示:
props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); //关闭自动提交确认选项 props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); // 手动提交 offset 值 consumer.commitSync(); buffer.clear(); } }
上面的示例使用 commitSync 将所有已接收的记录标记为已提交。 在某些情况下,您
可能希望通过明确指定偏移量 来更好地控制已提交的记录。 在下面的示例中,我们在完成处理每个分区中的记录后提交偏移量。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size()-1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
注意事项:
提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。 因此,在调用commitSync(偏移量)时,应该 在最后处理的消息的偏移量中添加一个.
1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 维护的分区的记录。
2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用 YARN,Mesos 或 AWS 工具等集群管理框 架,或作为流处理框架的一部分)。 在这种情况下,Kafka 不需要检测故障并重新分配分区,因为消耗过程将在另 一台机器上重新启动。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //consumer.subscribe(Arrays.asList("foo", "bar")); //手动指定消费指定分区的数据---start String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); //手动指定消费指定分区的数据---end while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
注意事项:
1、要使用此模式,您只需使用要使用的分区的完整列表调用 assign(Collection),而不
是使用 subscribe 订阅 主题。
2、主题与分区订阅只能二选一。
说明:
已经消费的数据对于 kafka 来说,会将消费组里面的 offset 值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
提交过程:是通过 kafka 将 offset 进行移动到下个 message 所处的 offset 的位置。
拿到数据后,存储到 hbase 中或者 mysql 中,如果 hbase 或者 mysql 在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么 kafka 伤的offffset 值已经进行了修改了,但是 hbase 或者 mysql 中没有数据,这个时候就会出现数据丢失。
什么时候提交 offset 值?在 Consumer 将数据处理完成之后,再来进行 offset 的修改提交。默认情况下 offset 是 自动提交,需要修改为手动提交 offset 值。
如果在处理代码中正常处理了,但是在提交 offset 请求的时候,没有连接到 kafka 或
者出现了故障,那么该次修 改 offset 的请求是失败的,那么下次在进行读取同一个分区
中的数据时,会从已经处理掉的 offset 值再进行处理一 次,那么在 hbase 中或者 mysql
中就会产生两条一样的数据,也就是数据重复。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DgLAbgTQ-1656401089031)(…/…/…/AppData/Roaming/Typora/typora-user-images/image-20220628151941411.png)]
说明:有多少个分区,就启动多少个线程来进行同步数据
可以采用同步或者异步的方式-过程图
可以采用同步或者异步的方式
同步:发送一批数据给 kafka 后,等待 kafka 返回结果
1、生产者等待 10s,如果 broker 没有给出 ack 相应,就认为失败。
2、生产者重试 3 次,如果还没有相应,就报错
异步:发送一批数据给 kafka,只是提供一个回调函数。
1、先将数据保存在生产者端的 buffer 中。buffer 大小是 2 万条
2、满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
3、发送一批数据的大小是 500 条
说明:如果 broker 迟迟不给 ack,而 buffer 又满了,开发者可以设置是否直接清空buffer 中的数据。
生产者数据不抵事,需要服务端返回一个确认码,即 ack 响应码;ack 的响应有三个状态值
0:生产者只负责发送数据,不关心数据是否丢失,响应的状态码为 0(丢失的数据,需要再次发送 )
1:partition 的 leader 收到数据,响应的状态码为 1
-1:所有的从节点都收到数据,响应的状态码为-1
说明:如果 broker 端一直不给 ack 状态,producer 永远不知道是否成功;producer可以设置一个超时时间 10s,超 过时间认为失败。
在 broker 中,保证数据不丢失主要是通过副本因子(冗余),防止数据丢失
在消费者消费数据的时候,只要每个消费者记录好 offffset 值即可,就能保证数据不丢失。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。