当前位置:   article > 正文

Kafka(三)生产者发送JSON消息+使用统一序列化器+提升吞吐量_kafka json序列化

kafka json序列化

生产者发送思路

如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

  1. ack使用默认的all
  2. 开启重试
  3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
  4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
  5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

有MQ开发经验的同学大概都知道,消息被发送到Broker之后不一定会马上被持久化到磁盘上,基本上都是会写入到操作系统的缓存中,由操作系统决定什么时候将数据刷新到磁盘上。Kafka也是同理,所以严格意义上来说,即使我们采用上述5个步骤,也不一定保证发送给Kafka的数据不会丢失。Kafka提供了几个log.flush参数和flush.message、flush.ms来控制刷新磁盘的时间和写入log时间,具体参数和配置参考Kafka官方文档 > Broker配置和Kafka官方文档 > Topic配置。

Kafka官方是不建议设置的,让操作系统自己决定。并且官方提到了这样一句话

We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported

也就是官方认为数据的可靠性靠副本机制来保证而不是强制本地磁盘刷新

当然了如果你没有副本只是一个单机节点的话可以考虑设置磁盘刷新相关配置。

关于flush的更多细节推荐阅读 Kafka 官方文档 > Application vs. OS Flush Management以及后面的几节

使用统一序列化器

消息格式为JSON, 需要使用Jackson将类序列化为JSON字符串。但是如果我们有多种POJO消息,每一个都去实现官方的Serializer接口,显然不太好,能不能利用泛型的帮我们完成呢?开源的Kafka版本没有提到这一部分,好在我在confluent官方GitHub仓库找到了对应实现
只需要引入依赖即可

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-json-serializer</artifactId>
            <version>7.5.1</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

同时要指明仓库URL

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

具体配置见下面详细代码的这一行

result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class.getName());
  • 1

配置生产者参数

有几点需要注意

  1. 开启压缩
  2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
  3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
  4. ack使用默认的all
  5. 配置client.id 防止防止InstanceAlreadyExistsException
    /**
     * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整
     * 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致
     * 关于消息压缩类型,官方建议选择lz4,详情见博文 https://www.confluent.io/blog/apache-kafka-message-compression/
     * 
     * 建议设置client.id, 防止InstanceAlreadyExistsException 异常,
     * 如果不设置, kafka会自动生成一个client.id, 默认格式是producer-1, 代码逻辑见{@link ProducerConfig#maybeOverrideClientId(Map)}
     * kafka Java client 会使用client.id生成JMX的ObjectName, 代码逻辑见{@link KafkaProducer#KafkaProducer(ProducerConfig, Serializer, Serializer, ProducerMetadata, KafkaClient, ProducerInterceptors, Time)} 中的registerAppInfo
     * 如果多个应用(也就是多个进程)都没有设置client.id, 使用默认的client.id的规则生成的client.id则重复, 会抛出InstanceAlreadyExistsException
     * 如果是同一应用(也就是同一进程)创建多个producer, 不设置client.id的话不会抛出InstanceAlreadyExistsException, 因为其内部有一个自动递增的计数器{@link ProducerConfig#PRODUCER_CLIENT_ID_SEQUENCE}
     */
    public static Properties loadProducerConfig(String valueSerializer) {
        Properties result = new Properties();
        result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9093");
        // 建议设置client.id
        result.put(ProducerConfig.CLIENT_ID_CONFIG, SERVER_ID);
        result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class.getName());
        result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
        // 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量
        // 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景
        result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);
        // 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小
        result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);
        // 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量
        result.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

提升吞吐量

  • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
  • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
  • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

发送消息

@Log
public class MessageProducer {
    
    public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));
    
    private MessageFailedService messageFailedService = new MessageFailedService();

    /**
     * kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟
     * callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm
     * @param userDTO
     */
    public void sendMessage(final UserDTO userDTO) {
        ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);
        try {
            PRODUCER.send( user, (recordMetadata, e) -> {
                if (Objects.nonNull(e)) {
                    log.severe("message has sent failed");
                    MessageFailedEntity messageFailedEntity = new MessageFailedEntity();
                    messageFailedEntity.setMessageId(userDTO.getMessageId());
                    ObjectMapper mapper = new ObjectMapper();
                    try {
                        messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));
                    } catch (JsonProcessingException jsonProcessingException) {
                        log.severe("message content json format failed");
                    }
                    messageFailedEntity.setMessageType(MessageType.EMAIL);
                    messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);
                    messageFailedEntity.setFailedReason(e.getMessage());
                    // 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面
                    // 如果放在主线程里,由于kafka producer是异步的,
                    // kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的
                    messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);
                } else {
                    log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );
                }
            });
        } catch (TimeoutException e) {
            log.info("send message to kafka timeout, message: ");
            // TODO: 自定义逻辑,比如发邮件通知kafka管理员
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

对上述代码做几点解释

  1. 我们使用异步的方式发送,如果发送成功,打印一条消息
  2. 关键在于重试,callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback, 这个问题我发邮件问过社区, 详情见这里的 邮件,想要本地测试或debug重试相关逻辑的话,可以将min.insync.replicas改大一点,例如我只有一个Kafka节点,那么我设置min.insync.replicas为大于1的值,并且设置Kafka的Producer的确认设置是all,那么这时候发送消息,就会看到重试相关日志

关闭生产者

实现ServletContextListener接口, 然后在web.xml的listener元素中配置

public class KafkaListener implements ServletContextListener {

    private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        KAFKA_PRODUCERS.forEach(KafkaProducer::close);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
                      https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
         version="6.0">

  <listener>
    <listener-class>com.business.server.listener.KafkaListener</listener-class>
  </listener>
</web-app>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

结语

  1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
  2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
  3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

示例源码仓库

  1. Github地址
  2. 项目下business-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述
    注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server
  • 1

下一篇博文,将介绍消费者消费消息,以及消费者的重要参数配置,还有消费逻辑的重试机制等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/710631
推荐阅读
相关标签
  

闽ICP备14008679号