赞
踩
本文介绍使用Kafka监听和订阅两种不同方式进行数据消费
spring: kafka: bootstrap-servers: 192.168.1.16:9092 #消费者 consumer: group-id: alarmService max-poll-records: 10 # 一次 poll 最多返回的记录数 enable-auto-commit: false auto-commit-interval: 1000ms properties: max.poll.interval.ms: 360000 session.timeout.ms: 150000 #以下为kafka用户名密码的配置,不开启sasl时将以下配置删除 # SASL鉴权方式 sasl.mechanism: PLAIN # 加密协议 security.protocol: SASL_PLAINTEXT # 设置jaas帐号和密码 sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password"; key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest # listener: # type: batch # concurrency: 6 #生产者 producer: retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送 batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置 buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置 key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类 properties: session.timeout.ms: 15000 sasl.mechanism: PLAIN security.protocol: SASL_PLAINTEXT sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
@Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.enable-auto-commit}") private String autoCommit; @Value("${spring.kafka.consumer.auto-commit-interval}") private String interval; @Value("${spring.kafka.consumer.key-deserializer}") private String key; @Value("${spring.kafka.consumer.value-deserializer}") private String value; @Value("${spring.kafka.consumer.properties.security.protocol}") private String securityProtocol; @Value("${spring.kafka.consumer.properties.sasl.mechanism}") private String SASLMechanism; @Value("${spring.kafka.consumer.properties.sasl.jaas.config}") private String SASLJaasConfig; @Value("${spring.kafka.consumer.auto-offset-reset}") private String offsetReset; @Value("${spring.kafka.consumer.max-poll-records}") private Integer records; @Value("${spring.kafka.consumer.properties.session.timeout.ms}") private Integer timeout; @Value("${spring.kafka.consumer.properties.max.poll.interval.ms}") private Integer pollInterval; /** * 消费者 * @param topic 主题 * @param groupId group.id */ public void kafkaConsumer(String topic, String groupId) { Properties props = new Properties(); //Kafka集群 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //消费者组,只要group.id相同,就属于同一个消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeout); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000); //用户密码认证参数 props.put("security.protocol", securityProtocol); props.put("sasl.mechanism", SASLMechanism); props.put("sasl.jaas.config", SASLJaasConfig); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题 consumer.subscribe(Arrays.asList(topic)); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); JSONObject json = JSON.parseObject(message.toString()); //处理逻辑 //同步提交,当前线程会阻塞直到offset提交成功 consumer.commitSync(); } } } } finally { consumer.close(); } }
@KafkaListener(topicPattern="#{'${spring.kafka.consumer.topics}'}",groupId = "#{'${spring.kafka.consumer.group-id}'}") public void kafkaConsumer(ConsumerRecord<?,?> record) { System.out.println("--------------kafka----------------"); //获取小区id List<String> communityIds = communityBaseinfoMapper.getCommunityBaseinfoCommunityId(); Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); JSONObject json = JSON.parseObject(message.toString()); ApplyAccess papplyAccess = json.toJavaObject(ApplyAccess.class); String communityId = papplyAccess.getCommunityId(); if (communityIds.contains(communityId)){ //数据存储 String idCard = papplyAccess.getIdCard().replace("*", ""); peopleBaseinfoService.savePapplyAccess(papplyAccess,idCard); } } }
消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):
(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
(2)指定"auto.offset.reset"参数的值为earliest;
auto.offset.reset具体含义:(注意版本不同,配置参数会有所不一致,具体参考官网)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。