当前位置:   article > 正文

从零搭建开发脚手架 Spring Boot集成Kafka实现生产者消费者的多种方式_springboot kafka生产者

springboot kafka生产者

发布空闲消费者事件之间的时间(未收到数据)。

#idle-event-interval:

#是否在初始化期间记录容器配置(INFO 级别)。

#log-container-config:

检查无响应消费者的时间间隔。 如果未指定持续时间后缀,则将使用秒。

#monitor-interval:

乘数应用于“pollTimeout”以确定消费者是否无响应。

#no-poll-threshold:

#轮询消费者时使用的超时。

#poll-timeout:

#侦听器类型。默认single,可选batch

#type: single

创建Kafka主题


方式一(不推荐): 自动创建主题

在配置文件里指定好kafka的topic之后,调用send方法或者KafkaListener指定topic会自动帮我们创建好topic,只是创建的topic默认是1个副本和1个分区的,这一般不能满足我们的要求,所以我们还需要在kafka的server.properties里增加或修改以下参数:

auto.create.topics.enable=true

num.partitions=3

default.replication.factor=3

之后,kafka自动帮我们创建的主题都会包含3个副本和3个分区。

方式二:可以提前运行命令行工具在 Kafka 中创建主题:

$ bin/kafka-topics.sh --create \

–zookeeper localhost:2181 \

–replication-factor 1 --partitions 1 \

–topic mytopic

方式三:随着Kafka中_AdminClient_的引入,我们现在可以以编程方式创建主题。

我们需要添加KafkaAdmin Spring bean,它将自动为NewTopic类型的所有 bean 添加主题:

@Configuration

public class KafkaTopicConfig {

@Value(value = “${kafka.bootstrapAddress}”)

private String bootstrapAddress;

@Bean// 使用 Spring Boot 时,KafkaAdmin会自动注册一个bean,因此您只需要NewTopic @Bean

public KafkaAdmin kafkaAdmin() {

Map<String, Object> configs = new HashMap<>();

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

return new KafkaAdmin(configs);

}

@Bean

public NewTopic topic1() {

// 主题名称、分区数、副本数 或者使用TopicBuilder

return new NewTopic(“baeldung”, 1, (short) 1);

}

}

生产者


@Configuration

public class KafkaProducerConfig {

@Bean

public ProducerFactory<String, String> producerFactory() {

Map<String, Object> configProps = new HashMap<>();

configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);

configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

return new DefaultKafkaProducerFactory<>(configProps);

}

@Bean

public KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

}

简单发布消息

我们可以使用_KafkaTemplate_类发送消息:

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {

kafkaTemplate.send(topicName, msg);

}

带有回调发布消息

在发送API返回的ListenableFuture对象可以阻塞发送线程并获取发送消息的结果,线程将等待结果,但它会减慢生产者的速度。

Kafka 是一个快速的流处理平台。因此,最好异步处理结果,以便后续消息不会等待上一条消息的结果。

我们可以通过回调来做到这一点:

public void sendMessage(String message) {

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

// 发送成功的处理

@Override

public void onSuccess(SendResult<String, String> result) {

System.out.println(“Sent message=[” + message +

“] with offset=[” + result.getRecordMetadata().offset() + “]”);

}

@Override

public void onFailure(Throwable ex) {

// 发送失败的处理

System.out.println(“Unable to send message=[”

  • message + "] due to : " + ex.getMessage());

}

});

}

消费者


简单消费者

@KafkaListener(topics = “topicName”, groupId = “foo”)

public void listenGroupFoo(String message) {

System.out.println("Received Message in group foo: " + message);

}

我们可以为一个主题实现多个侦听器,每个侦听器都有不同的组 ID。此外,一个消费者可以监听来自不同主题的消息:

@KafkaListener(topics = “topic1, topic2”, groupId = “foo”)

Spring 还支持在侦听器中使用@Header注释检索一个或多个消息头:

@KafkaListener(topics = “topicName”)

public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {

System.out.println(“Received Message: " + message” + "from partition: " + partition);

}

@KafkaListener可以接受的参数有:

  • data : 对于data值的类型其实并没有限定,根据KafkaTemplate所定义的类型来决定。 data为List集合的则是用作批量消费。

  • ConsumerRecord:具体消费数据类,包含Headers信息、分区信息、时间戳等

  • Acknowledgment:用作Ack机制的接口

  • Consumer:消费者类,使用该类我们可以手动提交偏移量、控制消费速率等功能

public void listen1(String data)

public void listen2(ConsumerRecord<K,V> data)

public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)

public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)

public void listen5(List data)

public void listen6(List<ConsumerRecord<K,V>> data)

public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)

public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)

消费特定分区的消息

对于具有多个分区的主题,@KafkaListener可以显式订阅具有初始偏移量的主题的特定分区:

@KafkaListener(

topicPartitions = @TopicPartition(topic = “topicName”,

partitionOffsets = {

@PartitionOffset(partition = “0”, initialOffset = “0”),

@PartitionOffset(partition = “3”, initialOffset = “0”)}),

containerFactory = “partitionsKafkaListenerContainerFactory”)

public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {

System.out.println(“Received Message: " + message” + "from partition: " + partition);

}

由于此侦听器中的initialOffset已设置为 0,因此每次初始化此侦听器时,将重新使用来自分区 0 和 3 的所有先前消耗的消息。

如果我们不需要设置偏移量,我们可以使用@TopicPartition注解的partitions属性,只设置没有偏移量的分区:

@KafkaListener(topicPartitions

= @TopicPartition(topic = “topicName”, partitions = { “0”, “1” }))

为监听器添加消息过滤器

我们可以通过添加自定义过滤器来配置侦听器以使用特定类型的消息。这可以通过将RecordFilterStrategy设置到KafkaListenerContainerFactory来完成:

@Bean

public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

factory.setRecordFilterStrategy(record -> record.value().contains(“World”));

return factory;

}

然后我们可以配置一个监听器来使用这个容器工厂:

@KafkaListener(topics = “topicName”, containerFactory = “filterKafkaListenerContainerFactory”)

public void listenWithFilter(String message) {

System.out.println("Received Message in filtered listener: " + message);

}

在这个监听器中,所有匹配过滤器消息都将被丢弃。

自定义消息转换器


前面消息发送和接收都是字符串。但是,我们也可以发送和接收自定义 Java 对象。这需要在ProducerFactory中配置适当的序列化器,在ConsumerFactory中配置反序列化器。

让我们看一个简单的 bean 类*,*我们将其作为消息发送:

public class Greeting {

private String msg;

private String name;

}

发送自定义消息

我们使用JsonSerializer

ProducerFactoryKafkaTemplate的代码:

@Bean

public ProducerFactory<String, Greeting> greetingProducerFactory() {

configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);

return new DefaultKafkaProducerFactory<>(configProps);

}

@Bean

public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {

return new KafkaTemplate<>(greetingProducerFactory());

}

我们可以使用这个新的KafkaTemplate来发送Greeting消息:

kafkaTemplate.send(topicName, new Greeting(“Hello”, “World”));

消费自定义消息

同样,让我们修改ConsumerFactoryKafkaListenerContainerFactory以正确反序列化 Greeting 消息:

@Bean

public ConsumerFactory<String, Greeting> greetingConsumerFactory() {

// …

return new DefaultKafkaConsumerFactory<>(props,new StringDeserializer(), new JsonDeserializer<>(Greeting.class));

}

@Bean

public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(greetingConsumerFactory());

return factory;

}

spring-kafka JSON 序列化器和反序列化器使用Jackson库,它也是 spring-kafka 项目的可选 Maven 依赖项。

所以,让我们将它添加到我们的_pom.xml 中_:

com.fasterxml.jackson.core

jackson-databind

监听器代码:

@KafkaListener(

topics = “topicName”,

containerFactory = “greetingKafkaListenerContainerFactory”)

public void greetingListener(Greeting greeting) {

// process greeting message

}

JavaConfig方式的生产者、消费者


@Configuration

public class Kafka_Config {

@Value(“${kafka.broker.list}”)

public String brokerList;

public static final String topic = “TOPIC_LIN_LIANG”;

public final String groupId = “group.01”;

public Properties customerConfigs() {

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动位移提交

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);//自动位移提交间隔时间

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);//消费组失效超时时间

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “latest”);//位移丢失和位移越界后的恢复起始位置

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

StringDeserializer.class.getName());

return props;

}

public Properties producerConfigs() {

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000);//20M 消息缓存

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

最后

光给面试题不给答案不是我的风格。这里面的面试题也只是凤毛麟角,还有答案的话会极大的增加文章的篇幅,减少文章的可读性

Java面试宝典2021版

最常见Java面试题解析(2021最新版)

2021企业Java面试题精选

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!
正体系化!**

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

最后

光给面试题不给答案不是我的风格。这里面的面试题也只是凤毛麟角,还有答案的话会极大的增加文章的篇幅,减少文章的可读性

Java面试宝典2021版

[外链图片转存中…(img-CsNKXKHq-1711882636181)]

[外链图片转存中…(img-tR2cZQiA-1711882636181)]

最常见Java面试题解析(2021最新版)

[外链图片转存中…(img-13X5PDBW-1711882636182)]

[外链图片转存中…(img-Rj6fnzud-1711882636182)]

2021企业Java面试题精选

[外链图片转存中…(img-dSGqQ2MX-1711882636182)]

[外链图片转存中…(img-udeBFt2P-1711882636183)]

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/533399
推荐阅读
相关标签
  

闽ICP备14008679号