当前位置:   article > 正文

flink的Oracle-cdc如何实现_flink oracle cdc

flink oracle cdc

标题: flink的Oracle-cdc如何实现
日期: 2021-08-08 14:33:39
标签: [flink, oracle]
分类: 实时数仓

今天来说说如何实时接入oracle的数据,官方没有提供oracle-cdc的connector,那么我们现在要自己写connector,且看下文。

flink connector

且看阿里提供的数据接入connector有哪些:mysql-cdc postgres-cdc
官档在这里:
mysql-cdc
postgres-cdc
依赖:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-postgres-cdc</artifactId>
  <version>1.1.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

使用方法:

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- register a PostgreSQL table 'shipments' in Flink SQL
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

-- read snapshot and binlogs from shipments table
SELECT * FROM shipments;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

这样就直接将数据接入到flink了,剩下的就看你怎么操作flink了。

oracle怎么实时接入呢?

第一种方法 debezium connector

官方提供了一个debezium connector,它是干嘛的?
它是一个cdc工具,将关系型数据库的增删改数据实时获取到kafka中的,支持mysql、postgres、mongodb,当然oracle也支持,它支持大部分的关系型数据库,也支持非关系型数据库。
然后我们通过kafka connector的debezium-json格式化方式来接受debezium获取的实时数据,不就可以将增删改同步到hbase了么,完美。

依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.13.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.13.0</version>
</dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

看代码:

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 -- using 'debezium-json' as the format to interpret Debezium JSON messages
 -- please use 'debezium-avro-confluent' if Debezium encodes messages in Avro format
 'format' = 'debezium-json'
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

第二种办法 自定义format

但是这种方式需要自己手动搭建一套debezium服务,配置oracle也是手动的,有没有像mysql-cdc一样的办法呢,直接一步到位。

办法是有的,我们知道oracle自带有实时数据获取工具,叫OGG,这个ogg能将oracle的增删改数据,实时同步到kafka,实现的结果跟debezium是一致的,但是比debezium简单,因为ogg是oracle自带的,你只需要配置相应的服务就行。
但是ogg到kafka的数据格式是一个json,这个json带有的数据增删改标识,跟flink changelog对应的增删改标识是不一样的,这样的话,flink就不能直接使用了,怎么办?

flink提供了自定义format接口,自己实现ogg format,将ogg kafka数据转换为flink能够认识的changelog 格式,这样就可以直接对接hbase connector,实现hbase的增删改,同步更新了。

关于如何实现ogg format,看官档给出的例子,我总是强调,没事就看官档,明了而且详细,比路边上的野文档强多了,而且实用。
这里举了一个changelog-csv format的例子,我们也可以自定义一个ogg format:
自定义changelog-csv format

这种方法比搭建debezium方法稍微简单,而且轻量级,不需要维护debezium了。

好了,今天就是这样的,官方的oracle-cdc正在测试中,相信在不久的将来就会出现了,大家敬请期待。

我是程序员Lee,欢迎关注我的博客,我在上海,可以互相交流。


每天进步一点点,我迟早会秃头。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这1个公众号:
程序员写书
喜欢宠物的朋友可以关注:看一只流浪狗的逆袭流浪狗逆袭

一起学习,一起进步。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/473015?site
推荐阅读
相关标签
  

闽ICP备14008679号