赞
踩
目录
5 在 Flink SQL CLI 中使用 Flink DDL 创建表
6 关联订单数据并且将其写入 Elasticsearch 中
7 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果
Flink:1.13.5
Oralce:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0
Elasticsearch:7.6.0
Kibana:7.6.0
flink-sql-connector-oracle-cdc-2.1.1.jar
flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
Oracle: Oracle 11g, 已经预先创建了 products
和 orders
表,并插入了一些数据。
Elasticsearch: orders
表将和 products
表进行join,join的结果写入Elasticsearch中。
Kibana: 可视化 Elasticsearch 中的数据。
下载Flink
下载flink 1.13.5 并将其解压至目录flink-1.13.5
下载依赖包
下载以下两个jar包到<FLINK_HOME>/lib/目录下:
flink-sql-connector-oracle-cdc-2.1.1.jar
flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
- --创建表products
- CREATE TABLE products (
- id NUMBER(10) NOT NULL constraint pk_id PRIMARY KEY,
- name VARCHAR(255) NOT NULL,
- description VARCHAR(512)
- );
-
- --创建序列seq_products_id
- create sequence seq_products_id
- minvalue 101
- nomaxvalue
- start with 101
- increment by 1
- nocycle;
-
- --创建触发器tr_products,如果insert语句不指定ID自动插入增长值
- CREATE OR REPLACE TRIGGER tr_products
- BEFORE INSERT ON products FOR EACH ROW WHEN (new.id is null)
- begin
- select seq_products_id.nextval into:new.id from dual;
- end;
-
- --products表插入数据
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('scooter','Small 2-wheel scooter');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('car battery','12V car battery');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('hammer','12oz carpenters hammer');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('hammer','14oz carpenters hammer');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('hammer','16oz carpenters hammer');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('rocks','box of assorted rocks');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('jacket','water resistent black wind breaker');
- INSERT INTO products(NAME,DESCRIPTION) VALUES ('spare tire','24 inch spare tire');
-
- --创建表orders
- CREATE TABLE orders (
- order_id NUMBER(10) NOT NULL constraint pk_orders_id PRIMARY KEY,
- order_date DATE NOT NULL,
- customer_name VARCHAR(255) NOT NULL,
- price NUMBER(10, 5) NOT NULL,
- product_id NUMBER(10) NOT NULL,
- order_status VARCHAR(10) NOT NULL
- );
-
- --创建序列seq_orders_id
- create sequence seq_orders_id
- minvalue 10001
- nomaxvalue
- start with 10001
- increment by 1
- nocycle;
-
- --创建触发器tr_orders,如果insert语句不指定ID自动插入增长值
- CREATE OR REPLACE TRIGGER tr_orders
- BEFORE INSERT ON orders FOR EACH ROW WHEN (new.order_id is null)
- begin
- select seq_orders_id.nextval into:new.order_id from dual;
- end;
-
- --orders表插入数据
- INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 10:08:22','yyyy-mm-dd,hh24:mi:ss'), 'Jark', 50.50, 102, 'false');
- INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 10:11:09','yyyy-mm-dd,hh24:mi:ss'), 'Sally', 15.00, 105, 'false');
- INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 12:00:30','yyyy-mm-dd,hh24:mi:ss'), 'Edward', 25.25, 106, 'false');
使用下面的命令跳转至 Flink 目录下
cd flink-1.13.5
使用下面的命令启动 Flink 集群
./bin/start-cluster.sh
启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
3. 使用下面的命令启动 Flink SQL CLI
./bin/sql-client.sh
启动成功后,可以看到如下的页面:
1. 开启 checkpoint,每隔3秒做一次 checkpoint
-- Flink SQL Flink SQL> SET execution.checkpointing.interval = 3s;
2. 对于数据库中的表 products
, orders
, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据
- -- Flink SQL
- Flink SQL> CREATE TABLE products (
- ID INT,
- NAME STRING,
- DESCRIPTION STRING,
- PRIMARY KEY (ID) NOT ENFORCED
- ) WITH (
- 'connector' = 'oracle-cdc',
- 'hostname' = '192.168.50.3',
- 'port' = '1521',
- 'username' = 'flinkuser',
- 'password' = 'gtis',
- 'database-name' = 'ORCL',
- 'schema-name' = 'flinkuser',
- 'table-name' = 'products',
- 'debezium.database.tablename.case.insensitive'='false',
- 'debezium.log.mining.strategy'='online_catalog',
- 'debezium.log.mining.continuous.mine'='true'
- );
-
-
- Flink SQL>CREATE TABLE orders (
- ORDER_ID INT,
- ORDER_DATE TIMESTAMP(3),
- CUSTOMER_NAME STRING,
- PRICE DECIMAL(10, 5),
- PRODUCT_ID INT,
- ORDER_STATUS BOOLEAN
- ) WITH (
- 'connector' = 'oracle-cdc',
- 'hostname' = '192.168.50.3',
- 'port' = '1521',
- 'username' = 'flinkuser',
- 'password' = 'gtis',
- 'database-name' = 'ORCL',
- 'schema-name' = 'flinkuser',
- 'table-name' = 'orders',
- 'debezium.database.tablename.case.insensitive'='false',
- 'debezium.log.mining.strategy'='online_catalog',
- 'debezium.log.mining.continuous.mine'='true'
- );
-
3. 创建 enriched_orders_orcl
表, 用来将关联后的订单数据写入 Elasticsearch 中
- -- Flink SQL
- Flink SQL>CREATE TABLE enriched_orders_orcl (
- ORDER_ID INT,
- ORDER_DATE TIMESTAMP(3),
- CUSTOMER_NAME STRING,
- PRICE DECIMAL(10, 5),
- PRODUCT_ID INT,
- ORDER_STATUS BOOLEAN,
- PRODUCT_NAME STRING,
- PRODUCT_DESCRIPTION STRING,
- PRIMARY KEY (ORDER_ID) NOT ENFORCED
- ) WITH (
- 'connector' = 'elasticsearch-7',
- 'hosts' = 'http://192.168.50.3:9200',
- 'index' = 'enriched_orders_orcl'
- );
使用 Flink SQL 将订单表 order
与 商品表 products
关联,并将关联后的订单信息写入 Elasticsearch 中。
- -- Flink SQL
- Flink SQL> INSERT INTO enriched_orders_orcl
- SELECT o.*, p.NAME, p.DESCRIPTION
- FROM orders AS o
- LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
现在,就可以在 Kibana 中看到包含商品信息的订单数据。
首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders_orcl
然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.
通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。
- --orders表插入一条数据
- INSERT INTO orders(ORDER_DATE,CUSTOMER_NAME,PRICE,PRODUCT_ID,ORDER_STATUS) VALUES (to_date('2020-07-30 13:01:00','yyyy-mm-dd,hh24:mi:ss'),'Mic', 29.71, 104, 'false');
- --orders表更新一条数据
- UPDATE orders SET ORDER_STATUS = 'true' WHERE ORDER_ID = 10004;
- --orders表删除一条数据
- DELETE FROM orders WHERE ORDER_ID = 10004;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。