赞
踩
Flink SQL是建立在Apache Flink之上的SQL处理引擎,它允许用户以SQL的方式处理流数据和批数据。以下是一些Flink SQL的基础操作:
1.启动flink集群
./start-cluster.sh
./sql-client.sh
CREATE TABLE students (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'students',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'csv'
);
SELECT id, name, age
FROM students
WHERE age > 18;
CREATE TABLE results (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'results',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'csv'
);
INSERT INTO results
SELECT id, name, age
FROM students
WHERE age > 18;
注意事项
- 在进行Flink SQL操作时,需要确保已经正确配置了Flink环境,并且已经添加了必要的依赖库。
- Flink SQL的语法和功能可能会随着Flink版本的更新而发生变化,因此建议查阅最新的官方文档以获取准确的信息。
1、 从csv中读取数据
CREATE TABLE well_casting_alarm ( _id VARCHAR, comCode VARCHAR, wellCode VARCHAR, uuid VARCHAR, type INT, alarmType INT, alarmGrade INT, zp INT, startAlarmTime TIME, startAlarmValue DECIMAL, threshold INT, warnStatus INT, isDeal INT, createTime TIME, _class VARCHAR ) WITH ( 'connector' = 'filesystem', 'path' = '/wfg/data/sjzz.wellCastingAlarm0606.csv', 'format' = 'csv' );
2、查看所有表
Flink SQL> show tables;
+----------------------+
| table name |
+----------------------+
| employee_information |
| well_casting_alarm |
+----------------------+
2 rows in set
3、删除表
DROP TABLE well_casting_alarm;
4、查询数据
select *from well_casting_alarm limit 1;
5、删除一条数据
DELETE FROM well_casting_alarm where '_id'='_id';
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。