当前位置:   article > 正文

Java/scala使用内置嵌入式embedded debezium全量和增量同步Mysql binlog数据_java debezium

java debezium

1. 背景

Debezium通常基于Kafka启动一个Kafka Connect服务,之后可以向Debezium提交Mysql、PostgresSQL等connector任务进行同步数据,数据被保存到Kafka中。分布式的Kafka Connect服务能提供容错性和可拓展性

但是我们有时不想部署一套Kafka集群和Debezium的Kafka Connect服务。而是通过一种更轻量级的方式,将Debezium的Mysql、PostgresSQL等connector任务直接集成到我们的Java/scala代码中,直接在代码中接收changelog数据,处理完成后发送到下游

2. 添加依赖

这里只能使用1.5版本的,该版本支持Java1.8。因为从1.6版本开始,只支持Java11。参考debezium release的已经测试过的版本确定每个版本支持的Java和Mysql

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>1.5.4.Final</version>
        </dependency>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>1.5.4.Final</version>
        </dependency>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>1.5.4.Final</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3. 同步代码

package org.mq.streamWarehouse.ODS.pulsar

import io.debezium.engine.DebeziumEngine.{ChangeConsumer, CompletionCallback}
import io.debezium.engine.format.Json
import io.debezium.engine.{ChangeEvent, DebeziumEngine}

import java.util
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.function.Consumer
import scala.collection.JavaConverters.asScalaBufferConverter

object MysqlDebeziumEngine {


  def main(args: Array[String]): Unit = {

    val props: Properties = new Properties()
    // engine的参数设置
    props.setProperty("name", "engine")
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    props.setProperty("offset.storage.file.filename", "/root/offsets.log")
    props.setProperty("offset.flush.interval.ms", "6000")
    props.setProperty("converter.schemas.enable", "true")
    // mysql connector的参数设置
    props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    props.setProperty("database.hostname", "192.168.8.124")
    props.setProperty("database.port", "3306")
    props.setProperty("database.user", "hnmqet")
    props.setProperty("database.password", "hnmq123456")
    props.setProperty("database.server.id", "85744")
    props.setProperty("database.server.name", "my-app-connector")
    props.setProperty("database.include.list", "d_enforce,d_general")
    props.setProperty("snapshot.mode", "schema_only")
    props.setProperty("decimal.handling.mode", "double")
    props.setProperty("database.history",
      "io.debezium.relational.history.FileDatabaseHistory")
    props.setProperty("database.history.file.filename",
      "/root/dbhistory.log")

    try {
      // 创建engine。DebeziumEngine继承了Closeable,会自动关闭
      val engine: DebeziumEngine[ChangeEvent[String, String]] =
        DebeziumEngine.create(classOf[Json])
          .using(props)
          .notifying(new Consumer[ChangeEvent[String, String]] {
            override def accept(changeEvent: ChangeEvent[String, String]): Unit = {

              println(changeEvent.key())
              println(changeEvent.value())
            }
          })
          .notifying(
            new ChangeConsumer[ChangeEvent[String, String]] {
              override def handleBatch(list: util.List[ChangeEvent[String, String]], recordCommitter: DebeziumEngine.RecordCommitter[ChangeEvent[String, String]]): Unit = {
                for (changeEvent <- list.asScala) {
                  println(changeEvent.value())

                  recordCommitter.markProcessed(changeEvent)
                }

                recordCommitter.markBatchFinished()
              }
            }
          )
          // 加上回调代码,查看错误信息
          .using(new CompletionCallback {
            override def handle(success: Boolean, message: String, error: Throwable): Unit = {
              if (!success && error != null) {
                System.out.println("----------error------")
                System.out.println(message)
                error.printStackTrace()
              }

            }
          })
          .build()

      // 异步执行engine
      val executor: ExecutorService = Executors.newSingleThreadExecutor()
      executor.execute(engine)

      // 优雅的关闭应用
      executor.shutdown() // 执行shutdown,等待已经提交的任务执行完毕
      // 持续监控任务是否完成,如果未完成,则继续等待
      while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        println("Waiting another 10 seconds for the embedded engine to shut down")
      }


    } catch {
      case e: InterruptedException => {
        Thread.currentThread().interrupt()
      }
    }


  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

engine部分参数说明如下:

  • message接收的格式可以是:JSON, Avro、Kafka Connect服务的SourceRecord
  • DebeziumEngine的properties用于engine和mysql connector
  • 参数name:指定engine的自定义name。用于engine内部状态维护,和作为source records的一个字段
  • 参数converter.schemas.enable:output是否包含schema

Mysql Connector部分参数说明如下:

  • offset:mysql connector每处理一个record都会有一个offset,但是engine定期将offset的数据flush到文件中。以便下次重启application的时候,从保存的offset位置开始同步
  • 参数connector.class:继承自Kafka Connect的org.apache.kafka.connect.source.SourceConnector抽象类
  • 用于同步的Mysql用户需要的同步全量snapshot权限:SELECT、RELOAD、SHOW DATABASES,用于增量同步binlog的权限:REPLICATION SLAVE、REPLICATION CLIENT
  • 参数database.server.id:MySqlConnector实例相当于Mysql的slave。需要在MySQL server group中保持唯一,范围为: 1   2 32 − 1 1~2^{32}-1 1 2321
  • 参数database.server.name:为Mysql的master自定义一个名称。将作为source records的一个字段
  • 参数table.include.list:指定同步的数据库有哪些。多个逗号分隔
  • 参数snapshot.mode:默认是initial,表示同步snapshot后,再同步binlog。也可以指定为schema-only,先同步所有表的schema,再从最新的binlog position开始同步
  • 参数decimal.handling.mode:默认解析出来的含字母的字符串。设置connector将decimal解析成double
  • 参数database.history:用于记录Mysql数据库的schema变更,以便能够正确的decode这些change events

创建engine部分说明:

  • notifying中的参数是一个java.util.function.Consumer形式的lambada表达式,用于处理record。record的数据类型,是DebeziumEngine.create(classOf[Json])定义的数据类型。在该Consumer中不能抛出异常给Consumer。如果抛出异常给Consumer, engine将会记录该异常,继续处理下一个record,会导致Mysql Server端和同步目标数据库的数据不一致

运行engine部分说明:

  • DebeziumEngine需要通过Executor或ExecutorService异步执行

Exactly-once实现说明:

  • offset.flush.interval.ms设置为0
  • notifying中对Consumer进行实现,不进行批处理,来一条处理一条,如下:
import java.util.function.Consumer
import io.debezium.engine.ChangeEvent

          .notifying(new Consumer[ChangeEvent[String, String]] {
            override def accept(changeEvent: ChangeEvent[String, String]): Unit = {
              
              println(changeEvent.key())
              println(changeEvent.value())
            }
          })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4. 同步问题

4.1 Mysql Datetime类型解析多了8小时

问题场景:Mysql中的一列字段类型为Datetime,值为:2021-07-22 11:19:27。其对应的时间戳为:1626923967000。而Debezium解析出来的是Long类型的时间戳,值为1626952767000,对应的时间为:2021-07-22 19:19:27,比Mysql中的值多了8小时

原因:参考源码:debezium/debezium-core/src/main/java/io/debezium/time/Timestamp.java。Mysql数据库中设置的时区是UTC+8。Debezium默认将MySQL中datetime类型转成UTC的时间戳,而且没有提供参数进行修改。尝试通过官网提供的参数props.setProperty("serverTimezone", "UTC")props.setProperty("serverTimezone", "Asia/Shanghai")进行修改,并没有效果,而且源码也没有看到该参数

解决办法:

  1. 修改源码
  2. 在自己的代码中对解析出来的Long类型时间戳,减8小时
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/477774?sit
推荐阅读
相关标签
  

闽ICP备14008679号