当前位置:   article > 正文

springboot 集成kafka 详细教程,看这一篇就够了_springboot kafka

springboot kafka

废话不多说,直接上代码
为啥这样说,现在大家都想先看大妈效果,再去看逻辑
先看整体架构
在这里插入图片描述

先贴yml吧,这个毕竟是项目一创建就需要的

spring:
  application:
    admin: apache-kafka
  kafka:
    bootstrap-servers: 这里是你自己的kafka地址 # kafka 服务器集群地址,默认为 localhost:9092
    template:
      default-topic: demo  #将消息发送到的默认主题,KafkaTemplate.sendDefault
    listener:
      type: batch #监听器类型,可选值有:SINGLE(单条消费,默认)、BATCH(批量消息)

    # kafka 生产者配置
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 key 序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 value 序列化方式
      batch-size: 16KB #默认批处理大小,如果值太小,则可能降低吞吐量,为零将完全禁用批处理,当 linger.ms=0 时,此值无效
      buffer-memory: 32MB #生产者可以用来缓冲等待发送到服务器的记录的总内存大小
      retries: 3 #发送失败时的重试次数,当大于零时,允许重试失败的发送。
#      在考虑请求完成之前,生产者要求领导者已收到的确认数,可选值有:-1、0、1(默认为1)
#      使用事务时,必须配置为 -1,表示领导者必须收到所有副本的确认消息。
      acks: -1
      properties:
        #消息提交延时时间(单位毫秒),当生产者接收到消息 linger.ms 秒钟后,就会将消息提交给 kafka。
        #当生产端积累的消息达到 batch-size 大小后,也会将消息提交给 kafka。
        #linger.ms 默认为 0 ,表示每接收到一条消息就会立即提交给 kafka,此时 batch-size 无效。如果对实时性要求高,则建议设置为 0
        linger.ms: 0
        partitioner:
          class: com.wmx.apachekafka.beans.MyKafkaPartitioner #kafka 自定义分区规则
      transaction-id-prefix: tx_kafka.

    # kafka 消费者配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 key 反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 value 反序列化方式
      group-id: test-consumer-group #标识此消费者所属的消费者组的唯一字符串,这里只要你是默认安装,那就是这个,不用修改
      #消费者客户端 id,在消费者组需要唯一指定。发出请求时会传递给服务器,用于服务器端日志记录
      #不写时,会自动命名,比如:consumer-1、consumer-2...,原子性递增。通常不建议自定义,使用默认值即可,因为容易冲突
      #client-id: wangmx1
      enable-auto-commit: true #消费者的偏移量是否在后台自动提交,默认为 true
      auto-commit-interval: 5000 #如果enable.auto.commit=true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为 5000
      # 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,可选的值有 latest、earliest、exception、none,默认值为 latest
      # latest:重置为分区中最新的 offset(消费分区中新产生的数据)、earliest:重置为分区中最小的 offset
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 180000 #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发 rebalance(重新平衡) 操作)
        request.timeout.ms: 120000 #消费请求超时时间
      max-poll-records: 5 #一次调用poll()时返回的最大记录数,即批量消费每次最多消费多少条消息,注意是最多,并不是必须满足数量后才消费.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

自定义分区MyKafkaPartitioner:

package com.zy.apachekafka.beans;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * kafka 自定义分区规则,一旦自定义了分区规则,就不会再走 kafka 默认的分区规则
 *
 * @author zy
 */
public class MyKafkaPartitioner implements Partitioner {
    /**
     * 计算给定记录的分区,发送消息到 kafka 服务器之前,都会先走这里进行计算目标分区,即将消息发送到具体的哪个分区
     *
     * @param topic      :主题名称
     * @param key        :要分区的键(如果没有键,则为null)
     * @param keyBytes   :要分区的序列化键(如果没有键,则为null)
     * @param value      :要分区的值或null,健可以有可无,值才是真正的消息内容
     * @param valueBytes :要分区的序列化值或null
     * @param cluster    :当前集群信息
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 返回的整数值就是表示生产者将消息发送到的分区
        // 具体的规则可以根据自身需要设置
        System.out.println("发送消息:" + value);
        System.out.println("指定分区为:" + 0);
        return 0;
    }

    /**
     * 在分区程序关闭时调用
     */
    @Override
    public void close() {

    }

    /**
     * 使用给定的键值对配置此类
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

消费者定时器ConsumerTimer:

package com.zy.apachekafka.component;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.Set;

/**
 * 消费者定时器——定时开关消费者消费功能
 * 1、本类使用 @EnableScheduling 定时任务的方式开关消费者监听器,同理可以自己提供控制层接口,通过 http 的方式来开关。
 *
 * @author zy
 */
@Component
@EnableScheduling
@EnableAsync
public class ConsumerTimer {

    /**
     * 1{@link KafkaListener} 注解标注的方法会被注册在 KafkaListenerEndpointRegistry 中。
     * 2{@link KafkaListenerEndpointRegistry} 在 Spring IOC 容器中已经存在,可以直接取。
     */
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    /**
     * 定时启动消费者监听器
     * <p>
     * MessageListenerContainer getListenerContainer(String id)
     * * 1、返回具有指定id的{@link MessageListenerContainer},如果不存在此类容器,则返回 null。
     * * 2、这个 id 就是 @KafkaListener 注解的 id 属性值
     * Set<String> getListenerContainerIds():获取所有的 KafkaListener 监听器 id
     * Collection<MessageListenerContainer> getListenerContainers():获取所有的 KafkaListener 监听器容器
     */
    @Scheduled(cron = "0 52 20 * * ? ")
    public void startListener() {
        Set<String> containerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
        containerIds.stream().forEach(item -> System.out.println("KafkaListener 消费者监听器:" + item));

        //boolean isRunning():检查此组件当前是否正在运行
        //void start():启动此组件,如果组件已在运行,则不应引发异常,配合 stop 方法使用,
        //void resume():如果暂停,在下一次轮询后恢复此容器,配合 pause 方法使用。
        kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").resume();
        //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").start();
        System.out.println(LocalDateTime.now() + " 启动 kafka 消费者监听器:basicConsumer");
    }

    /**
     * 定时关闭/暂停消费者监听器
     * void pause():在下次轮询之前暂停此容器,配合 resume
     * void stop():以同步方式停止此组件/容器,如果组件未运行(尚未启动),则不应引发异常。配合 start 方法重新启动
     */
    @Scheduled(cron = "0 50 20 * * ? ")
    public void shutDownListener() {
        kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").pause();
        //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").stop();
        System.out.println(LocalDateTime.now() + " 暂停 kafka 消费者监听器:basicConsumer");
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

下面该有消费和生产消息:
消费者 · 接收消息.KafkaConsumer:

package com.zy.apachekafka.controller;

import cn.hutool.core.exceptions.ExceptionUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Kafka 消费者 · 接收消息.
 * 1、topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
 * 2、系统中定义了消费者(@KafkaListener)时,启动服务后,如果连不上kafka服务器则会输出大量的警告日志,但是不会报错。
 * 不是每个环境都启动了kafka服务,所以当没有配置消费者组id的时候,本类不交由Spring容器初始化,不再监听消息。
 *
 * @author zy
 */
@Component
@ConditionalOnProperty(prefix = "spring.kafka.consumer", name = "group-id")
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * 监听指定主题上的消息,topics 属性是一个字符串数组,可以监听多个主题。
     * * id :用于唯一标识此消费者监听器,不同方法上此注解的id必须唯一,不设置时,会自动生成
     * * topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
     *
     * @param record :消息记录对象,包含消息正文、主题名、分区号、偏移量、时间戳等等
     */
    @KafkaListener(id = "basicConsumer", topics = {"car-infos", "basic-info", "helloWorld", "bgt.basic.agency.frame.topic"})
    public void messageListener1(ConsumerRecord<?, ?> record) {
        /**
         * headers:消息头信息
         * offset:此记录在相应的 Kafka 分区中的位置。
         * partition:记录所在的分区
         * serializedKeySize:序列化的未压缩密钥的大小(以字节为单位),如果 key为 null,则返回的大小为 -1
         * serializedValueSize:序列化的未压缩值(消息正文)的大小(以字节为单位,record.value().getBytes().length)。如果值为 null,则返回的大小为 -1
         * timestamp:记录的时间戳
         * TimestampType:记录的时间戳类型
         * topic:接收此记录的主题
         * value:消息内容
         */
        Headers headers = record.headers();
        long offset = record.offset();
        int partition = record.partition();
        int serializedKeySize = record.serializedKeySize();
        int serializedValueSize = record.serializedValueSize();
        long timestamp = record.timestamp();
        TimestampType timestampType = record.timestampType();
        String topic = record.topic();
        Object value = record.value();

        System.out.println("收到消息:");
        System.out.println("\theaders=" + headers);
        System.out.println("\toffset=" + offset);
        System.out.println("\tpartition=" + partition);
        System.out.println("\tserializedKeySize=" + serializedKeySize);
        System.out.println("\tserializedValueSize=" + serializedValueSize);
        System.out.println("\ttimestamp=" + timestamp);
        System.out.println("\ttimestampType=" + timestampType);
        System.out.println("\ttopic=" + topic);
        System.out.println("\tvalue=" + value);
    }

    /**
     * 批量消费时,必须使用 List 接收,否则会抛异常。
     * 即如果配置文件配置的是批量消费(spring.kafka.listener.type=batch),则监听时必须使用 list 接收
     * 反之如果配置是单条消息消费,则不能使用 list 接收,否则也会异常.
     *
     * @param records
     */
    @KafkaListener(topics = "batch-msg")
    public void messageListener2(List<ConsumerRecord<?, ?>> records) {
        System.out.println(">>>批量消费返回条数,records.size()=" + records.size());
        int count = 0;
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println("\t消息" + (++count) + ":" + record.value());
        }
    }

    /**
     * 消费消息并转换。SendTo 可以标注在类上,此时对类中的所有方法有效,方法的返回值表示转发的消息内容。
     *
     * @param record
     * @return
     */
    @KafkaListener(topics = {"sendTo"})
    @SendTo("car-infos")
    public String messageListener3(ConsumerRecord<?, ?> record) {
        System.out.println("消费单条消费并转发:" + record.value() + "," + record.timestamp());
        return record.value().toString();
    }

    /**
     * 单位一体化编码与名称更正消息监听
     * 约定更正接口返回结果监听的主题为:basic.kafka.syncAgencyStatInfo.reply
     *
     * @param recordList
     */
    @KafkaListener(topics = {"${app.kafka.topics.agency:topic3}"})
    public void syncAgencyStatInfoMsgListener(List<ConsumerRecord<String, String>> recordList) {
        for (ConsumerRecord<String, String> record : recordList) {
            log.info("监听单位一体化编码与名称更正消息:{}", record);
            try {
                System.out.println("消息处理.....");
            } catch (Exception e) {
                log.error("单位一体化编码与名称更正消息消费失败:{}", ExceptionUtil.getMessage(e));
            }
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120

生产者KafkaProducer:

package com.zy.apachekafka.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * kafka 生产者 · 发送消息
 *
 * @author zy
 */
@RestController
public class KafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    /**
     * {@link KafkaAutoConfiguration} 中会自动根据 {@link KafkaProperties} 配置属性读取配置,
     * 然后将 {@link KafkaTemplate} 模板添加到 Spring 容器中,所以这里直接获取使用即可。
     */
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsg?topic=car-infos
     * <p>
     * 1、send(String topic, @Nullable V data):向指定主题发送消息,如果 topic 不存在,则自动创建,
     * 但是创建的主题默认只有一个分区 - PartitionCount: 1、分区也没有副本 - ReplicationFactor: 1,1表示自身。
     * 2、send 方法默认是异步的,主线程会直接继续向后运行,想要获取发送结果是否成功,请添加回调方法 addCallback。
     * [WARN ][org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)]:[Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
     * [ERROR][org.springframework.kafka.support.LoggingProducerListener.onError(LoggingProducerListener.java:76)]:Exception thrown when sending a message with key='xxx' and payload='xxx' to topic bgt.basic.agency.frame.topic:
     * 3、send().get() 可以同步阻塞主线程直到获取执行结果,或者执行超时抛出异常.
     * java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException:
     * Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
     *
     * @param topic   :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
     * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
     * @return
     */
    @PostMapping("kafka/sendMsg")
    @Transactional(rollbackFor = Exception.class)
    public Map<String, Object> sendMessage(@RequestParam String topic, @RequestBody Map<String, Object> message) {
        logger.info("向指定主题发送信息,topic={},message={}", topic, message);
        try {
            String valueAsString = new ObjectMapper().writeValueAsString(message);
            // 异步
            // kafkaTemplate.send(topic, valueAsString);
            // 同步:get() 获取执行结果,此时线程将阻塞,等待执行结果
            SendResult<String, Object> sendResult = kafkaTemplate.send(topic, valueAsString).get();
            sendResult.toString();
            message.put("sendResult", sendResult.toString());
            // org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
        } catch (Exception e) {
            // 异步发送时子线程中的异常是不会进入这里的,只有同步发送时,主线程阻塞,发送是吧,抛出异常时,才会进入这里。
            e.printStackTrace();
        }
        return message;
    }

    /**
     * 向默认主题(default-topic)发送消息:http://localhost:8080/kafka/sendMsgDefault
     * 默认主题由 spring.kafka.template.default-topic 选项进行配置
     *
     * @param message :待发送的消息,如:{"version":2,"text":"后日凌晨三点执行任务,不得有误"}
     * @return
     */
    @PostMapping("kafka/sendMsgDefault")
    @Transactional(rollbackFor = Exception.class)
    public Map<String, Object> sendMsgDefault(@RequestBody Map<String, Object> message) {
        logger.info("向默认主题发送信息,topic={},topic={}", kafkaTemplate.getDefaultTopic(), message);
        try {
            String valueAsString = new ObjectMapper().writeValueAsString(message);
            kafkaTemplate.sendDefault(valueAsString);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return message;
    }

    /**
     * 异步回调写法 1
     * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
     * http://localhost:8080/kafka/sendMsgCallback?topic=car-infos
     * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程。
     *
     * @param topic   :car-infos
     * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
     * @return
     */
    @PostMapping("kafka/sendMsgCallback")
    @Transactional(rollbackFor = Exception.class)
    public Map<String, Object> sendMessageCallback(@RequestParam String topic,
        @RequestBody Map<String, Object> message) {
        try {
            String valueAsString = new ObjectMapper().writeValueAsString(message);
            /**
             * addCallback:添加成功或者失败的异步回调
             * {@link SuccessCallback}:是发送成功回调,函数式接口,其中的方法参数为 {@link SendResult},表示发送结果
             * {@link FailureCallback}:是发送失败回调,函数式接口,其中的方法参数为 Throwable,表示异常对象
             */
            kafkaTemplate.send(topic, valueAsString).addCallback(success -> {
                String topic2 = success.getRecordMetadata().topic();
                int partition = success.getRecordMetadata().partition();
                long offset = success.getRecordMetadata().offset();
                logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
            }, failure -> {
                logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
                logger.warn("保存到数据库中,后期再做处理.");
            });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
        return message;
    }

    /**
     * 异步回调写法 2
     * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
     * http://localhost:8080/kafka/sendMsgCallback2?topic=helloWorld
     * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程,主线程会直接运行过去。
     *
     * @param topic   :helloWorld
     * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
     * @return
     */
    @PostMapping("kafka/sendMsgCallback2")
    @Transactional(rollbackFor = Exception.class)
    public Map<String, Object> sendMessageCallback2(@RequestParam String topic,
        @RequestBody Map<String, Object> message) {
        try {
            String valueAsString = new ObjectMapper().writeValueAsString(message);
            /**
             * ListenableFutureCallback 接口继承了 {@link SuccessCallback}{@link FailureCallback} 函数式接口
             * 重写方法即可
             */
            kafkaTemplate.send(topic, valueAsString).addCallback(
                new ListenableFutureCallback<SendResult<String, Object>>() {
                    @Override
                    public void onSuccess(SendResult<String, Object> success) {
                        int partition = success.getRecordMetadata().partition();
                        long offset = success.getRecordMetadata().offset();
                        String topic2 = success.getRecordMetadata().topic();
                        logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
                    }

                    @Override
                    public void onFailure(Throwable failure) {
                        logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
                        logger.warn("保存到数据库中,后期再做处理.");
                    }
                });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
        return message;
    }

    /**
     * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsgTransactional1?topic=car-infos
     * 与 springframework 框架的事务整合到一起,此时异常处理完全和平时一样.
     *
     * @param topic   :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
     * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
     * @return
     */
    @PostMapping("kafka/sendMsgTransactional1")
    @Transactional(rollbackFor = Exception.class)
    public Map<String, Object> sendMessageTransactional1(@RequestParam String topic,
        @RequestBody Map<String, Object> message) {
        try {
            logger.info("向指定主题发送信息,带事务管理,topic={},message={}", topic, message);
            String msg = new ObjectMapper().writeValueAsString(message);
            ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, msg);
            if ("110".equals(message.get("version").toString())) {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(1 / 0);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return message;
    }

    /**
     * http://localhost:8080/kafka/sendMsgTransactional2?topic=car-infos
     * 生成者发送消息事务管理方式2:使用 executeInTransaction(OperationsCallback<K, V, T> callback)
     * executeInTransaction:表示执行本地事务,不参与全局事务(如果存在),即方法内部和外部是分离的,只要内部不
     * 发生异常,消息就会发送,与外部无关,即使外部有 @Transactional 注解也不影响消息发送,此时外围有没有 @Transactional 都一样。
     *
     * @param topic
     * @param message
     * @return
     */
    @PostMapping("kafka/sendMsgTransactional2")
    public Map<String, Object> sendMessageTransactional2(@RequestParam String topic,
        @RequestBody Map<String, Object> message) {
        try {
            logger.info("向指定主题发送信息,带事务管理:topic={},message={}", topic, message);
            String msg = new ObjectMapper().writeValueAsString(message);

            /**
             * executeInTransaction 表示这些操作在本地事务中调用,不参与全局事务(如果存在)
             * 所以回调方法内部发生异常时,消息不会发生出去,但是方法外部发生异常不会回滚,即便是外围方法加了 @Transactional 也没用。
             */
            kafkaTemplate.executeInTransaction(operations -> {
                operations.send(topic, msg);
                if ("120".equals(message.get("version").toString())) {
                    System.out.println(1 / 0);
                }
                return null;
            });
            //如果在这里发生异常,则只要 executeInTransaction 里面不发生异常,它仍旧会发生消息成功
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return message;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242

贴一份pom文件吧,现在随着依赖的增加,很多时候会出现依赖之间出现问题,而且还很难排错,,有一个idea插件可以安排(maven helper)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wmx</groupId>
    <artifactId>apache-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>apache-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- spring 整合的 apache kafka 消息队列依赖-->
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.7</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

如果有不明白的联系作者,一起学习

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/951098
推荐阅读
相关标签
  

闽ICP备14008679号