赞
踩
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- spring:
- kafka:
- bootstrap-servers: ip:port,ip:port,ip:port,ip:port
- producer:
- # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0 如果不配置事物 retries: 0 要放开
- #retries: 1
- #一个批次可以使用的内存大小
- batch-size: 16384
- # 设置生产者内存缓冲区的大小。
- buffer-memory: 33554432
- # 键的序列化方式
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- # 值的序列化方式
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all 如果不配置事物要放开
- #acks: all
- #事务id 如果不配置事物要注释掉
- transaction-id-prefix: tran
- consumer:
- # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
- auto-commit-interval: 1S
- # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
- auto-offset-reset: earliest
- # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
- enable-auto-commit: false
- # 键的反序列化方式
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- # 值的反序列化方式
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- listener:
- # 在侦听器容器中运行的线程数。
- concurrency: 4
- #listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
- ack-mode: manual_immediate
- #避免出现主题未创建报错
- missing-topics-fatal: false
- private static final String TOPIC_NAME = "***";
-
- //注入KafkaTemplate
- @Autowired
- private KafkaTemplate<String, Object> kafkaTemplate;
- //注入式
- @GetMapping("/testInjection")
- @Transactional(rollbackFor = RuntimeException.class)
- public void testInjection(int num) {
- kafkaTemplate.send(TOPIC_NAME, "发送消息:" + num);
- if (num == 0) {
- throw new RuntimeException();
- }
- kafkaTemplate.send(TOPIC_NAME, "发送消息:" + num);
- }
- //声明式
- @GetMapping("/testStatement")
- public void testStatement(int num) {
- kafkaTemplate.executeInTransaction((KafkaOperations.OperationsCallback<String, Object, Object>) kafkaOperations -> {
- kafkaOperations.send(TOPIC_NAME, "发送消息:" + num);
- if (num == 0) {
- throw new RuntimeException();
- }
- kafkaOperations.send(TOPIC_NAME, "发送消息:" + num);
- return true;
- });
- }
- kafkaTemplate.send(TOPIC_NAME, "发送消息:" + num).addCallback(success -> {
- //成功回调topic, partition, offset, success.getProducerRecord().value());
- }, failure -> {
- //失败回调
- });
- @Component
- public class MQListener {
-
- @KafkaListener(topics = {"topic自定义"}, groupId = "自定义")
- public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
- System.out.println("消费收到消息:" + record.topic() + "-" + record.partition() + "-消息体:{" + record.value() + "}");
- //提交事物
- ack.acknowledge();
- }
-
- // @KafkaListener(topics = {"topic自定义"}, groupId = "自定义")
- // public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
- // System.out.println("消费7:" + record.topic() + "-" + record.partition() + "-" + record.value());
- // //提交事物
- // ack.acknowledge();
- // }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。