赞
踩
在使用 Pulsar 过程中任何节点都有可能出现异常甚至宕机,当 Producer 生产消息时,pulsar 集群可能会发生 Broker 或 Bookie 异常不可用,或者网络突然中断等异常情况。根据在发生异常时 Producer 处理消息的方式,系统可以具备以下三种消息语义。
Producer 通过接收 Broker 的 ACK (消息确认)通知来确保消息成功写入 Pulsar Topic。然而,当 Producer 接收 ACK 通知超时,或者收到 Broker 出错信息时,会尝试重新发送消息。如果 Broker 正好在成功把消息写入到 Topic,但还没有给 Producer 发送 ACK 时宕机,Producer 重新发送的消息会被再次写入到 Topic,最终导致消息被重复分发至 Consumer。
当 Producer 在接收 ACK 超时,或者收到 Broker 出错信息时不重发消息,那就有可能导致这条消息丢失,没有写入到 Topic 中,也不会被 Consumer 消费到。在某些场景下,为了避免发生重复消费,我们可以容许消息丢失的发生。
Exactly-once 语义保证了即使 Producer 多次发送同一条消息到服务端,服务端也仅仅会记录一次。
pulsar 中 Exactly-once 语义并不包含 consumer 端只消费一次的场景。因为真正意义上的Exactly-Once依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。消息系统不可能保证 consumer 只会接收一次消息,在 consumer 由于网络等原因未及时 ack 消息时,pulsar broker 就会重复投递消息给 consumer,这时候需要做的是保证 consumer 消费幂等。可能使用 “有效一次” 来描述更恰当些。
从 Pulsar 1.20.0-incubating 版本开始可以通过幂等性 Producer 和 pulsar server 端消息去重来保证单个 Topic 上的 Exactly-once 语义。
什么是幂等性 Producer ?幂等性就是指对于同一操作发起的一次或者多次请求的结果是一致的,不会因为多次操作而产生不同的结果。当出现由于异常导致 Producer 重发消息时,重复的消息只会在 Broker 中写入一次。
可以通过以下方式来开启消息去重和设置幂等性 producer:
bin/pulsar-admin namespaces set-deduplication \
public/default \
--enable # or just -e
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer producer = pulsarClient.newProducer()
.producerName("producer-1")
.topic("persistent://public/default/topic-1")
.sendTimeout(0, TimeUnit.SECONDS)
.create();
实现原理:每条发送给 Pulsar 的消息都会带有一个唯一的序列号,Pulsar Broker 利用这个序列号来判断和去除重复的消息,当接收的消息的 sequenceId 小于等于 pulsar 记录的最大 sequenceId,即为重复消息。 Pulsar 会把消息体中的序列号保存到 Topic 中,并且记录最新接收到的序列号。所以哪怕 Broker 节点出现异常宕机了,另一个重新接管处理该 Topic 的 Broker 节点也可以判断消息是否重复。
幂等性 Producer 只能保证单个 topic 上的 exactly-once 语义,当一条消息要发送到多个 topic 时,就不能保证多个操作的原子性。
Pulsar 2.8.0 引入事务消息,我们可以通过事务 API 来实现多个 topic 发送消息的原子操作,要么都成功要么都失败。也可以在一个事务中对多个 topic 上的消息进行 ACK 确认。
在流处理系统中,常见的操作是 read-process-write。即从一个或多个 topic 中读取消息,进过程序加工处理后得出结果,最后把结果写入另一个 topic。在这个过程中如果不使用事务消息,就有可能出现结果重复或者消息丢失的情况。
如果执行流程是 producer 先发送消息,然后 consumer 再 ACK 消息,程序在 ACK 前发生异常,consumer 未 ACK 成功,程序恢复后会再次消费消息,发送新计算的结果给 topic,这就会导致消息重复。
如果执行流程是 consumer 先 ACK 消息,然后 producer 再发送消息,程序在 producer 发送前发生异常,程序恢复后由于消息已经 ACK ,消息将不会再次消费,这就会导致消息丢失问题。
利用事务消息实现端到端的 Exactly-once 语义。
//start PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .enableTransaction(true) .build(); Transaction txn = pulsarClient .newTransaction() .withTransactionTimeout(1, TimeUnit.MINUTES) .build() .get(); //receive Message<String> message1 = consumer1.receive(); Message<String> message2 = consumer2.receive(); //process String result = message1.getData() + " " + message1.getData(); System.out.println(result); //publish producer.newMessage(txn).value(result.getBytes()).send(); //ack consumer1.acknowledgeAsync(message1.getMessageId(), txn).get(); consumer2.acknowledgeAsync(message2.getMessageId(), txn).get(); //commit txn.commit().get();
事务协调器(TC)是运行在 Pulsar Broker 中的一个模块。
所有事务元数据都保存在事务日志中。 事务日志由 Pulsar 主题记录。 如果事务协调器崩溃,它可以从事务日志恢复事务元数据。
向事务内的主题分区生成的消息存储在该主题分区的事务缓冲区(TB)中。 在提交事务之前,事务缓冲区中的消息对消费者不可见。 提交事务后,事务缓冲区中的消息对消费者可见。 当事务中止时,事务缓冲区中的消息将被丢弃。
事务ID(TxnID)标识Pulsar中的唯一事务。 事务 ID 长度是 128-bit。 最高 16 位保留给事务协调器的 ID,其余位用于每个事务协调器中单调递增的数字。
待确认状态是消息 ack 后,事务未 commit 前的状态。
1、https://segmentfault.com/a/1190000041013432
2、https://pulsar.apache.org/docs/zh-CN/txn-how/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。