当前位置:   article > 正文

java Flink(四十)FlinkSql 简单Demo以及具体使用_java flinksql

java flinksql

之前 Flink SQL 用得比较少,从这一篇开始做一个简单的介绍。

首先导入依赖:

<!-- Flink Table API / SQL planner dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

然后,我们以读取文本文件的方式作为数据源:

 POJO类

  1. package beans;
  /**
   * Temperature reading reported by a sensor.
   *
   * <p>Plain mutable bean holding a sensor id, a timestamp and a temperature value. It exposes a
   * public no-argument constructor and a getter/setter pair for every field.
   */
  public class SensorReading {

      /** Identifier of the sensor that produced the reading. */
      private String id;

      /** Timestamp of the reading. */
      private Long timestamp;

      /** Temperature value of the reading. */
      private Double temperature;

      /** Creates an empty reading; all fields are {@code null} until set. */
      public SensorReading() {
      }

      /**
       * Creates a fully populated reading.
       *
       * @param id          sensor identifier
       * @param timestamp   timestamp of the reading
       * @param temperature temperature value
       */
      public SensorReading(String id, Long timestamp, Double temperature) {
          this.id = id;
          this.timestamp = timestamp;
          this.temperature = temperature;
      }

      /** @return the sensor identifier */
      public String getId() {
          return id;
      }

      /** @param id the sensor identifier */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the timestamp of the reading */
      public Long getTimestamp() {
          return timestamp;
      }

      /** @param timestamp the timestamp of the reading */
      public void setTimestamp(Long timestamp) {
          this.timestamp = timestamp;
      }

      /** @return the temperature value */
      public Double getTemperature() {
          return temperature;
      }

      /** @param temperature the temperature value */
      public void setTemperature(Double temperature) {
          this.temperature = temperature;
      }

      /**
       * Renders the reading as {@code SensorReading{id='...', timestamp=..., temperature=...}}.
       *
       * @return the textual form of this reading
       */
      @Override
      public String toString() {
          return "SensorReading{" +
                  "id='" + id + '\'' +
                  ", timestamp=" + timestamp +
                  ", temperature=" + temperature +
                  '}';
      }
  }

具体使用DEMO:

  1. package table;
  2. import beans.SensorReading;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.table.api.Table;
  7. import org.apache.flink.table.api.java.StreamTableEnvironment;
  8. import org.apache.flink.types.Row;
  9. public class Table1 {
  10. public static void main(String[] args) throws Exception{
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. env.setParallelism(1);
  13. DataStreamSource<String> inputStream = env.readTextFile("D:\\idle\\FlinkTest\\src\\main\\resources\\sensor.txt");
  14. DataStream<SensorReading> dataStream = inputStream.map(line -> {
  15. String[] fields = line.split(",");
  16. return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
  17. });
  18. //创建表的执行环境
  19. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  20. //创建表
  21. Table dataTable = tableEnv.fromDataStream(dataStream);
  22. //调用table API进行转换操作
  23. Table resultTable = dataTable.select("id, temperature")
  24. .where("id = 'sensor_1'");
  25. //执行sql
  26. //dataTable注册进环境
  27. tableEnv.createTemporaryView("sensor",dataTable);
  28. String sql = "select id, temperature from sensor where id='sensor_1'";
  29. Table resultSqlTable = tableEnv.sqlQuery(sql);
  30. //打印结果
  31. tableEnv.toAppendStream(resultTable, Row.class).print("result");
  32. tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
  33. env.execute();
  34. }
  35. }

查看结果:运行后,控制台会分别打印带有 result 和 sql 标识的两组输出,内容都是 id 为 sensor_1 的记录的 id 和 temperature 两个字段。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号