cd /opt/zookeeper/apache-zookeeper-3.6.3-bin/bin
--> ./zkServer.sh start
/opt/kafka/kafka1/config/server.properties
# Broker ID
broker.id=1(2,3)
# Allow topics to be deleted
delete.topic.enable=true
# Log directories
log.dirs=/opt/kafka/tmp/kafka1(2,3)/kafka-logs
# Port
port=9092(9093,9094)
# Make the broker reachable from external hosts
listeners=PLAINTEXT://192.168.248.100:9092
advertised.listeners=PLAINTEXT://192.168.248.100:9092
# Log cleanup / retention policy
log.cleanup.policy=delete
log.retention.hours=168
log.segment.bytes=1073741824
cd /opt/kafka/kafka1/bin
-->./kafka-server-start.sh /opt/kafka/kafka1/config/server.properties
Partitions: splitting a topic into partitions raises throughput, but it also increases the load on the Kafka servers.
By default the producer writes messages to the partitions in a round-robin fashion, and this strategy is configurable. Each partition has a leader node: with three partitions there are three leader nodes. Each leader node handles the read/write traffic for its partition, while the other cluster nodes act as replicas that only synchronize the data and do not take part in that traffic.
Within a consumer group, a partition can be consumed by only one consumer, which guarantees ordered consumption.
A producer is a client that sends messages to a given topic on the broker; a consumer is a client that consumes the messages of a given topic on the broker. Offsets are tracked per "topic - partition - consumer group".
Consumed offsets are stored in the internal __consumer_offsets topic, kept for 7 days by default and then deleted. The key is consumerGroupId+topic+partition number, the value is the current offset, and the record goes to the partition given by hash(consumerGroupId) % (number of partitions of the __consumer_offsets topic).
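As an illustration of that formula, here is a minimal Java sketch (my own, not from the article) of how the target partition inside __consumer_offsets can be derived from the group id; 50 is the default partition count of __consumer_offsets (offsets.topic.num.partitions).

public class OffsetsPartitionDemo {

    // not the broker's actual code, just the hash-modulo idea described above
    static int offsetsPartitionFor(String consumerGroupId, int offsetsTopicPartitions) {
        // mask the sign bit instead of Math.abs to avoid the Integer.MIN_VALUE edge case
        return (consumerGroupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // prints the __consumer_offsets partition that would hold offsets for "testGroup"
        System.out.println(offsetsPartitionFor("testGroup", 50));
    }
}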
On the producer side, once the broker has received (acknowledged) a message, the callback is invoked asynchronously and the callback method handles the success or failure of the send.
Offsets are committed to __consumer_offsets automatically by default, at a default interval of 1 s. That carries the risk of "message loss" and "duplicate consumption", so manual commits are generally used; the committed information is: consumer group + consumed topic + consumed partition + consumed offset.
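Below is a minimal sketch of that manual-commit approach using the plain Kafka consumer API, assuming a local broker and the test topic used later in this article; the Spring listener variant with Acknowledgment appears further down.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "false"); // turn the 1s auto commit off
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + "-" + record.offset() + ": " + record.value());
                }
                // commit "group + topic + partition + offset" only after the batch has been processed
                consumer.commitSync();
            }
        }
    }
}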
Among a partition's replicas, one is the leader node and all the others are replica (follower) nodes.
When a broker starts, it creates an ephemeral sequential node in ZooKeeper, and the broker that obtains the smallest sequence number becomes the cluster's controller, which manages the state of all partitions and replicas in the cluster. For example: 1. if the leader replica of a partition fails, the controller elects a new leader; 2. if brokers are added to or removed from the cluster, the controller syncs that information to the other brokers; 3. if partitions are added or removed, that is likewise synced to the other brokers.
The rebalance mechanism decides which partitions each consumer consumes. There are three assignment strategies: 1. range, which computes each consumer's partitions with a formula; 2. round-robin, which hands out partitions in turn; 3. sticky, which adjusts the assignment while keeping each consumer's existing partitions unchanged as far as possible.

cd E:\kafka\kafka\user\kafka_2.12-2.8.1
--> .\bin\windows\kafka-server-start.bat .\config\server.properties
(on Linux you can add "-daemon" to start it as a daemon process)
E:\kafka\kafka\user\kafka_2.12-2.8.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic test
kafka-topics.bat --list --zookeeper localhost:2181
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Messages are stored in E:\kafka\kafka\user\tmp\kafka-logs\<topic>-<partition>\00000000000000000000.log (the partition count is fixed when the topic is created)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group testGroup
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic test
kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
replicas –> the broker nodes on which the partition's replicas live
leader –> only replicas have this; one replica acts as the leader node and the others act as backups. All reads and writes go to the leader node, and if the leader fails, one of the backup nodes is elected as the new leader.
isr –> the broker nodes that can sync or have already synced; if a node in the ISR performs poorly, it is kicked out of the set.
kafka-topics.bat --zookeeper localhost:2181 --delete --topic test_topic
(buggy, do not try it: if it goes wrong you have to shut Kafka down, delete Kafka's local data log files and also delete the corresponding topic in ZooKeeper to recover)
kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test
kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --consumer-property group.id=testGroup --topic test --from-beginning
How a Kafka cluster handles new nodes
1.1 Adding a node does not require a restart; when partitions are expanded or new topics are created, partitions are automatically assigned to the new node.
1.2 After a topic has been created, the partition replicas of the existing topic never change automatically; you have to migrate them yourself.
Create topics-to-move.json:
{"topics":
[
{"topic":"testjq"}
],
"version": 1
}
Run the script: ./kafka-reassign-partitions.sh --zookeeper 192.168.6.160:2181 --topics-to-move-json-file topics-to-move.json --broker-list "2" --generate
Save the two assignments the script prints: one is the current assignment (keep it as a backup so you can roll back quickly if anything goes wrong), the other is the proposed assignment.
current-topic-reassignment.json:
{"version":1,"partitions":[{"topic":"testjq","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"testjq","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"testjq","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"testjq","partition":3,"replicas":[2],"log_dirs":["any"]},{"topic":"testjq","partition":4,"replicas":[1],"log_dirs":["any"]},{"topic":"testjq","partition":5,"replicas":[2],"log_dirs":["any"]}]}
proposed-topic-reassignment.json:
{"version":1,"partitions":[{"topic":"testjq","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"testjq","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"testjq","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"testjq","partition":3,"replicas":[2],"log_dirs":["any"]},{"topic":"testjq","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"testjq","partition":5,"replicas":[2],"log_dirs":["any"]}]}
Start the migration: ./kafka-reassign-partitions.sh --zookeeper 192.168.6.160:2181 --reassignment-json-file proposed-topic-reassignment.json --execute
Check whether it has finished: ./kafka-reassign-partitions.sh --zookeeper 192.168.6.160:2181 --reassignment-json-file proposed-topic-reassignment.json --verify
server:
  port: 8081
spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # Kafka cluster addresses
    producer: # producer settings
      retries: 3 # retry count, default Integer.MAX_VALUE
      batch-size: 16384 # Kafka reads messages from the buffer and sends them to the broker in batches; batch size (default 16K)
      buffer-memory: 33554432 # producer memory buffer size (32M). With a buffer, messages are first written to the buffer, which improves send performance.
      # default is 1
      # 0: the send counts as successful as soon as the client has sent it. Messages are easily lost, but this is the fastest.
      # 1: the send counts as successful once the leader has received it and written it to its local log. Balanced performance and reliability.
      # all: the send counts as successful only when the partition leader and all follower replicas have the message. Safest but slowest. With a single Kafka node it performs the same as "1".
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # codec for the message key
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # codec for the message body, used when sending objects
      properties:
        linger:
          ms: 10 # if the buffered messages have not reached batch-size yet, send them anyway after this many milliseconds
    consumer:
      group-id: defautGroup # default consumer group, used when no group is specified at consume time
      enable-auto-commit: false # turn off auto commit of offsets. auto.commit.interval.ms is the auto-commit interval, default 1s. Auto commit is convenient, but it risks "message loss" and "duplicate consumption"
      auto-offset-reset: earliest # default latest. earliest: consume from the beginning. latest: consume from the newest; if a partition already has committed offsets, consume from the committed offset, otherwise from the beginning
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key deserializer (default, can be omitted)
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value deserializer (default, can be omitted)
      max-poll-records: 500 # maximum number of records per poll, default 500
      heartbeat-interval: 1000 # interval at which the consumer sends heartbeats to the broker
    listener:
      concurrency: 1 # number of threads running in the listener container, for multi-threaded consumption
      # RECORD: commit after each record has been processed by the listener (ListenerConsumer)
      # BATCH: commit after each batch of poll() data has been processed by the listener
      # TIME: commit after a batch has been processed and more than TIME has passed since the last commit
      # COUNT: commit after a batch has been processed and at least COUNT records have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after a batch has been processed and Acknowledgment.acknowledge() has been called manually
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called manually; this is the one usually used
      AckMode: MANUAL_IMMEDIATE
      poll-timeout: 10000 # timeout (ms) when polling the consumer
      monitor-interval: 10000 # monitoring interval
      ack-count: 5 # number of records between offset commits when ackMode is COUNT or COUNT_TIME
      ack-time: 10000 # time in milliseconds between offset commits when ackMode is TIME or COUNT_TIME
      # spring-kafka has no explicit setting for the following, but it exposes a properties map where any Kafka configuration can be placed
      # properties:
      #   session:
      #     timeout:
      #       ms: 10000 # if Kafka gets no heartbeat from the consumer for more than 10s, it removes it from the group and gives its partitions to other consumers
      max:
        poll:
          interval:
            ms: 30000 # if more than 30s passes between two polls, Kafka considers the consumer too weak, removes it from the group and gives its partitions to other consumers
      # enable:
      #   idempotence: true # enable idempotence
The dependencies should ideally match the server version; the Kafka I deployed here is 2.12-2.8.1.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>kafka</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.2</version> <relativePath></relativePath> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 覆盖 低版本 产生的错误 --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <!-- 热部署 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!--引入kafka依赖--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version> </dependency> </dependencies> </project>
package com.kafka; import lombok.NonNull; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.ExecutionException; /** * @author 天真热 * @create 2022-03-24 15:42 * @desc **/ @RestController public class KafkaProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public void send() throws ExecutionException, InterruptedException { //===============================同步发送一段消息=================================== kafkaTemplate.send(Constants.TOPIC_NAME1, "同步 kafka").get(); //===============================异步发送一段消息=============================== //异步发送消息容易造成消息丢失,可设置监听 ListenableFuture<SendResult<String, String>> sendYb = kafkaTemplate.send(Constants.TOPIC_NAME1, "异步 kafka"); sendYb.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { System.out.println("结果失败"); } @Override public void onSuccess(SendResult<String, String> result) { ProducerRecord<String, String> producerRecord = result.getProducerRecord(); System.out.println("结果成功"); } }); //===============================指定分区发送消息=============================== //key可以不要,key的作用是在没有指定分区的情况下,计算出要往哪个分区发送数据。分区=hash(key)%(_consumer_offsets主题的分区数) //主题、分区、时间错、key、值 kafkaTemplate.send(Constants.TOPIC_NAME2, 1, 100l, null, "指定分区1 kafka"); } }
package com.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; /** * @author 天真热 * @create 2022-03-24 15:44 * @desc **/ @Component public class KafkaConsumer { /** * 消费者消费一个主题 * * @param record * @param ack */ //实际也是按批拿的数据,只不过对一批数据中的单条进行具体操作 @KafkaListener(topics = Constants.TOPIC_NAME1, groupId = "group1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println("group1:" + value); System.out.println(record.toString()); //手动提交offset ack.acknowledge(); } /** * 消费者消费两个主题 * * @param record * @param ack */ @KafkaListener(topics = {Constants.TOPIC_NAME1, Constants.TOPIC_NAME2}, groupId = "group2") public void listenGroup3(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println("group2:" + value); //手动提交offset ack.acknowledge(); } /** * 1.消费者消费两个主题 * 2.消费者消费主题1的0分区 * 2.消费者消费主题2的0分区,1分区并且偏移量从2开始 * * @param record * @param ack */ @KafkaListener(id = "consumer1", groupId = "group3", topicPartitions = { @TopicPartition(topic = Constants.TOPIC_NAME1, partitions = {"0"}), @TopicPartition(topic = Constants.TOPIC_NAME2, partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "2") ) }, concurrency = "2") public void listenGroup4(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println("group4:" + value); //手动提交offset ack.acknowledge(); } }
package com.authority; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Collection; import java.util.HashMap; import java.util.Map; /** * 动态创建消费者: * 1.设置监听器,动态创建消费者,注入监听器即可 * 2.直接创建消费者、设置监听器 * * @author 天真热 * @create 2022-03-27 10:28 * @desc **/ @RestController @RequestMapping("consumer") public class ConsumerController { @Resource private KafkaTemplate<String, String> kafkaTemplate; /** * 应用程序上行文 */ @Autowired ApplicationContext context; /** * 监听器容器工厂 */ @Resource ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory; /** * 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中 */ @Resource KafkaListenerEndpointRegistry registry; /** * 创建消费者分组 */ @GetMapping("/create") public void create() { //动态创建三个消费者分组 String[] groupIds = {"group-0", "group-1", "group-2"}; for (String groupId : groupIds) { // 初始化当前消费者分组的配置 Map<String, Object> consumerProps = new HashMap<>(); Utils.getProperties(consumerProps); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 设置监听器容器工厂 containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps)); // 获取监听类实例 context.getBean(MyListener.class); } } /** * 停止所有消费监听 */ @GetMapping("/stop") public void stop() { registry.getListenerContainers().forEach(container -> { System.out.println(container.getGroupId()); System.out.println(container.getListenerId()); container.stop(); }); } /** * 启动所有消费监听 */ @GetMapping("/start") public void start() { Collection<MessageListenerContainer> x = registry.getListenerContainers(); registry.getListenerContainers().forEach(container -> { System.out.println(container.getGroupId()); System.out.println(container.getListenerId()); container.start(); }); } /** * 获取监听类实例 */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public MyListener listener() { return new MyListener(); } }
package com.authority; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; /** * @author 天真热 * @create 2022-03-27 10:27 * @desc **/ public class MyListener { @KafkaListener(topics = "TOPIC1") public void listen(@Payload String data, @Header(KafkaHeaders.GROUP_ID) String groupId, Acknowledgment ack) { //从上下文获取bean,然后执行数据库操作 //private T service = (T) ApplicationContextGetBeanHelper.getBean("serviceImpl"); System.out.println(groupId + ":" + data); //手动提交offset ack.acknowledge(); } }
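The Utils.getProperties(...) helper used by ConsumerController above is not shown in the original; a plausible stand-in might look like the following (the bootstrap servers and the other values are assumptions, adjust them to your cluster):

package com.authority;

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Utils {

    /**
     * Fill in the base consumer properties shared by the dynamically created groups.
     * The group id itself is added by the caller.
     */
    public static void getProperties(Map<String, Object> props) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // offsets are committed manually via Acknowledgment
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }
}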
/**
 * Create a topic
 */
public static void createTopic() {
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
    // create the Kafka admin client connection
    AdminClient admin = AdminClient.create(prop);
    // collection of topics to create
    ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
    // declare the topic; arguments: topic name, partition count, replica count
    NewTopic newTopic = new NewTopic("TOPIC2", 3, (short) 3);
    topics.add(newTopic);
    // create the topics through the client
    CreateTopicsResult result = admin.createTopics(topics);
    try {
        result.all().get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
/**
 * Delete a topic (buggy when Kafka is installed on Windows; works fine on Linux)
 */
public static void deleteTopic() {
    // delete the Kafka topic
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
    // create the Kafka admin client connection
    AdminClient client = AdminClient.create(prop);
    // collection of topics to delete
    ArrayList<String> topics = new ArrayList<>();
    topics.add("test_topic");
    // delete the topics
    DeleteTopicsResult result = client.deleteTopics(topics);
    try {
        result.all().get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    // create the ZooKeeper connection
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 30000, 30000);
    // delete the topic registered in ZooKeeper
    zkClient.deleteRecursive(ZkUtils.getTopicPath("test_topic"));
}
//Create topics: createTopics(Collection<NewTopic> newTopics)
//Delete topics: deleteTopics(Collection<String> topics)
//List all topics: listTopics()
//Describe topics: describeTopics(Collection<String> topicNames)
//Describe the cluster: describeCluster()
//Query ACL info: describeAcls(AclBindingFilter filter)
//Create ACL info: createAcls(Collection<AclBinding> acls)
//Delete ACL info: deleteAcls(Collection<AclBindingFilter> filters)
//Query all configuration of topics, brokers, etc.: describeConfigs(Collection<ConfigResource> resources)
//Modify the configuration of topics, brokers, etc. (marked deprecated in newer versions): alterConfigs(Map<ConfigResource, Config> configs)
//Change the log directories of replicas: alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
//Query the log directory info of brokers: describeLogDirs(Collection<Integer> brokers)
//Query the log directory info of replicas: describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
//Add partitions: createPartitions(Map<String, NewPartitions> newPartitions)
//Also modifies the configuration of topics, brokers, etc., but is richer and more flexible, and replaces alterConfigs: incrementalAlterConfigs
//Adjusts the partition count of a topic; the count can only be increased, never decreased or removed, i.e. the new partition count must be >= the old one: createPartitions
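The list above mentions describeTopics, but the article never demonstrates it; a small sketch in the same style as the other AdminClient methods here (assuming the TOPIC2 created earlier exists, with imports from org.apache.kafka.clients.admin and java.util) could be:

/**
 * Describe a topic: print its partitions, their leaders and replicas.
 * (my own illustration, not from the original article)
 */
public static void describeTopic() {
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
    // create the Kafka admin client connection
    AdminClient admin = AdminClient.create(prop);
    DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("TOPIC2"));
    try {
        Map<String, TopicDescription> descriptions = result.all().get();
        descriptions.forEach((name, description) ->
                description.partitions().forEach(p ->
                        System.out.println(name + " partition " + p.partition()
                                + " leader=" + p.leader() + " replicas=" + p.replicas())));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}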
/**
 * List the topics
 */
public static void listTopic() {
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
    // create the Kafka admin client connection
    AdminClient admin = AdminClient.create(prop);
    // fetch the topic list
    ListTopicsResult result = admin.listTopics();
    KafkaFuture<Set<String>> future = result.names();
    try {
        System.out.println("==================Kafka Topics====================");
        future.get().forEach(name -> System.out.println(name));
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
/** * 获取主题下的消费组 * * @param brokerServers "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" * @param topic "TOPIC1" * @return * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ private static List<String> getGroupsForTopic(String brokerServers, String topic) throws ExecutionException, InterruptedException, TimeoutException { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers); try (AdminClient client = AdminClient.create(props)) { //获取所有消费组名称 List<String> allGroups = client.listConsumerGroups().valid().get(10, TimeUnit.SECONDS).stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()); //获取所有的消费组详情 Map<String, ConsumerGroupDescription> allGroupDetails = client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS); // 创建消费组集合 final List<String> filteredGroups = new ArrayList<>(); allGroupDetails.entrySet().forEach(entry -> { //获取组名称 String groupId = entry.getKey(); //获取组详情 ConsumerGroupDescription description = entry.getValue(); //是否属于给定的主题的消费组,如果是则加入集合 boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment) .map(MemberAssignment::topicPartitions) .map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet())) .anyMatch(tps -> tps.contains(topic)); if (topicSubscribed) filteredGroups.add(groupId); }); return filteredGroups; } }
/** * 获取主题分组的偏移量 * @param server "127.0.0.1:2181" * @param brokerServer "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" * @param topic “TOPIC1” * @param group 'group1' * @return */ public static Long getOffset(String server, String brokerServer, String topic, String group) { Properties props = new Properties(); props.put("bootstrap.servers", brokerServer); props.put("group.id", group); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); // 创建kafka客户端连接 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); Long commitedOffset = 0L; // 获取指定主题的分区列表 List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionsFor) { // 设置主题、分区 TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); // 获取主题、分区对应的提交offset的记录 OffsetAndMetadata committed = consumer.committed(tp); //获取提交的offset commitedOffset += committed.offset(); } return commitedOffset; }
/** * 主题的消息总量(各分区之和) * * @param brokerServer * @param topics * @return */ public static Map<String, Long> counts(String brokerServer, String topics) { //服务器或者主题为空,则直接返回null if (brokers == null || brokers.isEmpty() || topics == null || topics.isEmpty()) { return null; } Map<String, Long> map = Maps.newTreeMap(); // 遍历主题 // 获取主题对应的分区、副本、领导节点、选举节点等 Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topics); long size = 0L; for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) { // 分区数据 int partition = entry.getKey(); // leader节点 String leadBroker = entry.getValue().leader().host(); // 客户端,节点+分区 String clientName = "Client_" + topics + "_" + partition; // 发送请求给每个分区的leader SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName); // 获取分区底下的最大偏移量(数据总量) long readOffset = getLastOffset(consumer, topics, partition, kafka.api.OffsetRequest.LatestTime(), clientName); // 总数量叠加 size += readOffset; consumer.close(); } // 插入主题-->最大数量 map.put(topics, size); return map; }
Backlogged messages = total number of messages - current committed offset (for how to obtain the two, see points 6 and 7 above).
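As a compact illustration of that formula, the sketch below computes the backlog per partition with the newer consumer APIs (log-end offset minus committed offset) instead of the SimpleConsumer used above; the broker list, topic and group are placeholders:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LagDemo {
    public static long totalLag(String brokerServer, String topic, String group) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerServer);
        props.put("group.id", group);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        long lag = 0L;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // collect all partitions of the topic
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor(topic).forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
            // log-end offset = total number of messages written to each partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                // committed offset = how far the group has consumed; null if nothing committed yet
                OffsetAndMetadata committed = consumer.committed(tp);
                long consumed = committed == null ? 0L : committed.offset();
                lag += endOffsets.get(tp) - consumed;
            }
        }
        return lag;
    }
}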
/**
 * Convert a time string to its timestamp
 *
 * @param time
 * @return
 */
private static long getTime(String time) {
    // HH = 24-hour clock
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date date = null;
    try {
        date = format.parse(time);
        return date.getTime();
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return 0;
}
public static Map<TopicPartition, OffsetAndTimestamp> getTimeOffsetMap(String server, String topic, String groupId, long time) { //相关配置信息 Properties props = new Properties(); props.put("bootstrap.servers", server); props.put("group.id", groupId); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //获取kafka请求对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); //获取主题分区数量 Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); int partitionNum = partitionInfos.size(); //遍历分区,将分区和时间信息存入map Map<TopicPartition, Long> partitionMap = new HashMap<>(); for (int i = 0; i < partitionNum; i++) { TopicPartition topicPartition0 = new TopicPartition(topic, i); partitionMap.put(topicPartition0, time); } //获取分区和偏移量集合 Map<TopicPartition, OffsetAndTimestamp> timeOffsetMap = consumer.offsetsForTimes(partitionMap); return timeOffsetMap; }
public static void setOffset(String server, String topic, String groupId, long time) { //相关配置信息 Properties props = new Properties(); props.put("bootstrap.servers", server); props.put("group.id", groupId); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //获取kafka请求对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); //获取主题分区数量 Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); int partitionNum = partitionInfos.size(); //遍历分区,将分区和时间信息存入map Map<TopicPartition, Long> partitionMap = new HashMap<>(); for (int i = 0; i < partitionNum; i++) { TopicPartition topicPartition0 = new TopicPartition(topic, i); partitionMap.put(topicPartition0, time); } //获取分区和偏移量集合 Map<TopicPartition, OffsetAndTimestamp> timeOffsetMap = consumer.offsetsForTimes(partitionMap); //在调用seek方法的时候,需要先获得分区的信息,而分区的信息要通过poll方法来获得. // 如果调用seek方法时,没有分区信息,则会抛出IllegalStateException异常 No current assignment for partition xxxx. ConsumerRecords<String, String> poll = consumer.poll(0); //进行遍历、重新设置主题的各个分区偏移量 Set<Map.Entry<TopicPartition, OffsetAndTimestamp>> entries = timeOffsetMap.entrySet(); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : entries) { if (entry.getValue() == null) { consumer.seek(new TopicPartition(entry.getKey().topic(), entry.getKey().partition()), 99); continue; } //设置偏移量,根据主题,分区来设置 consumer.seek(new TopicPartition(entry.getKey().topic(), entry.getKey().partition()), entry.getValue().offset()); } //提交 consumer.commitSync(); consumer.close(); }
/** * 更新分区 * * @param bootstrapServers * @param loginKey * @param topicName * @param partitions * @throws IOException * @throws ExecutionException * @throws InterruptedException */ public static void createTopicPartitions(String bootstrapServers, String topicName, Integer partitions) throws IOException, ExecutionException, InterruptedException { AdminClient admin = getAdminClient(bootstrapServers); //构建Map Map<String, NewPartitions> newPartitions = new HashMap<>(); //给Map存入Topic名字和想要增加到的partition数量。 // 这里注意increaseTo,参数传入多少就是增加到多少,3就是增加到3,不是1+3=4 newPartitions.put(topicName, NewPartitions.increaseTo(partitions)); //拿到结果 CreatePartitionsResult result = admin.createPartitions(newPartitions); //执行阻塞方法,等待结果完成 result.all().get(); }
To avoid losing messages, set acks to 1 or all, which guarantees that at least one replica has the data written before the send counts as successful.
Duplicate consumption happens when the producer has already delivered a message to Kafka but never receives the ack, so it sends the message again and the consumer ends up receiving it twice. The producer keeps its retry mechanism to prevent message loss; instead, configure idempotency on the consumer side, for example a composite unique key in the database or a Redis lock, to guarantee that each message takes effect only once.
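A minimal sketch of consumer-side idempotency, assuming the record key carries the unique business id; the in-memory set here only stands in for the database composite unique key or Redis lock mentioned above:

package com.kafka;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class IdempotentConsumer {

    // stands in for a DB composite unique key or a Redis SETNX lock
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    @KafkaListener(topics = Constants.TOPIC_NAME1, groupId = "idempotentGroup")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String businessId = record.key();
        // process the message only the first time this id is seen
        if (businessId != null && processedIds.add(businessId)) {
            System.out.println("processing " + record.value());
        } else {
            System.out.println("duplicate, skipping " + record.value());
        }
        ack.acknowledge();
    }
}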
To work through a message backlog, let the consumer do nothing but poll the messages and store them into another topic, then give that topic more partitions and put more consumers into the consumer group so that they consume it together.
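A hedged sketch of that remedy: a listener that does no real processing and simply forwards each record to a second topic (here the TOPIC_NAME2 constant used elsewhere in this article, assumed to have more partitions), so that a larger consumer group can drain it in parallel:

package com.kafka;

import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class BacklogForwarder {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // do no real work here: just move the backlogged records to the second topic
    // so that a bigger consumer group can share the load
    @KafkaListener(topics = Constants.TOPIC_NAME1, groupId = "forwardGroup")
    public void forward(ConsumerRecord<String, String> record, Acknowledgment ack) {
        kafkaTemplate.send(Constants.TOPIC_NAME2, record.key(), record.value());
        ack.acknowledge();
    }
}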
Kafka saves the offset of the last consumption, so after a consumer reconnects it automatically continues reading the data after that offset.
The relationship between the listener concurrency setting and consumers: this parameter sets the concurrency of each @KafkaListener. Put simply, it is the concurrency of a single consumer instance: with 3 consumer instances each running 3 threads, the overall concurrency can reach 9. If there are only 3 partitions, the concurrency of one consumer is enough; the setting does not mean how many consumers run concurrently.
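For illustration, a listener with concurrency = "3" (my own example, in the same style as the consumer code above) starts three consuming threads inside a single consumer instance:

package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ConcurrencyDemoConsumer {

    // three threads in this instance; with three application instances that adds up to nine,
    // which is only useful if the topic has at least nine partitions
    @KafkaListener(topics = Constants.TOPIC_NAME1, groupId = "concurrencyGroup", concurrency = "3")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println(Thread.currentThread().getName() + " -> " + record.value());
        ack.acknowledge();
    }
}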
On deleting topics:
1. Delete only the topic registered in ZooKeeper: Kafka can still read the topic, and Kafka can no longer create new topics.
2. Delete only Kafka's topic data: Kafka crashes outright, because the deletion has to remove the log files while Kafka is still running and has no permission to delete them, which causes the crash. Startup then fails as well; the topic registered in ZooKeeper must be deleted before Kafka can start again, and after starting, topics can be created normally.
3. Delete both the topic registered in ZooKeeper and Kafka's topic data: Kafka crashes as well, but it can be started again directly, and topics can be created after startup.
So the workable procedure is: delete the topic registered in ZooKeeper, and once Kafka has stopped, simply delete Kafka's local data log files. (I suspect this problem is specific to Windows; I have not verified it on Linux yet, so feel free to leave a comment below if you know.)
On Linux, Kafka behaved normally in my tests and did not crash when topics were deleted. It is recommended to deploy Kafka in a Linux environment.