当前位置:   article > 正文

Flink SQL 基础操作

Flink SQL 基础操作

Flink SQL是建立在Apache Flink之上的SQL处理引擎,它允许用户以SQL的方式处理流数据和批数据。以下是一些Flink SQL的基础操作:

一、环境准备

1.启动flink集群

./start-cluster.sh
  • 1
  1. 启动sql-client
./sql-client.sh
  • 1

二、数据源定义

  1. 创建表(Source):
  • 使用CREATE TABLE语句定义输入数据源,包括其schema、存储格式(如CSV、JSON等)以及连接器的配置(如Kafka、FileSystem等)。
  • 示例:
CREATE TABLE students (  
    id STRING,  
    name STRING,  
    age INT,  
    sex STRING,  
    clazz STRING  
) WITH (  
    'connector' = 'kafka',  
    'topic' = 'students',  
    'properties.bootstrap.servers' = 'localhost:9092',  
    'format' = 'csv'  
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

三、数据处理

  1. 编写SQL查询:
  • 使用标准的SQL语句对数据进行查询、过滤、聚合等操作。
  • 示例:
SELECT id, name, age  
FROM students  
WHERE age > 18;
  • 1
  • 2
  • 3

四、数据输出

  1. 创建表(Sink):
  • 使用CREATE TABLE语句定义输出数据源,用于将处理后的数据写入外部系统,如Kafka、数据库等。
  • 示例:
CREATE TABLE results (  
    id STRING,  
    name STRING,  
    age INT  
) WITH (  
    'connector' = 'kafka',  
    'topic' = 'results',  
    'properties.bootstrap.servers' = 'localhost:9092',  
    'format' = 'csv'  
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 插入数据:
  • 使用INSERT INTO语句将查询结果写入Sink表。
  • 示例:
INSERT INTO results  
SELECT id, name, age  
FROM students  
WHERE age > 18;
  • 1
  • 2
  • 3
  • 4

五、执行与监控

  1. 执行SQL语句:
  • 在Flink SQL客户端或程序中执行SQL语句。
  • 可以通过Flink的Dashboard或其他监控工具来查看作业的执行状态和性能指标。
  1. 结果展示:
  • Flink SQL客户端支持多种结果显示模式,如表格模式、变更日志模式和Tableau模式,可以根据需要设置。

六、其他操作

  1. 动态表:
  • Flink SQL中的表是动态表,支持对流数据的实时查询和处理。
  1. Join操作:
  • Flink SQL支持多种Join方式,包括Regular Joins、Interval Joins、Temporal Joins和Lookup Joins,用于处理表之间的关联查询。
  1. 窗口函数:
  • Flink SQL支持窗口函数,用于对时间序列数据进行分组和聚合操作。

注意事项

  • 在进行Flink SQL操作时,需要确保已经正确配置了Flink环境,并且已经添加了必要的依赖库。
  • Flink SQL的语法和功能可能会随着Flink版本的更新而发生变化,因此建议查阅最新的官方文档以获取准确的信息。

样例操作

1、 从csv中读取数据

CREATE TABLE well_casting_alarm (
    _id VARCHAR,
    comCode VARCHAR,
	wellCode VARCHAR,
	uuid VARCHAR,
	type INT,
	alarmType INT,
	alarmGrade INT,
	zp INT,
	startAlarmTime TIME,
	startAlarmValue DECIMAL,
	threshold INT,
	warnStatus INT,
	isDeal INT,
	createTime TIME,
	_class VARCHAR
) WITH ( 
    'connector' = 'filesystem',
    'path' = '/wfg/data/sjzz.wellCastingAlarm0606.csv',
    'format' = 'csv'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2、查看所有表

Flink SQL> show tables;
+----------------------+
|           table name |
+----------------------+
| employee_information |
|   well_casting_alarm |
+----------------------+
2 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3、删除表

DROP TABLE well_casting_alarm;
  • 1

4、查询数据

select *from well_casting_alarm limit 1;
  • 1

5、删除一条数据

DELETE FROM well_casting_alarm where '_id'='_id';
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/977325
推荐阅读
相关标签
  

闽ICP备14008679号