赞
踩
如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是
有MQ开发经验的同学大概都知道,消息被发送到Broker之后不一定会马上被持久化到磁盘上,基本上都是会写入到操作系统的缓存中,由操作系统决定什么时候将数据刷新到磁盘上。Kafka也是同理,所以严格意义上来说,即使我们采用上述5个步骤,也不一定保证发送给Kafka的数据不会丢失。Kafka提供了几个log.flush参数和flush.message、flush.ms来控制刷新磁盘的时间和写入log时间,具体参数和配置参考Kafka官方文档 > Broker配置和Kafka官方文档 > Topic配置。
Kafka官方是不建议设置的,让操作系统自己决定。并且官方提到了这样一句话
We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported
也就是官方认为数据的可靠性靠副本机制来保证而不是强制本地磁盘刷新
当然了如果你没有副本只是一个单机节点的话可以考虑设置磁盘刷新相关配置。
关于flush的更多细节推荐阅读 Kafka 官方文档 > Application vs. OS Flush Management以及后面的几节
消息格式为JSON, 需要使用Jackson将类序列化为JSON字符串。但是如果我们有多种POJO消息,每一个都去实现官方的Serializer接口,显然不太好,能不能利用泛型的帮我们完成呢?开源的Kafka版本没有提到这一部分,好在我在confluent官方GitHub仓库找到了对应实现
只需要引入依赖即可
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-serializer</artifactId>
<version>7.5.1</version>
</dependency>
同时要指明仓库URL
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
具体配置见下面详细代码的这一行
result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class.getName());
有几点需要注意
/** * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整 * 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致 * 关于消息压缩类型,官方建议选择lz4,详情见博文 https://www.confluent.io/blog/apache-kafka-message-compression/ * * 建议设置client.id, 防止InstanceAlreadyExistsException 异常, * 如果不设置, kafka会自动生成一个client.id, 默认格式是producer-1, 代码逻辑见{@link ProducerConfig#maybeOverrideClientId(Map)} * kafka Java client 会使用client.id生成JMX的ObjectName, 代码逻辑见{@link KafkaProducer#KafkaProducer(ProducerConfig, Serializer, Serializer, ProducerMetadata, KafkaClient, ProducerInterceptors, Time)} 中的registerAppInfo * 如果多个应用(也就是多个进程)都没有设置client.id, 使用默认的client.id的规则生成的client.id则重复, 会抛出InstanceAlreadyExistsException * 如果是同一应用(也就是同一进程)创建多个producer, 不设置client.id的话不会抛出InstanceAlreadyExistsException, 因为其内部有一个自动递增的计数器{@link ProducerConfig#PRODUCER_CLIENT_ID_SEQUENCE} */ public static Properties loadProducerConfig(String valueSerializer) { Properties result = new Properties(); result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9093"); // 建议设置client.id result.put(ProducerConfig.CLIENT_ID_CONFIG, SERVER_ID); result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class.getName()); result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name); // 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量 // 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景 result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10); // 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小 result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10); // 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量 result.put(ProducerConfig.LINGER_MS_CONFIG, 10); return result; }
@Log public class MessageProducer { public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName())); private MessageFailedService messageFailedService = new MessageFailedService(); /** * kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟 * callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm * @param userDTO */ public void sendMessage(final UserDTO userDTO) { ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(), userDTO); try { PRODUCER.send( user, (recordMetadata, e) -> { if (Objects.nonNull(e)) { log.severe("message has sent failed"); MessageFailedEntity messageFailedEntity = new MessageFailedEntity(); messageFailedEntity.setMessageId(userDTO.getMessageId()); ObjectMapper mapper = new ObjectMapper(); try { messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO)); } catch (JsonProcessingException jsonProcessingException) { log.severe("message content json format failed"); } messageFailedEntity.setMessageType(MessageType.EMAIL); messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER); messageFailedEntity.setFailedReason(e.getMessage()); // 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面 // 如果放在主线程里,由于kafka producer是异步的, // kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的 messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity); } else { log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() ); } }); } catch (TimeoutException e) { log.info("send message to kafka timeout, message: "); // TODO: 自定义逻辑,比如发邮件通知kafka管理员 } } }
对上述代码做几点解释
实现ServletContextListener接口, 然后在web.xml的listener元素中配置
public class KafkaListener implements ServletContextListener {
private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();
@Override
public void contextInitialized(ServletContextEvent sce) {
KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
KAFKA_PRODUCERS.forEach(KafkaProducer::close);
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
version="6.0">
<listener>
<listener-class>com.business.server.listener.KafkaListener</listener-class>
</listener>
</web-app>
http://localhost:8999/business-server
下一篇博文,将介绍消费者消费消息,以及消费者的重要参数配置,还有消费逻辑的重试机制等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。