当前位置:   article > 正文

FlinkSql-LookUpJoin_flinksql join后挂掉

flinksql join后挂掉
  1. public class LookUpJoinTest {
  2. public static void main(String[] args) {
  3. //获取执行环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.setParallelism(1);
  6. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  7. //读取端口数据创建流并转换为动态表
  8. DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop102", 8888);
  9. SingleOutputStreamOperator<WaterSensor> waterSensorDS = socketTextStream.map(line -> {
  10. String[] split = line.split(",");
  11. return new WaterSensor(split[0],
  12. Double.parseDouble(split[1]),
  13. Long.parseLong(split[2]));
  14. });
  15. Table table = tableEnv.fromDataStream(waterSensorDS,
  16. $("id"),
  17. $("vc"),
  18. $("ts"),
  19. $("pt").proctime());
  20. tableEnv.createTemporaryView("t1", table);
  21. //创建LookUp表
  22. tableEnv.executeSql("" +
  23. "CREATE TEMPORARY TABLE my_base_dic ( " +
  24. " id STRING, " +
  25. " dic_name STRING " +
  26. ") WITH ( " +
  27. " 'connector' = 'jdbc', " +
  28. " 'url' = 'jdbc:mysql://hadoop102:3306/gmall-211027-flink', " +
  29. " 'username' = 'root', " +
  30. " 'password' = '000000', " +
  31. " 'lookup.cache.max-rows' = '10', " +
  32. " 'lookup.cache.ttl' = '1 hour', " +
  33. " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
  34. " 'table-name' = 'base_dic' " +
  35. ")");
  36. //关联并打印
  37. tableEnv.sqlQuery("" +
  38. "select " +
  39. " t1.id, " +
  40. " t1.vc, " +
  41. " t2.dic_name " +
  42. "from t1 " +
  43. "join my_base_dic FOR SYSTEM_TIME AS OF t1.pt as t2 " +
  44. "on t1.id = t2.id")
  45. .execute()
  46. .print();
  47. }
  48. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/520109
推荐阅读
相关标签
  

闽ICP备14008679号