当前位置:   article > 正文

Springboot使用kafka事务-生产者方_chainedkafkatransactionmanager

chainedkafkatransactionmanager

前言

在上一篇文章中,我们使用了springboot的AOP功能实现了kafka的分布式事务,但是那样实现的kafka事务是不完美的,因为请求进来之后分配的是不同线程,但不同线程使用的kafka事务却是同一个,这样会造成多请求情况下的事务失效。

而解决这个问题的方法,就是每个线程都使用一个新的事务生产者去发送一条新的事务消息,然后这个事务还要和当前线程进行绑定,实现不同线程之间的事务隔离。

通常来说,这个繁杂的过程虽然我们可以实现,但是始终没有框架研发者做的那么完美,所以,我们首先要去看一下框架的作者有没有实现这个功能。

幸运地是,上述功能在kafka之中是有实现的,而且首次实现的时间是在2017年,所以我们可以直接使用作者提供的基于springboot的事务管理功能。

注入kafka事务

在springboot中启用kafka的事务,有两种方式,第一种方式为使用springboot提供的自动配置,第二种是自己往容器中注入。

方式一:springboot自动注入

想要使用自动注入,我们只需要在配置文件中加入transaction-id-prefix即可,配置文件如下:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      #bootstrap-servers: localhost:9010
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      transaction-id-prefix: test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这样配置之后,就开启了kafka的事务。

方式一弊端

这样虽然可以直接使用springboot自动装配功能,但是却有下面两个弊端

  • 只能使用一个kafka的集群地址
  • 全局开启了事务,有的方法并不需要全局开启事务
    所以一旦有多个kafka的地址需要配置,或者只想让部分方法使用事务,那么就可以使用第二种方法来解决,那就是自己往容器里面添加kafka的事务管理器。

方式二:向spring容器中添加自定义kafka事务管理器

在kafka事务管理器中,有三个重要的对象,分别是ProducerFactory、KafkaTemplate、KafkaTransactionManager,他们的作用如下:

  • ProducerFactory,用来创建kafka的生产者对象
  • KafkaTemplate,springboot封装的kafka模版
  • KafkaTransactionManager,kafka的事务管理器
    想要往spring容器中添加自定义的kafka事务管理器,其实就是添加一个自定义的KafkaTransactionManager对象,那么我们只需要想办法构造一个KafkaTransactionManager就好。

利用springboot的配置类,我们能很轻松的做到这一点。
第一步,构造一个配置类KafkaAndDataTransactionConfig,加上@Configuration注解。

@Configuration
public class KafkaAndDataTransactionConfig {
}
  • 1
  • 2
  • 3

第二步,构建一个ProducerFactory对象的Bean,交给spring容器。

	@Resource
    NacosDiscoveryProperties nacosDiscoveryProperties;
	/**
     * 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同
     * @param props yaml文件中的定义属性
     */
    @Bean
    ProducerFactory<String, String> pf1(KafkaProperties props) {
        Map<String, Object> pProps = props.buildProducerProperties();
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        return new DefaultKafkaProducerFactory<>(pProps);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

注意其中的nacosDiscoveryProperties变量,这是用来获取实例在nacos中的ip地址,因为在多实例的情况下需要保证每一个事务id的唯一,才不会被kafka的事务管理器识别为失效事务生产者,从而导致事务冲突失效。
第三步,创建一个KafkaTransactionManager对象的Bean,添加到spring容器。

	/**
     * 注入一个kafka事务管理器,这个事务管理器使用事务id
     * @param pf1
     * @return
     */
    @Bean
    KafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTransactionManager<>(pf1);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

只需要将创建好的生产者bean,作为构造参数传入即可。
通过以上三步,我们就得到了一个支持事务的kafka事务管理器了,不过,此时我们还少创建了一个KafkaTemplate,没有这个对象我们将完不成事务发送的管控。

第四步,创建KafkaTemplate

	/**
     * 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务
     * @param pf1
     * @return
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTemplate<>(pf1);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

经过以上代码,我们就得到了一个完整的kafka事务管理器了。
全部代码如下:

@Configuration
public class KafkaAndDataTransactionConfig {
	@Resource
    NacosDiscoveryProperties nacosDiscoveryProperties;
	/**
     * 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同
     * @param props yaml文件中的定义属性
     */
    @Bean
    ProducerFactory<String, String> pf1(KafkaProperties props) {
        Map<String, Object> pProps = props.buildProducerProperties();
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        return new DefaultKafkaProducerFactory<>(pProps);
	/**
     * 注入一个kafka事务管理器,这个事务管理器使用事务id
     * @param pf1
     * @return
     */
    @Bean
    KafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTransactionManager<>(pf1);
    }
    /**
     * 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务
     * @param pf1
     * @return
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTemplate<>(pf1);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

增加DataSourceTransaction事务管理器

默认情况,DataSourceTransaction事务管理器springboot会帮我们自动配置,但是在使用了kafka的事务之后,会存在一个类的加载冲突,导致DataSourceTransaction没有被springboot自动加载到,所以我们还需要自己将DataSourceTransaction事务管理加入进来。
在上面的代码中,再加入以下代码

	//构造器注入DataSource和transactionManagerCustomizers
	private final DataSource dataSource;
    private final TransactionManagerCustomizers transactionManagerCustomizers;
	KafkaAndDataTransactionConfig(DataSource dataSource,
                      ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
        this.dataSource = dataSource;
        this.transactionManagerCustomizers = transactionManagerCustomizers.getIfAvailable();
    }
	/**
     * @Bean 去掉了ConditionalOnMissingBean 避免注入了kafka事务管理器后,springboot不再注入DataSourceTransactionManager
     * @Primary  作为主事务管理器,这样在使用@Transactional时,就会使用DataSourceTransactionManager
     * @param properties
     * @return
     */
    @Bean
    @Primary
    public DataSourceTransactionManager dstm(DataSourceProperties properties) {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(this.dataSource);
        if (this.transactionManagerCustomizers != null) {
            this.transactionManagerCustomizers.customize(transactionManager);
        }
        return transactionManager;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

增加ChainedKafkaTransactionManager管理器

在实际开发中,有时候一个方法需要既支持kafka的事务,又需要支持JDBC的事务,这个时候为了兼容两者的事务,我们需要将两者的事务放到同一个事务管理器中,让他们两个构成一个事务。kafka的作者为我们提供了ChainedKafkaTransactionManager这个对象,来支持这个操作,只需要加入以下代码即可

	 //多个事务管理器构成一个事务,使用ChainedKafkaTransactionManager管理,是因为可以自动偏移kafka事务给消费者
	@Bean 
    public ChainedKafkaTransactionManager kafkaAndDataSourceTransactionManager(DataSourceTransactionManager transactionManager,
                                                                        @Autowired @Qualifier("kafkaTransactionManagerWithTxId") KafkaTransactionManager<?, ?> kafkaTransactionManager){
        return new ChainedKafkaTransactionManager<>(transactionManager, kafkaTransactionManager);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

以上,就是kafka集成springboot的方案,接下来,看看怎么使用

使用

基于以上的配置,一共有三种使用方式

  • 只使用kafka事务
  • 只使用JDBC事务
  • 同时使用kafka和JDBC事务

针对于上面的三种情况的切换,其实就是使用不同Transactional注解中的value值切换不同的事务管理器,事务的指定都在service层的实现类中。

只使用kafka事务

	//指定事务模版为自定义模版
    @Resource(name = "kafkaTemplateWithTxId")
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")
    public void transation() {
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");
        kafkaTemplate.send(stringStringProducerRecord);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

只使用JDBC事务

不需要指定任何的事务管理器

	@Override
    @Transactional(rollbackFor = Exception.class)
    public void transationOfJdbc() {
        xxxService.update(user);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

同时使用kafka和JDBC事务

指定自定义的事务管理器

	//指定事务模版为自定义模版
    @Resource(name = "kafkaTemplateWithTxId")
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")
    public void transationAll() {
	
        xxxService.update(user);
        spreadMonitorService.sendMsg();
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");
        kafkaTemplate.send(stringStringProducerRecord);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

结语

以上,就是在springboot中生产端实现事务的方法,总结一下,一共分为以下几步

  • 增加kafka事务管理器
  • 增加JDBC事务管理器
  • 增加事务链事务管理器
  • 使用三种事务管理器

下一篇,将写springboot中消费端如何配置。


引用资料:
kafka官网
kafka的github
spring-kafka官网

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/524837
推荐阅读
相关标签
  

闽ICP备14008679号