当前位置:   article > 正文

flink从数据库读数据_flink从数据库读取数据

flink从数据库读取数据
  1. package com.example.flink;
  2. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  3. import org.apache.flink.api.common.typeinfo.TypeInformation;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.typeutils.RowTypeInfo;
  7. import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
  8. /**
  9. * @Author ex-liujiwei
  10. * @Date 2022/3/11 23:20
  11. */
  12. public class Demo01 {
  13. public static void main(String[] args) throws Exception {
  14. //sql查询结果列类型
  15. TypeInformation[] fieldTypes = new TypeInformation[] {
  16. BasicTypeInfo.STRING_TYPE_INFO,
  17. BasicTypeInfo.STRING_TYPE_INFO,
  18. BasicTypeInfo.STRING_TYPE_INFO,
  19. BasicTypeInfo.STRING_TYPE_INFO
  20. };
  21. RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
  22. JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
  23. //数据库连接信息
  24. .setDrivername("com.mysql.jdbc.Driver")
  25. .setDBUrl("jdbc:mysql://10.20.16.15:5102/bsmp?useSSL=false&useUnicode=true&characterEncoding=UTF-8")
  26. .setUsername("bsmpopr")
  27. .setPassword("Jcfgdasdf4#")
  28. .setQuery("SELECT user_id,name,phone_num,email FROM sys_user_info")
  29. .setRowTypeInfo(rowTypeInfo)
  30. .finish();
  31. //搭建flink
  32. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  33. //获取数据源
  34. //datasource
  35. DataSource s = env.createInput(jdbcInputFormat);
  36. s.print();
  37. env.execute();
  38. }
  39. }

pom

  <!-- All Flink artifacts are pinned to the same release (1.10.0) and the same Scala suffix
       (_2.11); mixing Flink versions or Scala binary versions on one classpath is not supported. -->

  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.10.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.10.0</version>
      <!-- <scope>provided</scope> -->
  </dependency>
  <!-- Provides org.apache.flink.api.java.io.jdbc.JDBCInputFormat. -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.11</artifactId>
      <version>1.10.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
      <version>1.10.0</version>
  </dependency>
  <!-- NOTE(review): the demo loads com.mysql.jdbc.Driver, so a MySQL JDBC driver
       (mysql:mysql-connector-java) must also be on the classpath; it is not declared in
       this fragment - confirm it is provided elsewhere in the pom. -->

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/518804
推荐阅读
相关标签
  

闽ICP备14008679号