当前位置:   article > 正文

Flink CDC实时抽取Oracle数据存入ES_oracle数据写入到es中

oracle数据写入到es中

目录

1 实践环境

2 下载Flink 和所需要的依赖包

3 准备数据

4 启动 Flink 集群和 Flink SQL CLI

5 在 Flink SQL CLI 中使用 Flink DDL 创建表 

6 关联订单数据并且将其写入 Elasticsearch 中

7 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果


1 实践环境

Flink:1.13.5

Oralce:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0

Elasticsearch:7.6.0

Kibana:7.6.0

flink-sql-connector-oracle-cdc-2.1.1.jar

flink-sql-connector-elasticsearch7_2.11-1.13.2.jar

  • Oracle: Oracle 11g, 已经预先创建了 products 和 orders表,并插入了一些数据。

  • Elasticsearch: orders 表将和 products 表进行join,join的结果写入Elasticsearch中。

  • Kibana: 可视化 Elasticsearch 中的数据。

2 下载Flink 和所需要的依赖包

  1. 下载Flink

    下载flink 1.13.5 并将其解压至目录flink-1.13.5
  2. 下载依赖包

    下载以下两个jar包到<FLINK_HOME>/lib/目录下:

    flink-sql-connector-oracle-cdc-2.1.1.jar

    flink-sql-connector-elasticsearch7_2.11-1.13.2.jar

3 准备数据

  1. --创建表products
  2. CREATE TABLE products (
  3. id NUMBER(10) NOT NULL constraint pk_id PRIMARY KEY,
  4. name VARCHAR(255) NOT NULL,
  5. description VARCHAR(512)
  6. );
  7. --创建序列seq_products_id
  8. create sequence seq_products_id
  9. minvalue 101
  10. nomaxvalue
  11. start with 101
  12. increment by 1
  13. nocycle;
  14. --创建触发器tr_products,如果insert语句不指定ID自动插入增长值
  15. CREATE OR REPLACE TRIGGER tr_products
  16. BEFORE INSERT ON products FOR EACH ROW WHEN (new.id is null)
  17. begin
  18. select seq_products_id.nextval into:new.id from dual;
  19. end;
  20. --products表插入数据
  21. INSERT INTO products(NAME,DESCRIPTION) VALUES ('scooter','Small 2-wheel scooter');
  22. INSERT INTO products(NAME,DESCRIPTION) VALUES ('car battery','12V car battery');
  23. INSERT INTO products(NAME,DESCRIPTION) VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3');
  24. INSERT INTO products(NAME,DESCRIPTION) VALUES ('hammer','12oz carpenters hammer');
  25. INSERT INTO products(NAME,DESCRIPTION) VALUES ('hammer','14oz carpenters hammer');
  26. INSERT INTO products(NAME,DESCRIPTION) VALUES ('hammer','16oz carpenters hammer');
  27. INSERT INTO products(NAME,DESCRIPTION) VALUES ('rocks','box of assorted rocks');
  28. INSERT INTO products(NAME,DESCRIPTION) VALUES ('jacket','water resistent black wind breaker');
  29. INSERT INTO products(NAME,DESCRIPTION) VALUES ('spare tire','24 inch spare tire');
  30. --创建表orders
  31. CREATE TABLE orders (
  32. order_id NUMBER(10) NOT NULL constraint pk_orders_id PRIMARY KEY,
  33. order_date DATE NOT NULL,
  34. customer_name VARCHAR(255) NOT NULL,
  35. price NUMBER(10, 5) NOT NULL,
  36. product_id NUMBER(10) NOT NULL,
  37. order_status VARCHAR(10) NOT NULL
  38. );
  39. --创建序列seq_orders_id
  40. create sequence seq_orders_id
  41. minvalue 10001
  42. nomaxvalue
  43. start with 10001
  44. increment by 1
  45. nocycle;
  46. --创建触发器tr_orders,如果insert语句不指定ID自动插入增长值
  47. CREATE OR REPLACE TRIGGER tr_orders
  48. BEFORE INSERT ON orders FOR EACH ROW WHEN (new.order_id is null)
  49. begin
  50. select seq_orders_id.nextval into:new.order_id from dual;
  51. end;
  52. --orders表插入数据
  53. INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 10:08:22','yyyy-mm-dd,hh24:mi:ss'), 'Jark', 50.50, 102, 'false');
  54. INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 10:11:09','yyyy-mm-dd,hh24:mi:ss'), 'Sally', 15.00, 105, 'false');
  55. INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 12:00:30','yyyy-mm-dd,hh24:mi:ss'), 'Edward', 25.25, 106, 'false');

4 启动 Flink 集群和 Flink SQL CLI

  1. 使用下面的命令跳转至 Flink 目录下

    cd flink-1.13.5
    
  2. 使用下面的命令启动 Flink 集群

    ./bin/start-cluster.sh
    

    启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

3. 使用下面的命令启动 Flink SQL CLI

   ./bin/sql-client.sh

启动成功后,可以看到如下的页面:

5 在 Flink SQL CLI 中使用 Flink DDL 创建表 

1. 开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

2. 对于数据库中的表 productsorders, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

  1. -- Flink SQL
  2. Flink SQL> CREATE TABLE products (
  3.     ID INT,
  4.     NAME STRING,
  5.     DESCRIPTION STRING,
  6.     PRIMARY KEY (ID) NOT ENFORCED
  7.   ) WITH (
  8.     'connector' = 'oracle-cdc',
  9.     'hostname' = '192.168.50.3',
  10.     'port' = '1521',
  11.     'username' = 'flinkuser',
  12.     'password' = 'gtis',
  13.     'database-name' = 'ORCL',
  14.     'schema-name' = 'flinkuser',  
  15.     'table-name' = 'products',
  16.     'debezium.database.tablename.case.insensitive'='false',
  17.     'debezium.log.mining.strategy'='online_catalog',
  18.     'debezium.log.mining.continuous.mine'='true'
  19.   );
  20. Flink SQL>CREATE TABLE orders (
  21. ORDER_ID INT,
  22. ORDER_DATE TIMESTAMP(3),
  23. CUSTOMER_NAME STRING,
  24. PRICE DECIMAL(10, 5),
  25. PRODUCT_ID INT,
  26. ORDER_STATUS BOOLEAN
  27. ) WITH (
  28. 'connector' = 'oracle-cdc',
  29. 'hostname' = '192.168.50.3',
  30. 'port' = '1521',
  31. 'username' = 'flinkuser',
  32. 'password' = 'gtis',
  33. 'database-name' = 'ORCL',
  34. 'schema-name' = 'flinkuser',
  35. 'table-name' = 'orders',
  36. 'debezium.database.tablename.case.insensitive'='false',
  37. 'debezium.log.mining.strategy'='online_catalog',
  38. 'debezium.log.mining.continuous.mine'='true'
  39. );

3. 创建 enriched_orders_orcl 表, 用来将关联后的订单数据写入 Elasticsearch 中

  1. -- Flink SQL
  2. Flink SQL>CREATE TABLE enriched_orders_orcl (
  3.    ORDER_ID INT,
  4.    ORDER_DATE TIMESTAMP(3),
  5.    CUSTOMER_NAME STRING,
  6.    PRICE DECIMAL(10, 5),
  7.    PRODUCT_ID INT,
  8.    ORDER_STATUS BOOLEAN,
  9.    PRODUCT_NAME STRING,
  10.    PRODUCT_DESCRIPTION STRING,
  11.    PRIMARY KEY (ORDER_ID) NOT ENFORCED
  12.  ) WITH (
  13.      'connector' = 'elasticsearch-7',
  14.      'hosts' = 'http://192.168.50.3:9200',
  15.      'index' = 'enriched_orders_orcl'
  16.  );

6 关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products关联,并将关联后的订单信息写入 Elasticsearch 中。

  1. -- Flink SQL
  2. Flink SQL> INSERT INTO enriched_orders_orcl
  3.  SELECT o.*, p.NAME, p.DESCRIPTION
  4.  FROM orders AS o
  5.  LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;

现在,就可以在 Kibana 中看到包含商品信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders_orcl

 然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

 7 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果

通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。

  1. --orders表插入一条数据
  2. INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 13:01:00','yyyy-mm-dd,hh24:mi:ss'),'Mic', 29.71, 104, 'false');
  3. --orders表更新一条数据
  4. UPDATE orders SET ORDER_STATUS = 'true' WHERE ORDER_ID = 10004;
  5. --orders表删除一条数据
  6. DELETE FROM orders WHERE ORDER_ID = 10004;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/552196
推荐阅读
相关标签
  

闽ICP备14008679号