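Debezium usually runs as a Kafka Connect service on top of Kafka. You then submit connector tasks (MySQL, PostgreSQL, and so on) to Debezium to synchronize data, and the data is stored in Kafka. A distributed Kafka Connect service provides fault tolerance and scalability.
Sometimes, however, we do not want to deploy a Kafka cluster plus a Debezium Kafka Connect service. A more lightweight approach is to embed the Debezium MySQL, PostgreSQL, or other connector directly in our Java/Scala code, receive the changelog data in that code, process it, and send it downstream.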
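Only the 1.5 release can be used here, because it supports Java 1.8; from 1.6 onward, only Java 11 is supported. Check the tested-versions list on the Debezium releases page to determine which Java and MySQL versions each release supports.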
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.5.4.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.5.4.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.5.4.Final</version>
</dependency>
package org.mq.streamWarehouse.ODS.pulsar

import io.debezium.engine.DebeziumEngine.{ChangeConsumer, CompletionCallback}
import io.debezium.engine.format.Json
import io.debezium.engine.{ChangeEvent, DebeziumEngine}

import java.util
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.function.Consumer
import scala.collection.JavaConverters.asScalaBufferConverter

object MysqlDebeziumEngine {

  def main(args: Array[String]): Unit = {

    val props: Properties = new Properties()

    // Engine settings
    props.setProperty("name", "engine")
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    props.setProperty("offset.storage.file.filename", "/root/offsets.log")
    props.setProperty("offset.flush.interval.ms", "6000")
    props.setProperty("converter.schemas.enable", "true")

    // MySQL connector settings
    props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    props.setProperty("database.hostname", "192.168.8.124")
    props.setProperty("database.port", "3306")
    props.setProperty("database.user", "hnmqet")
    props.setProperty("database.password", "hnmq123456")
    props.setProperty("database.server.id", "85744")
    props.setProperty("database.server.name", "my-app-connector")
    props.setProperty("database.include.list", "d_enforce,d_general")
    props.setProperty("snapshot.mode", "schema_only")
    props.setProperty("decimal.handling.mode", "double")
    props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory")
    props.setProperty("database.history.file.filename", "/root/dbhistory.log")

    try {
      // Create the engine. DebeziumEngine implements Closeable, so it is stopped by calling close()
      val engine: DebeziumEngine[ChangeEvent[String, String]] =
        DebeziumEngine.create(classOf[Json])
          .using(props)
          // Per-record handler
          .notifying(new Consumer[ChangeEvent[String, String]] {
            override def accept(changeEvent: ChangeEvent[String, String]): Unit = {
              println(changeEvent.key())
              println(changeEvent.value())
            }
          })
          // Batch handler, an alternative to the Consumer above; in practice register only one of the two
          .notifying(
            new ChangeConsumer[ChangeEvent[String, String]] {
              override def handleBatch(list: util.List[ChangeEvent[String, String]],
                                       recordCommitter: DebeziumEngine.RecordCommitter[ChangeEvent[String, String]]): Unit = {
                for (changeEvent <- list.asScala) {
                  println(changeEvent.value())
                  recordCommitter.markProcessed(changeEvent)
                }
                recordCommitter.markBatchFinished()
              }
            }
          )
          // Add a completion callback to inspect error information
          .using(new CompletionCallback {
            override def handle(success: Boolean, message: String, error: Throwable): Unit = {
              if (!success && error != null) {
                System.out.println("----------error------")
                System.out.println(message)
                error.printStackTrace()
              }
            }
          })
          .build()

      // Run the engine asynchronously
      val executor: ExecutorService = Executors.newSingleThreadExecutor()
      executor.execute(engine)

      // Shut the application down gracefully
      executor.shutdown() // stop accepting new tasks and let the submitted task run to completion
      // Keep checking whether the task has finished; if not, keep waiting
      while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        println("Waiting another 10 seconds for the embedded engine to shut down")
      }
    } catch {
      case e: InterruptedException => {
        Thread.currentThread().interrupt()
      }
    }
  }
}
Notes on the engine parameters:
Notes on the MySQL connector parameters:
Notes on creating the engine:
Notes on running the engine:
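The engine task keeps running until the engine is closed or fails, so the wait loop in the program above only ends once something calls close(). A minimal sketch of one way to arrange that, using only the JDK: StubEngine is a hypothetical stand-in for DebeziumEngine (which is likewise a Runnable and a Closeable), and EngineLifecycle with its start and stop methods are names made up for this illustration.

```scala
import java.io.Closeable
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}

// Hypothetical stand-in for DebeziumEngine: run() blocks until close() is called.
final class StubEngine extends Runnable with Closeable {
  private val closed = new CountDownLatch(1)
  override def run(): Unit = closed.await()
  override def close(): Unit = closed.countDown()
}

object EngineLifecycle {

  // Starts the engine on a single worker thread and registers a JVM shutdown
  // hook that closes it, so that Ctrl+C or SIGTERM lets run() return.
  def start(engine: Runnable with Closeable): ExecutorService = {
    val executor = Executors.newSingleThreadExecutor()
    executor.execute(engine)
    executor.shutdown() // no new tasks; the engine task itself keeps running
    sys.addShutdownHook(engine.close())
    executor
  }

  // Closes the engine, then waits up to timeoutSeconds for the worker thread.
  // Returns true if the executor terminated within the timeout.
  def stop(engine: Closeable, executor: ExecutorService, timeoutSeconds: Long): Boolean = {
    engine.close()
    executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
  }
}
```

With a real engine, the object returned by build() would be passed to start in place of StubEngine, assuming the Debezium dependencies listed earlier are on the classpath.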
Notes on implementing exactly-once:
import java.util.function.Consumer
import io.debezium.engine.ChangeEvent

.notifying(new Consumer[ChangeEvent[String, String]] {
  override def accept(changeEvent: ChangeEvent[String, String]): Unit = {
    println(changeEvent.key())
    println(changeEvent.value())
  }
})
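Problem scenario: a column in MySQL has type DATETIME and holds the value 2021-07-22 11:19:27, whose timestamp (read as UTC+8) is 1626923967000. Debezium, however, emits a Long timestamp of 1626952767000, which corresponds to 2021-07-22 19:19:27 in UTC+8, 8 hours later than the value in MySQL.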
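The arithmetic in this scenario can be checked with java.time. The sketch below assumes the database zone is Asia/Shanghai (UTC+8); the object and value names are made up for the illustration.

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

object DatetimeOffsetCheck {
  // The wall-clock value stored in the DATETIME column.
  val columnValue: LocalDateTime = LocalDateTime.of(2021, 7, 22, 11, 19, 27)
  // Assumed database time zone (UTC+8).
  val shanghai: ZoneId = ZoneId.of("Asia/Shanghai")

  // Epoch millis when the column value is read as UTC+8 wall-clock time.
  val expectedMillis: Long = columnValue.atZone(shanghai).toInstant.toEpochMilli
  // Epoch millis when the same value is read as UTC wall-clock time.
  val readAsUtcMillis: Long = columnValue.toInstant(ZoneOffset.UTC).toEpochMilli

  def main(args: Array[String]): Unit = {
    println(expectedMillis)                                  // 1626923967000
    println(readAsUtcMillis)                                 // 1626952767000
    println((readAsUtcMillis - expectedMillis) / 3600000L)   // 8
    // Rendering the UTC-based value in UTC+8 gives the shifted time.
    println(Instant.ofEpochMilli(readAsUtcMillis).atZone(shanghai).toLocalDateTime) // 2021-07-22T19:19:27
  }
}
```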
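Cause: see the source file debezium/debezium-core/src/main/java/io/debezium/time/Timestamp.java. The time zone configured in the MySQL database is UTC+8. By default, Debezium converts MySQL DATETIME values into a timestamp as if they were UTC, and it provides no parameter to change this. Setting the parameter given on the official site, props.setProperty("serverTimezone", "UTC") or props.setProperty("serverTimezone", "Asia/Shanghai"), had no effect, and the parameter does not appear in the source code either.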
Solution:
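One possible downstream workaround, sketched under the assumption that the database time zone is known (Asia/Shanghai in this scenario): read the emitted value back as a UTC wall-clock time, which recovers the column value, then re-interpret that wall-clock time in the database zone. DebeziumDatetimeFix and its method names are hypothetical and are not part of Debezium.

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

object DebeziumDatetimeFix {

  // Recovers the wall-clock value stored in the DATETIME column by reading
  // the emitted epoch millis as UTC.
  def toColumnValue(debeziumMillis: Long): LocalDateTime =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(debeziumMillis), ZoneOffset.UTC)

  // Re-interprets that wall-clock value in the database time zone and
  // returns the corresponding epoch millis.
  def toEpochMillis(debeziumMillis: Long, dbZone: ZoneId): Long =
    toColumnValue(debeziumMillis).atZone(dbZone).toInstant.toEpochMilli
}
```

For the value from the scenario, DebeziumDatetimeFix.toEpochMillis(1626952767000L, ZoneId.of("Asia/Shanghai")) returns 1626923967000. The conversion would be applied to each DATETIME field after the change event has been parsed.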