赞
踩
消息重复消费的问题可以通过多种方法解决,主要包括消费幂等性、消息去重、消息确认机制、消息重试机制、保证消息的顺序性以及将消息进行持久化存储。
消费幂等性
:确保在同一条消息被重复消费时,系统不会产生副作用或影响系统的正确性。这可以通过在消费端使用唯一标识来判断消息是否已经被消费过,例如使用数据库的唯一索引、使用分布式锁等方式来保证幂等性。
消息去重
:针对同一条消息的多次消费,只保留其中一次消费结果。可以在消费者端进行去重,使用缓存或数据库来记录已经处理过的消息,避免重复消费。
消息确认机制
:MQ一般提供消息确认机制,如ACK机制。消费者在成功处理一条消息后,发送ACK给MQ,表示该消息已经被成功消费。如果消费者在处理消息时发生异常或失败,可以不发送ACK,MQ会将该消息重新发送给其他消费者进行处理。
消息重试机制
:当消息处理失败时,可以将消息重新发送给MQ,由MQ重新投递给消费者进行处理。可以在消息的header中添加重试次数的标记,当达到最大重试次数后,可以将消息发送到死信队列进行处理,以避免消息的无限重试。
保证消息的顺序性
:如果消息的顺序性很重要,可以将相关消息发送到同一个分区或同一个队列中,以保证消息的顺序性。(kafka分区)
持久化机制
:为了避免消息丢失,可以将消息进行持久化存储,例如将消息存储到数据库或文件系统中。即使MQ发生故障或重启,也可以通过持久化的消息进行恢复。
这些方法可以单独或结合使用,以有效解决消息重复消费的问题,确保系统的稳定性和数据的准确性。
Kafka事务是怎么实现的?Kafka事务消息原理详解
Kafka生产者可以配置为幂等,确保相同的消息不会被重复发送。
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。(可以理解为在应用端做了幂等处理。即使重复消息发送过来了,也会判断是否已在处理,从而达到一条消息只会被一个处理)
消息在 MQ 中的传递,大致可以归类为下面三种:
At most once: 至多一次。消息在传递时,最多会被送达一次。是不安全的,可能会丢数据。
At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
大部分消息队列满足的都是At least once,也就是可以允许重复的消息出现。
针对同一条消息的多次消费,只保留其中一次消费结果。可以在消费者端进行去重,使用缓存或数据库来记录已经处理过的消息,避免重复消费。
例如,使用redis缓存去重:
// 去重
public void removeRepeatProcessMessage(FsConsumer fsConsumer){
RSet<String> set = redisson.getSet("local:fs:downloadFileMsg");
if (set.add(fsConsumer.getMsgId())) {// 成功则处理,失败则抛出异常,记录到死信队列里
// 只有当消息ID被成功添加到集合时才处理消息
processMessage(fsConsumer);
} else {
// 去重处理
// 如果消息ID已存在于集合中,则表示该消息已处理,跳过
System.out.println("Message with ID " + fsConsumer.getMsgId() + " has already been processed.");
}
}
当消息处理失败时,可以将消息重新发送给MQ,由MQ重新投递给消费者进行处理。可以在消息的header中添加重试次数的标记,当达到最大重试次数后,可以将消息发送到死信队列进行处理,以避免消息的无限重试。
@Configuration public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); //最大重试三次 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template); // 设置重试间隔 10秒, 重试次数为 1 次 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); // 失败进入死信,topic和group,都变成${topic}.DLT和${group}.DLT ,如下是进入死信的例子: /* # bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID myGroup.DLT myTopic.DLT 0 4 4 4 test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3 test-0 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID myGroup myTopic 0 800020 800020 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0 myGroup myTopic 1 400012 400012 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0 myGroup myTopic 2 300000 300000 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0 */ factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer,backOff)); return factory; } }
Kafka的消费者分区策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费。Kafka提供了3种消费者分区分配策略:RangeAssigor
、RoundRobinAssignor
、StickyAssignor
。
术语:
partition - 分区
RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。分配示意图如下:
T0 是Topic-0
T1 是Topic-1
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
StickyAssignor分区分配算法,目的是在执行一次新的分配时,能在上一次分配的结果的基础上,尽量少的调整分区分配的变动,节省因分区分配变化带来的开销。Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动。其目标有两点:
分区的分配尽量的均衡。
每一次重分配的结果尽量与上一次分配结果保持一致。
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。
StickyAssignor算法比较复杂,下面举例来说明分配的效果(对比RoundRobinAssignor),前提条件:
有4个Topic:T0、T1、T2、T3,每个Topic有2个分区。
有3个Consumer:C0、C1、C2,所有Consumer都订阅了这4个分区。
Kafka 不直接提供一种机制来确认消费者是否成功消费了消息。但是,它提供了几种策略来保证消息的成功处理:
使用 Kafka 的自动提交功能:可以配置消费者自动定期提交消息的偏移量。
手动提交偏移量:消费者可以在处理完消息后手动提交偏移量,表明该消息已被成功处理。
使用 Kafka 事务:在支持事务的 Kafka 集群上,可以开启事务来保证消费者处理消息的全部成功或失败。
以下是一个简单的示例,展示如何在消费者中手动提交偏移量:
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Arrays; import java.util.Properties; public class ManualOffsetCommit { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { // 处理消息 System.out.println(record.value()); // 处理完后提交当前偏移量 consumer.commitSync(); } } } }
死信,即Dead Letter,是指在Kafka中无法消费的消息。当消息因为以下原因而无法被消费时,可能会变成死信:
为了处理死信,Kafka提供了几种策略:
以下是一个示例,演示如何设置Kafka消费者,以便将死信消息发送到一个特定的主题:
@Bean DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) { // 失败进入死信,topic和group,都变成${topic}.DLT和${group}.DLT ,如下是进入死信的例子: /* # bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID myGroup.DLT myTopic.DLT 0 4 4 0 test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3 test-0 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID myGroup myTopic 0 800020 800020 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0 myGroup myTopic 1 400012 400012 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0 myGroup myTopic 2 300000 300000 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0 */ // DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, // (r, e) -> { // //死信的结果发送到特定的主题 // return new TopicPartition(r.topic()+".DLT", r.partition()); // // }); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); // ErrorHandler errorHandler = new FallbackBatchErrorHandler(recoverer, new FixedBackOff(0L, 2L)); return new DefaultErrorHandler(recoverer,new FixedBackOff(10*1000L, 2L)); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(DefaultErrorHandler defaultErrorHandler) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 消费者组中线程数量 factory.setConcurrency(3); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 当使用批量监听器时需要设置为true factory.setBatchListener(true); factory.setCommonErrorHandler(defaultErrorHandler); return factory; }
触发死信的业务代码
@Service public class UserConsumerService { @KafkaListener(topics = {"myUser"},groupId = "myUserGroup", containerFactory="kafkaListenerContainerFactory") // public void kafkaListener(String message){ public void kafkaListener(List<ConsumerRecord<String, String>> recordList){ System.out.println("消费列表:"+recordList.size()); for (ConsumerRecord<String, String> consumerRecord : recordList) { kafkaListener(consumerRecord); } } public void kafkaListener(ConsumerRecord<String, String> record){ String key = record.key().toString(); String value = record.value().toString(); System.out.println(record.offset()+" \t "+key+" \t "+ value); if(value.contains("abc")){ System.out.println("存在消费abc"); throw new RuntimeException("存在消费abc"); } } }
// transactional.id = transactionId
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionId");
保证事务原子性操作
事务期间,会发送produce消息到kafka服务器上的Topic,这是未提交状态的消息,你可以看到Topic上多了几条消息。
事务回滚后,消息还是会存在的。所以comsumer需要使用读已提交的方式获取。
以下是 Producer 事务使用示例:
Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("client.id", "ProducerTranscationnalExample"); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "test-transactional"); props.put("acks", "all"); KafkaProducer producer = new KafkaProducer(props); producer.initTransactions(); try { String msg = "matt test"; producer.beginTransaction(); producer.send(new ProducerRecord(topic, "0", msg.toString())); producer.send(new ProducerRecord(topic, "1", msg.toString())); producer.send(new ProducerRecord(topic, "2", msg.toString())); producer.commitTransaction(); } catch (ProducerFencedException e1) { e1.printStackTrace(); producer.close(); } catch (KafkaException e2) { e2.printStackTrace(); producer.abortTransaction(); } producer.close();
@Transactional
public String sendForTransaction(String userNo, String jsonString) {
System.out.println("sendForTransaction方法中,是否开启事务中:"+kafkaTemplate.inTransaction());
kafkaTemplate.send("myUser","key",jsonString);
return "success";
}
通过使用隔离级别能查看到未提交的消息。
隔离级别:
配置方式:
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));//# 读取已提交的消息
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));//# 读取已提交的消息
Springboot中kafka默认隔离级别是 读未提交
public class ConsumerConfig{
....
public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
....
}
使用 Kafka 事务:在支持事务的 Kafka 集群上,可以开启事务来保证消费者处理消息的全部成功或失败。
// 自动提交事务
@Transactional
public String sendForTransaction1(String userNo, String jsonString) {
System.out.println("sendForTransaction方法中,是否开启事务中:"+kafkaTemplate.inTransaction());
kafkaTemplate.send("myUser","key",jsonString);
return "success";
}
// 回滚事务
@Transactional
public String sendForTransaction(String userNo, String jsonString) {
System.out.println("sendForTransaction方法中,是否开启事务中:"+kafkaTemplate.inTransaction());
kafkaTemplate.send("myUser","key",jsonString);
throw new RuntimeException();
}
初步:因为分区只会被一个消费者消费,没消费完就不会发送下个消息,所以发送到消费者端,实际上就同时进行事务的就只有一个,相当于同步消费。
进阶:但这样性能会很差,所以后面可以把业务操作区间的,进行隔离归类,就像id余数为1的放1分区,id余数为2的放2分区,这样大大提高了消费速度,可自定义程度高,受制于本人设计,它的瓶颈就是设计人员,如果归类好,可以不存在资源抢占问题。
瓶颈:但消息有个问题,就是一个分区每次只能消费一条记录,所以它并行处理严重依赖于分区数量,但分区数量也不是乱加的,设计不合理,会存在资源操作冲突的问题。
对比其他事务的瓶颈:
是否存在资源抢占
,如果在修改仓库那功能的话,因为存在资源抢占的话,相当于同步处理,其他线程被锁,只有一个线程在处理,这么多服务都只会有一个业务在处理,直接导致seta性能拖慢。个人建议:
消息事务就像是基金一样,稳定处理,
seata就像是股票一样,上下波动,一时快,一时慢。(若使用在合适场景会很快。)
应用场景
:如果不停的区分seata和消息事务会很累的,而且因为费用问题,还有业务一定会有资源抢占问题,所以一般都使用消息事务的方式处理。这是我个人分析。
https://kafka.apache.org/documentation/#producerconfigs
大体上分为Topic和Group,代表的是发布者与订阅者。
Topic 就像是一个数据库表,Partition就是数据库表中的分表存储的记录。
Group指的是消费者组,消费者组里有很多个消费者,消费Topic的Partition表里的消息
例如:
Topics
myOrder
myOrder.DLT
myTopic
myTopic.DLT
myUser
myUser.DLT
Consumer Groups
myGroup
myGroup.DLT
myUserGroup
myUserGroup.DLT
Topic的消息只记录数据,不记录消费记录
Group只记录消息消费情况。
Topic 与 消费组 的关系,就像是聊天室,topic是发布消息A,其他消费组都能收到消息A,消息是一对多的关系,每次增加一组消费者,都会从0offset开始读取Topic
https://blog.51cto.com/u_16213637/9850264
# 查看所有topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --list -------------------- __consumer_offsets myTopic test-javasdk -------------------- # 查看topic详情 > bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092 -------------------- Topic: myTopic TopicId: Y_t8v2snRgmirrGykcLYmg PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: myTopic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: myTopic Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001 -------------------- # 创建topic > bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \ --partitions 20 --replication-factor 1 --config x=y 简写:> bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 # 修改topic > bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic //> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name myTopic --partitions 20 --alter # 修改topic 的分区 bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic # 删除topic > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name # 添加topic配置,例如修改 partitions 40 > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ --partitions 40 # 删除topic配置 > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y # 消费者列表 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning -------------------- test-99988 test-99989 test-99990 test-99991 test-99992 test-99993 test-99994 test-99995 -------------------- # 查看消费组里的成员 > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup --members -------------------- GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS myGroup ConsumerTest-1a87e781-b3ce-43c8-ac4a-76c8ccdec5aa /192.168.3.3 ConsumerTest 2 myGroup ConsumerTest2-ed1ddf24-7a68-4136-ad39-d774f531d765 /192.168.3.3 ConsumerTest2 1 -------------------- # 查看消息堆积情况(还可以看出是哪台机器有) bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup -------------------- GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID myGroup myTopic 0 664075 700001 35926 ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb /192.168.3.3 ConsumerTest myGroup myTopic 1 284334 300000 15666 ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb /192.168.3.3 ConsumerTest myGroup myTopic 2 200000 200000 0 ConsumerTest2-4ae32ff0-e67f-4d09-8185-005bf61f3b1f /192.168.3.3 ConsumerTest2 其中:LAG 是待消费记录 --------------------
复制因素控制有多少服务器将复制写入的每条消息。如果您的复制因子为3,那么在您失去对数据的访问权限之前,最多可以有2台服务器出现故障。我们建议您使用2或3的复制因子,这样您就可以在不中断数据消耗的情况下透明地跳转机器。
分区计数控制主题将被切分成多少个日志。分区计数有几个影响。首先,每个分区必须完全适合单个服务器。因此,如果您有20个分区,那么整个数据集(以及读写负载)将由不超过20台服务器处理(不包括副本)。最后,分区计数会影响消费者的最大并行性。这将在概念部分进行更详细的讨论。
每个分片的分区日志都放在Kafka日志目录下自己的文件夹中。这类文件夹的名称由主题名称、加上破折号(-)和分区id组成。由于典型的文件夹名称长度不能超过255个字符,因此主题名称的长度将受到限制。我们假设分区的数量永远不会超过100,000。因此,主题名称不能超过249个字符。这在文件夹名称中留下了足够的空间来放置破折号和可能的5位数长的分区id。
与主题相关的配置既有服务器默认值,也有可选的每个主题覆盖。如果没有给出每个主题的配置,则使用服务器默认值。可以在创建主题时通过提供一个或多个——config选项来设置覆盖。下面的例子创建了一个名为my-topic的主题,并自定义了最大消息大小和刷新速率:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
以后还可以使用alter configs命令更改或设置覆盖。下面的例子更新了my-topic的最大消息大小:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
要检查主题上设置的覆盖,您可以执行以下操作
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
要移除覆盖,您可以这样做
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
--alter --delete-config max.message.bytes
以下是主题级配置。服务器对此属性的默认配置在服务器默认属性标题下给出。给定的服务器默认配置值仅适用于没有显式主题配置覆盖的主题。
# 发送消息
> bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
Ctrl-C 终止
# 消费消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
数量统计:普通消息和死信消息
死信是应用端生成的,非kafka本身自带。
myGroup.DLT 是死信
myGroup 是普通消息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
---------------------------------
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup.DLT myTopic.DLT 0 4 4 0 test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3 test-0
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup myTopic 0 800020 800020 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0
myGroup myTopic 1 400012 400012 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0
myGroup myTopic 2 300000 300000 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0
---------------------------------
创建docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
# 证实可以
docker-compose up -d
后面在找了一个
创建docker-compose.yml
version: '2'
services:
zookeeper: # 注意,在我的archlinux系统的机子会有内存泄露问题
image: wurstmeister/zookeeper
ports:
- "2181:2181"2.6. kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
# 启动
docker-compose up -d
https://gitee.com/alvis128/springboot-kafka-demo
官网:https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#using-kafkatransactionmanager
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。