当前位置:   article > 正文

使用Spark清洗统计业务数据并保存到数据库中_spark在spring boot数据清洗

spark在spring boot数据清洗

1、打开前面创建的项目“BigData-Etl-KongGuan”,创建一些数据库访问的工具类和辅助类。

1)打开SpringBoot项目:BigData-Etl-KongGuan

2)创建数据库访问的工具类和辅助类:

com.qrsoft.etl.dao.IBaseDao数据库访问的通用类,包括创建连接、执行更新等通用操作
com.qrsoft.etl.common.db.ConnectionPoolManager连接管理类
com.qrsoft.etl.common.db.IConnectionPool连接池管理类接口
com.qrsoft.etl.common.db.ConnectionPool连接池管理类接口实现类
com.qrsoft.etl.common.db.DBbean这是外部可以配置的连接池属性 可以允许外部配置,拥有默认值
com.qrsoft.etl.common.db.DBInitInfo初始化,模拟加载所有的配置文件
com.qrsoft.etl.common.Constants全局常量类
com.qrsoft.etl.util.ConfigUtil加载配置文件的工具类
com.qrsoft.etl.util.ConfigManager配置文件管理的工具类
com.qrsoft.etl.util.MapManager地图管理的工具类,例如:是否在矩形区域内
  • 创建com.qrsoft.etl.util.ConfigUtil类,该类是一个通用工具类,用于加载myconfig.properties配置文件,并提供了一个根据键来读取配置文件中属性值的方法readProperty(key)。
  1. package com.qrsoft.etl.util;
  2. import org.springframework.core.io.support.PropertiesLoaderUtils;
  3. import java.io.IOException;
  4. import java.util.Properties;
  5. public class ConfigUtil {
  6. public static String readProperty(String key){
  7. Properties properties = new Properties();
  8. try {
  9. properties = PropertiesLoaderUtils.loadAllProperties("myconfig.properties");
  10. } catch (IOException e) {
  11. e.printStackTrace();
  12. }
  13. return properties.get(key).toString();
  14. }
  15. }
  • 创建com.qrsoft.etl.util.ConfigManager类,该类是一个通用工具类,用于配置文件管理,在该类的构造函数中加载config.properties配置文件,并提供了一个根据键来读取配置文件中属性值的方法getValue(key)。
  1. package com.qrsoft.etl.util;
  2. import java.io.IOException;
  3. import java.io.InputStream;
  4. import java.util.Properties;
  5. public class ConfigManager {
  6. private static ConfigManager configManager; // 声明工具类的一个私有的对象
  7. private static Properties properties; //声明对象
  8. private ConfigManager() { //私有无参构造方法
  9. String configFile = "config.properties";
  10. properties = new Properties();
  11. InputStream in = ConfigManager.class.getClassLoader().getResourceAsStream(configFile);
  12. try {
  13. properties.load(in);
  14. in.close();
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. public static ConfigManager getInstance() {
  20. if (configManager == null) {
  21. configManager = new ConfigManager();
  22. }
  23. return configManager;
  24. }
  25. public String getValue(String key) {
  26. return properties.getProperty(key);
  27. }
  28. }
  • 创建com.qrsoft.etl.util.MapManager类,该类是地图管理的工具类,提供了地图操作的相关工具方法,例如:地图上某个经纬度的点是否在矩形区域内、判断是否在经纬度范围内等。
  1. package com.qrsoft.etl.util;
  2. public class MapManager {
  3. /**
  4. * 是否在矩形区域内
  5. * @Title: isInArea
  6. * @Description: TODO()
  7. * @param lat 测试点经度
  8. * @param lng 测试点纬度
  9. * @param minLat 纬度范围限制1
  10. * @param maxLat 纬度范围限制2
  11. * @param minLng 经度限制范围1
  12. * @param maxLng 经度范围限制2
  13. * @return boolean
  14. */
  15. public boolean isInRectangleArea(double lat,double lng,double minLat,
  16. double maxLat,double minLng,double maxLng){
  17. if(this.isInRange(lat, minLat, maxLat)){//如果在纬度的范围内
  18. if(minLng*maxLng>0){
  19. if(this.isInRange(lng, minLng, maxLng)){
  20. return true;
  21. }else {
  22. return false;
  23. }
  24. }else {
  25. if(Math.abs(minLng)+Math.abs(maxLng)<180){
  26. if(this.isInRange(lng, minLng, maxLng)){
  27. return true;
  28. }else {
  29. return false;
  30. }
  31. }else{
  32. double left = Math.max(minLng, maxLng);
  33. double right = Math.min(minLng, maxLng);
  34. if(this.isInRange(lng, left, 180)||this.isInRange(lng, right,-180)){
  35. return true;
  36. }else {
  37. return false;
  38. }
  39. }
  40. }
  41. }else{
  42. return false;
  43. }
  44. }
  45. /**
  46. * 判断是否在经纬度范围内
  47. */
  48. public boolean isInRange(double point, double left,double right){
  49. if(point>=Math.min(left, right)&&point<=Math.max(left, right)){
  50. return true;
  51. }else {
  52. return false;
  53. }
  54. }
  55. }
  • 创建com.qrsoft.etl.common.db.DBbean类,该类中定义了外部可以配置的连接池属性 可以允许外部配置,拥有默认值。
  1. package com.qrsoft.etl.common.db;
  2. public class DBbean {
  3. // 连接池属性
  4. private String driverName;
  5. private String url;
  6. private String userName;
  7. private String password;
  8. // 连接池名字
  9. private String poolName;
  10. private int minConnections = 10; // 空闲池,最小连接数
  11. private int maxConnections = 300; // 空闲池,最大连接数
  12. private int initConnections = 20;// 初始化连接数
  13. private long connTimeOut = 1000;// 重复获得连接的频率
  14. private int maxActiveConnections = 500;// 最大允许的连接数,和数据库对应
  15. private long connectionTimeOut = 0;// 连接超时时间,默认20分钟
  16. private boolean isCurrentConnection = true; // 是否获得当前连接,默认true
  17. private boolean isCheakPool = true; // 是否定时检查连接池
  18. private long lazyCheck = 1000 * 60 * 60;// 延迟多少时间后开始 检查
  19. private long periodCheck = 1000 * 60 * 60;// 检查频率
  20. public DBbean(String driverName, String url, String userName,String password, String poolName) {
  21. super();
  22. this.driverName = driverName;
  23. this.url = url;
  24. this.userName = userName;
  25. this.password = password;
  26. this.poolName = poolName;
  27. }
  28. public DBbean() {
  29. }
  30. public String getDriverName() {
  31. if (driverName == null) {
  32. driverName = this.getDriverName() + "_" + this.getUrl();
  33. }
  34. return driverName;
  35. }
  36. public void setDriverName(String driverName) {
  37. this.driverName = driverName;
  38. }
  39. public String getUrl() {
  40. return url;
  41. }
  42. public void setUrl(String url) {
  43. this.url = url;
  44. }
  45. public String getUserName() {
  46. return userName;
  47. }
  48. public void setUserName(String userName) {
  49. this.userName = userName;
  50. }
  51. public String getPassword() {
  52. return password;
  53. }
  54. public void setPassword(String password) {
  55. this.password = password;
  56. }
  57. public String getPoolName() {
  58. return poolName;
  59. }
  60. public void setPoolName(String poolName) {
  61. this.poolName = poolName;
  62. }
  63. public int getMinConnections() {
  64. return minConnections;
  65. }
  66. public void setMinConnections(int minConnections) {
  67. this.minConnections = minConnections;
  68. }
  69. public int getMaxConnections() {
  70. return maxConnections;
  71. }
  72. public void setMaxConnections(int maxConnections) {
  73. this.maxConnections = maxConnections;
  74. }
  75. public int getInitConnections() {
  76. return initConnections;
  77. }
  78. public void setInitConnections(int initConnections) {
  79. this.initConnections = initConnections;
  80. }
  81. public int getMaxActiveConnections() {
  82. return maxActiveConnections;
  83. }
  84. public void setMaxActiveConnections(int maxActiveConnections) {
  85. this.maxActiveConnections = maxActiveConnections;
  86. }
  87. public long getConnTimeOut() {
  88. return connTimeOut;
  89. }
  90. public void setConnTimeOut(long connTimeOut) {
  91. this.connTimeOut = connTimeOut;
  92. }
  93. public long getConnectionTimeOut() {
  94. return connectionTimeOut;
  95. }
  96. public void setConnectionTimeOut(long connectionTimeOut) {
  97. this.connectionTimeOut = connectionTimeOut;
  98. }
  99. public boolean isCurrentConnection() {
  100. return isCurrentConnection;
  101. }
  102. public void setCurrentConnection(boolean isCurrentConnection) {
  103. this.isCurrentConnection = isCurrentConnection;
  104. }
  105. public long getLazyCheck() {
  106. return lazyCheck;
  107. }
  108. public void setLazyCheck(long lazyCheck) {
  109. this.lazyCheck = lazyCheck;
  110. }
  111. public long getPeriodCheck() {
  112. return periodCheck;
  113. }
  114. public void setPeriodCheck(long periodCheck) {
  115. this.periodCheck = periodCheck;
  116. }
  117. public boolean isCheakPool() {
  118. return isCheakPool;
  119. }
  120. public void setCheakPool(boolean isCheakPool) {
  121. this.isCheakPool = isCheakPool;
  122. }
  123. }
  • 创建com.qrsoft.etl.common.db.DBInitInfo类,该类主要用于加载配置文件中所有的有关数据库的配置信息,初始化数据库连接对象。
  1. package com.qrsoft.etl.common.db;
  2. import com.qrsoft.etl.util.ConfigManager;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. public class DBInitInfo {
  6. // 设置注册属性
  7. public static String DRIVER = ConfigManager.getInstance().getValue("jdbc.driver");
  8. public static String URL = ConfigManager.getInstance().getValue("jdbc.url");
  9. public static String USERNAME = ConfigManager.getInstance().getValue("jdbc.username");
  10. public static String PASSWORD = ConfigManager.getInstance().getValue("jdbc.password");
  11. public static String MinConnections = ConfigManager.getInstance().getValue("jdbc.min");
  12. public static String MaxConnections = ConfigManager.getInstance().getValue("jdbc.max");
  13. public static List<DBbean> beans = null;
  14. static {
  15. beans = new ArrayList<DBbean>();
  16. // 这里数据 可以从xml 等配置文件进行获取,为了测试,这里直接写死了
  17. DBbean beanMysql = new DBbean();
  18. beanMysql.setDriverName(DRIVER);
  19. beanMysql.setUrl(URL);
  20. beanMysql.setUserName(USERNAME);
  21. beanMysql.setPassword(PASSWORD);
  22. beanMysql.setMinConnections(Integer.parseInt(MinConnections));
  23. beanMysql.setMaxConnections(Integer.parseInt(MaxConnections));
  24. beanMysql.setPoolName("pool");
  25. beans.add(beanMysql);
  26. }
  27. }
  • 创建com.qrsoft.etl.common.db.IConnectionPool接口,该接口中定义了连接池操作类的接口方法,如:获得连接、回收连接、销毁清空、查看连接池活动状态、定时检查连接池等接口方法。
  1. package com.qrsoft.etl.common.db;
  2. import java.sql.Connection;
  3. import java.sql.SQLException;
  4. public interface IConnectionPool {
  5. // 获得连接
  6. public Connection getConnection();
  7. // 获得当前连接
  8. public Connection getCurrentConnecton();
  9. // 回收连接
  10. public void releaseConn(Connection conn) throws SQLException;
  11. // 销毁清空
  12. public void destroy();
  13. // 连接池是活动状态
  14. public boolean isActive();
  15. // 定时器,检查连接池
  16. public void cheackPool();
  17. }
  • 创建com.qrsoft.etl.common.db.ConnectionPool类,该类实现了IConnectionPool接口,并实现了接口中定义的方法。
  1. package com.qrsoft.etl.common.db;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.SQLException;
  5. import java.util.List;
  6. import java.util.Timer;
  7. import java.util.TimerTask;
  8. import java.util.Vector;
  9. public class ConnectionPool implements IConnectionPool {
  10. // 连接池配置属性
  11. private DBbean dbBean;
  12. private boolean isActive = false; // 连接池活动状态
  13. private int contActive = 0;// 记录创建的总的连接数
  14. // 空闲连接
  15. private List<Connection> freeConnection = new Vector<Connection>();
  16. // 活动连接
  17. private List<Connection> activeConnection = new Vector<Connection>();
  18. // 将线程和连接绑定,保证事务能统一执行
  19. private static ThreadLocal<Connection> threadLocal = new ThreadLocal<Connection>();
  20. public ConnectionPool(DBbean dbBean) {
  21. super();
  22. this.dbBean = dbBean;
  23. init();
  24. cheackPool();
  25. }
  26. // 初始化
  27. public void init() {
  28. try {
  29. Class.forName(dbBean.getDriverName());
  30. for (int i = 0; i < dbBean.getInitConnections(); i++) {
  31. Connection conn;
  32. conn = newConnection();
  33. // 初始化最小连接数
  34. if (conn != null) {
  35. freeConnection.add(conn);
  36. contActive++;
  37. }
  38. }
  39. isActive = true;
  40. } catch (ClassNotFoundException e) {
  41. e.printStackTrace();
  42. } catch (SQLException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. // 获得当前连接
  47. public Connection getCurrentConnecton() {
  48. // 默认线程里面取
  49. Connection conn = threadLocal.get();
  50. if (!isValid(conn)) {
  51. conn = getConnection();
  52. }
  53. return conn;
  54. }
  55. // 获得连接
  56. public synchronized Connection getConnection() {
  57. Connection conn = null;
  58. try {
  59. // 判断是否超过最大连接数限制
  60. if (contActive < this.dbBean.getMaxActiveConnections()) {
  61. if (freeConnection.size() > 0) {
  62. conn = freeConnection.get(0);
  63. if (conn != null) {
  64. threadLocal.set(conn);
  65. }
  66. freeConnection.remove(0);
  67. } else {
  68. conn = newConnection();
  69. }
  70. } else {
  71. // 继续获得连接,直到从新获得连接
  72. wait(this.dbBean.getConnTimeOut());
  73. conn = getConnection();
  74. }
  75. if (isValid(conn)) {
  76. activeConnection.add(conn);
  77. contActive++;
  78. }
  79. } catch (SQLException e) {
  80. e.printStackTrace();
  81. } catch (ClassNotFoundException e) {
  82. e.printStackTrace();
  83. } catch (InterruptedException e) {
  84. e.printStackTrace();
  85. }
  86. return conn;
  87. }
  88. // 获得新连接
  89. private synchronized Connection newConnection() throws ClassNotFoundException, SQLException {
  90. Connection conn = null;
  91. if (dbBean != null) {
  92. Class.forName(dbBean.getDriverName());
  93. conn = DriverManager.getConnection(dbBean.getUrl(), dbBean.getUserName(), dbBean.getPassword());
  94. }
  95. return conn;
  96. }
  97. // 释放连接
  98. public synchronized void releaseConn(Connection conn) throws SQLException {
  99. if (isValid(conn)&& !(freeConnection.size() > dbBean.getMaxConnections())) {
  100. freeConnection.add(conn);
  101. activeConnection.remove(conn);
  102. contActive--;
  103. threadLocal.remove();
  104. // 唤醒所有正待等待的线程,去抢连接
  105. notifyAll();
  106. }
  107. }
  108. // 判断连接是否可用
  109. private boolean isValid(Connection conn) {
  110. try {
  111. if (conn == null || conn.isClosed()) {
  112. return false;
  113. }
  114. } catch (SQLException e) {
  115. e.printStackTrace();
  116. }
  117. return true;
  118. }
  119. // 销毁连接池
  120. public synchronized void destroy() {
  121. for (Connection conn : freeConnection) {
  122. try {
  123. if (isValid(conn)) {
  124. conn.close();
  125. }
  126. } catch (SQLException e) {
  127. e.printStackTrace();
  128. }
  129. }
  130. for (Connection conn : activeConnection) {
  131. try {
  132. if (isValid(conn)) {
  133. conn.close();
  134. }
  135. } catch (SQLException e) {
  136. e.printStackTrace();
  137. }
  138. }
  139. isActive = false;
  140. contActive = 0;
  141. }
  142. // 连接池状态
  143. public boolean isActive() {
  144. return isActive;
  145. }
  146. // 定时检查连接池情况
  147. public void cheackPool() {
  148. if (dbBean.isCheakPool()) {
  149. new Timer().schedule(new TimerTask() {
  150. @Override
  151. public void run() {
  152. // 1.对线程里面的连接状态
  153. // 2.连接池最小 最大连接数
  154. // 3.其他状态进行检查,因为这里还需要写几个线程管理的类,暂时就不添加了
  155. System.out.println("空线池连接数:" + freeConnection.size());
  156. System.out.println("活动连接数::" + activeConnection.size());
  157. System.out.println("总的连接数:" + contActive);
  158. }
  159. }, dbBean.getLazyCheck(), dbBean.getPeriodCheck());
  160. }
  161. }
  162. }
  • 创建com.qrsoft.etl.common.db.ConnectionPoolManager类,该类为数据库的连接管理类,是一个通用的固定写法,通过连接池管理数据库连接,提供了获取单例模式连接的实现,还提供了根据连接池名字获得连接的方法,以及关闭连接、清空连接池等方法。
  1. package com.qrsoft.etl.common.db;
  2. import java.sql.Connection;
  3. import java.sql.SQLException;
  4. import java.util.Hashtable;
  5. /**
  6. * 连接管理类
  7. */
  8. public class ConnectionPoolManager {
  9. // 连接池存放
  10. public Hashtable<String, IConnectionPool> pools = new Hashtable<String, IConnectionPool>();
  11. // 初始化
  12. private ConnectionPoolManager() {
  13. init();
  14. }
  15. // 单例实现
  16. public static ConnectionPoolManager getInstance() {
  17. return Singtonle.instance;
  18. }
  19. private static class Singtonle {
  20. private static ConnectionPoolManager instance = new ConnectionPoolManager();
  21. }
  22. // 初始化所有的连接池
  23. public void init() {
  24. for (int i = 0; i < DBInitInfo.beans.size(); i++) {
  25. DBbean bean = DBInitInfo.beans.get(i);
  26. ConnectionPool pool = new ConnectionPool(bean);
  27. if (pool != null) {
  28. pools.put(bean.getPoolName(), pool);
  29. System.out.println("Info:Init connection successed ->"
  30. + bean.getPoolName());
  31. }
  32. }
  33. }
  34. // 获得连接,根据连接池名字 获得连接
  35. public Connection getConnection(String poolName) {
  36. Connection conn = null;
  37. if (pools.size() > 0 && pools.containsKey(poolName)) {
  38. conn = getPool(poolName).getConnection();
  39. try {
  40. conn.setAutoCommit(false);
  41. } catch (SQLException e) {
  42. e.printStackTrace();
  43. }
  44. } else {
  45. System.out.println(
  46. "Error:Can't find this connecion pool ->" + poolName);
  47. }
  48. return conn;
  49. }
  50. // 关闭,回收连接
  51. public void close(String poolName, Connection conn) {
  52. IConnectionPool pool = getPool(poolName);
  53. try {
  54. if (pool != null) {
  55. pool.releaseConn(conn);
  56. }
  57. } catch (SQLException e) {
  58. System.out.println("连接池已经销毁");
  59. e.printStackTrace();
  60. }
  61. }
  62. // 清空连接池
  63. public void destroy(String poolName) {
  64. IConnectionPool pool = getPool(poolName);
  65. if (pool != null) {
  66. pool.destroy();
  67. }
  68. }
  69. // 获得连接池
  70. public IConnectionPool getPool(String poolName) {
  71. IConnectionPool pool = null;
  72. if (pools.size() > 0) {
  73. pool = pools.get(poolName);
  74. }
  75. return pool;
  76. }
  77. }
  • 创建com.qrsoft.etl.dao.IBaseDao类,该类主要用于数据库的通用操作,如创建连接、执行更新等操作。
  1. package com.qrsoft.etl.dao;
  2. import com.qrsoft.etl.common.db.ConnectionPoolManager;
  3. import com.qrsoft.etl.common.db.IConnectionPool;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.sql.*;
  7. import java.util.ArrayList;
  8. import java.util.HashMap;
  9. import java.util.List;
  10. import java.util.Map;
  11. public class IBaseDao {
  12. private final static Logger logger = LoggerFactory.getLogger(IBaseDao.class);
  13. public ResultSet rs = null;
  14. private IConnectionPool pool= ConnectionPoolManager.getInstance().getPool("pool");
  15. private Connection conn = getConnection();
  16. // 定义sql语句的执行对象
  17. private PreparedStatement pstmt;
  18. public Connection getConnection(){
  19. Connection conn = null;
  20. if(pool != null && pool.isActive()){
  21. conn = pool.getConnection();
  22. }
  23. return conn;
  24. }
  25. public Connection getCurrentConnection(){
  26. Connection conn = null;
  27. if(pool != null && pool.isActive()){
  28. conn = pool.getCurrentConnecton();
  29. }
  30. return conn;
  31. }
  32. /**
  33. * 查询
  34. */
  35. public ResultSet execute(String sql, Object params[]) {
  36. try {
  37. conn.setAutoCommit(false);
  38. pstmt = conn.prepareStatement(sql);
  39. for (int i = 0; i < params.length; i++) {
  40. pstmt.setObject(i + 1, params[i]);
  41. }
  42. rs = pstmt.executeQuery();
  43. pool.releaseConn(conn);
  44. } catch (SQLException e) {
  45. e.printStackTrace();
  46. logger.info("查询失败!", e.getMessage());
  47. }
  48. return rs;
  49. }
  50. /**
  51. * 更新
  52. */
  53. public boolean update(String sql, Object params[]) throws SQLException {
  54. boolean flag = false;
  55. try {
  56. conn.setAutoCommit(false);
  57. pstmt = conn.prepareStatement(sql);
  58. for (int i = 0; i < params.length; i++) {
  59. pstmt.setObject(i + 1, params[i]);
  60. }
  61. int i = pstmt.executeUpdate();
  62. if (i > 0){
  63. flag = true;
  64. }else{
  65. flag = false;
  66. }
  67. pool.releaseConn(conn);
  68. conn.commit();
  69. } catch (SQLException e) {
  70. conn.rollback();
  71. e.printStackTrace();
  72. logger.info("更新失败!", e.getMessage());
  73. }
  74. return flag;
  75. }
  76. /**
  77. * 更新一个
  78. */
  79. public boolean updateOne(String sql) throws SQLException {
  80. boolean flag = false;
  81. try {
  82. pstmt = conn.prepareStatement(sql);
  83. int i = pstmt.executeUpdate();
  84. if (i > 0){
  85. flag = true;
  86. } else{
  87. flag = false;
  88. }
  89. pool.releaseConn(conn);
  90. conn.commit();
  91. } catch (SQLException e) {
  92. conn.rollback();
  93. e.printStackTrace();
  94. logger.info("更新失败!", e.getMessage());
  95. }
  96. return flag;
  97. }
  98. /**
  99. * 执行查询操作
  100. */
  101. public List<Map<String, Object>> findResult(String sql, List<?> params) throws SQLException {
  102. List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
  103. int index = 1;
  104. pstmt = conn.prepareStatement(sql);
  105. if (params != null && !params.isEmpty()) {
  106. for (int i = 0; i < params.size(); i++) {
  107. pstmt.setObject(index++, params.get(i));
  108. }
  109. }
  110. rs = pstmt.executeQuery();
  111. ResultSetMetaData metaData = rs.getMetaData();
  112. int cols_len = metaData.getColumnCount();
  113. while (rs.next()) {
  114. Map<String, Object> map = new HashMap<String, Object>();
  115. for (int i = 0; i < cols_len; i++) {
  116. String cols_name = metaData.getColumnName(i + 1);
  117. Object cols_value = rs.getObject(cols_name);
  118. if (cols_value == null) {
  119. cols_value = "";
  120. }
  121. map.put(cols_name, cols_value);
  122. }
  123. list.add(map);
  124. }
  125. return list;
  126. }
  127. public Boolean getBool(ResultSet airRs){
  128. boolean bool = false;
  129. Integer no = 0;
  130. try {
  131. while (airRs.next()) {
  132. no = airRs.getInt(1);
  133. if (no>0) {
  134. bool = true;
  135. }
  136. }
  137. } catch (SQLException e) {
  138. bool = false;
  139. e.printStackTrace();
  140. }
  141. return bool;
  142. }
  143. }
  • 创建com.qrsoft.etl.common.Constants类,该类为全局常量类,定义了相关的常量,使用的时候直接通过Constants类调用,即方便修改,又可以避免手写时写错。
  1. package com.qrsoft.etl.common;
  2. public class Constants {
  3. //间隔时间(10分钟)
  4. public final static int INTERVAL_TIME_10MIN = 10*60*1000;
  5. //间隔时间(5分钟)
  6. public final static int INTERVAL_TIME_5MIN = 5*60*1000;
  7. //间隔时间(1分钟)
  8. public final static int INTERVAL_TIME_1MIN = 60*1000;
  9. //间隔时间(30秒)
  10. public final static int INTERVAL_TIME_30SEC = 30*1000;
  11. //间隔时间(10秒)
  12. public final static int INTERVAL_TIME_10SEC = 10*1000;
  13. //间隔时间(10秒)
  14. public final static int INTERVAL_TIME_5SEC = 5*1000;
  15. //每分钟读取条数
  16. public final static int READ_COUNT = 10;
  17. //kg_airport
  18. public final static String TABLE_AIRPORT = "kg_airport";
  19. //kg_airlinecompany
  20. public final static String TABLE_AIRLINECOMPANY = "kg_airlinecompany";
  21. //kg_PlanData计划数据
  22. public final static String TASK_PlANDATA = "task_PlanData";
  23. public final static String TABLE_PlANDATA = "Kg_PlanData";
  24. public final static String FAMILY_PlANDATA = "ReportHome";
  25. public final static String COLUMN_PlANDATA = "EXECUTE_DATE";
  26. //kg_MultiRadarData综合航迹数据
  27. public final static String TASK_RADAR = "task_Radar";
  28. public final static String TABLE_RADAR = "Kg_MultiRadarData";
  29. public final static String FAMILY_RADAR = "RadarHome";
  30. public final static String COLUMN_RADAR = "SEND_RADAR_TIME";
  31. //kg_AFTN报文数据
  32. public final static String TASK_AFTN = "task_Aftn";
  33. public final static String TABLE_AFTN = "Kg_AFTN";
  34. public final static String FAMILY_AFTN = "AFTNHome";
  35. public final static String COLUMN_AFTN = "EXECUTE_DATE";
  36. //Kg_ATCDutyInfo管制值班人员数据
  37. public final static String TASK_ATCDUTY = "task_ATCDuty";
  38. public final static String TABLE_ATCDUTY = "Kg_ATCDutyInfo";
  39. public final static String FAMILY_ATCDUTY = "ATCDutyHome";
  40. public final static String COLUMN_ATCDUTY = "SEND_TIME";
  41. //Kg_WarnFlightHistory航班指令告警数据
  42. public final static String TASK_WARNFLIGHT = "task_WarnFlight";
  43. public final static String TABLE_WARNFLIGHT = "Kg_WarnFlightHistory";
  44. public final static String FAMILY_WARNFLIGHT = "WarnFlightHome";
  45. public final static String COLUMN_WARNFLIGHT = "GJ_DATE";
  46. //Kg_WarnSimilarHistory相似航班号告警数据
  47. public final static String TASK_WARNSIMILAR = "task_WarnSimilar";
  48. public final static String TABLE_WARNSIMILAR = "Kg_WarnSimilarHistory";
  49. public final static String FAMILY_WARNSIMILAR = "WarnSimilarHome";
  50. public final static String COLUMN_WARNSIMILAR = "GJ_DATE";
  51. //Kg_ATC扇区信息
  52. public final static String TASK_ATC = "task_ATC";
  53. public final static String TABLE_ATC = "Kg_ATC";
  54. public final static String FAMILY_ATC = "ATCHome";
  55. public final static String COLUMN_ATC = "EXECUTE_DATE";
  56. //Kg_CallSaturation 扇区通话饱和度信息
  57. public final static String TASK_CALLSATURATION = "task_CallSaturation";
  58. public final static String TABLE_CALLSATURATION = "Kg_CallSaturation";
  59. public final static String FAMILY_CALLSATURATION = "SaturationHome";
  60. public final static String COLUMN_CALLSATURATION = "SEND_TIME";
  61. }

2、创建com/qrsoft/etl/spark/SparkStreamingApplication.java类,在该类中配置Kafka和Spark的执行参数,然后使用Spark进行业务数据处理,在代码中会根据不同的Topic进入不同的分支,进行数据的处理。

  • 创建SparkStreamingApplication.java类,设置Zookeeper的brokers和zkServers,设置需要监听的Topic List
  1. package com.qrsoft.etl.spark;
  2. import com.qrsoft.etl.common.Constants;
  3. import com.qrsoft.etl.util.ConfigUtil;
  4. import net.sf.json.JSONObject;
  5. import org.apache.kafka.clients.consumer.ConsumerConfig;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import org.apache.spark.SparkConf;
  9. import org.apache.spark.api.java.JavaSparkContext;
  10. import org.apache.spark.streaming.Durations;
  11. import org.apache.spark.streaming.api.java.JavaInputDStream;
  12. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  13. import org.apache.spark.streaming.kafka010.ConsumerStrategies;
  14. import org.apache.spark.streaming.kafka010.KafkaUtils;
  15. import org.apache.spark.streaming.kafka010.LocationStrategies;
  16. import org.springframework.stereotype.Component;
  17. import java.io.Serializable;
  18. import java.util.Arrays;
  19. import java.util.Collection;
  20. import java.util.HashMap;
  21. import java.util.Map;
  22. import java.util.regex.Pattern;
  23. @Component
  24. public class SparkStreamingApplication implements Serializable {
  25. static final Pattern SPACE = Pattern.compile(" ");
  26. // 多个以逗号隔开
  27. static String brokers = ConfigUtil.readProperty("brokers");
  28. static String zkserver = ConfigUtil.readProperty("zkserver");
  29. // 消费者组名称
  30. static String groupId = "spark_etl";
  31. // topic列表
  32. static String topicsStr = Constants.TASK_RADAR;
  33. static String[] topicsStrs = {
  34. Constants.TASK_PlANDATA,
  35. Constants.TASK_WARNFLIGHT,
  36. Constants.TASK_ATCDUTY,
  37. Constants.TASK_WARNSIMILAR,
  38. Constants.TASK_AFTN,
  39. Constants.TASK_ATC,
  40. Constants.TASK_CALLSATURATION,
  41. Constants.TASK_RADAR
  42. };
  43. /**
  44. * 启动Spark读取、清洗数据
  45. */
  46. public void SparkEtlStart() {
  47. // ... 在此处添加代码 ...
  48. }
  49. }
  • 在SparkEtlStart()方法内添加代码,配置Spark和Kafka参数,创建StreamingContext

创建StreamingContext,设置拉取流的时间,准备读取Kafka数据。本地开发时Spark配置使用local[*]方式,设置成本地运行模式,放到集群中运行时需要修改为Yarn模式。

  1. SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("qst-etl");
  2. //反压机制
  3. conf.set("spark.streaming.backpressure.enabled", "true");
  4. conf.set("allowMultipleContexts", "true");
  5. JavaSparkContext sc = new JavaSparkContext(conf);
  6. // 获取jssc 以及设置获取流的时间
  7. JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
  8. jssc.sparkContext().setLogLevel("WARN");
  9. // Kafka 参数配置
  10. Map<String, Object> kafkaParams = new HashMap<>();
  11. kafkaParams.put("zookeeper.connect", zkserver);
  12. kafkaParams.put("bootstrap.servers", brokers);
  13. kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //指定了KafkaConsumershuyu 哪一个消费者群组
  14. kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  15. kafkaParams.put("key.deserializer", StringDeserializer.class);
  16. kafkaParams.put("value.deserializer", StringDeserializer.class);
  17. kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  18. kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //读取Kafka最新的一条
  19. kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
  20. Collection<String> topics = Arrays.asList(topicsStrs);
  • 拉取Kafka数据流
  1. // 获取流
  2. JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
  3. LocationStrategies.PreferConsistent(),
  4. ConsumerStrategies.Subscribe(topics, kafkaParams));
  • 解析Kafka数据流:解析流,对流进行循环处理,首先把流区分Topic,解析流中的value,其次根据不同Topic进入不同的分支,进行处理
  1. stream.foreachRDD(rdd -> {
  2. rdd.foreach(t -> {
  3. String topName = t.topic();
  4. JSONObject jsonObject = new JSONObject();
  5. String taskRadar = "";
  6. if(topName.equals(Constants.TASK_RADAR)){
  7. taskRadar = t.value();
  8. }else{
  9. jsonObject = JSONObject.parseObject(t.value());
  10. }
  11. SparkUtil sparkUtil = new SparkUtil();
  12. try {
  13. switch (topName) {
  14. case Constants.TASK_RADAR:
  15. // sparkUtil.TaskRadar(taskRadar);
  16. sparkUtil.TaskRadarStr(taskRadar);
  17. break;
  18. case Constants.TASK_PlANDATA:
  19. sparkUtil.TaskPlanData(jsonObject);
  20. break;
  21. case Constants.TASK_WARNFLIGHT:
  22. sparkUtil.TaskWarnfLight(jsonObject);
  23. break;
  24. case Constants.TASK_ATCDUTY:
  25. sparkUtil.TaskAtcduty(jsonObject);
  26. break;
  27. case Constants.TASK_WARNSIMILAR:
  28. sparkUtil.TaskWarnsimilar(jsonObject);
  29. break;
  30. case Constants.TASK_AFTN:
  31. sparkUtil.TaskAftn(jsonObject);
  32. break;
  33. case Constants.TASK_ATC:
  34. sparkUtil.TaskAtc(jsonObject);
  35. break;
  36. case Constants.TASK_CALLSATURATION:
  37. sparkUtil.TaskCallsaturation(jsonObject);
  38. break;
  39. }
  40. } catch (Exception e) {
  41. System.out.println(e);
  42. }
  43. //return Arrays.asList(SPACE.split(t.value())).iterator();
  44. });
  45. });
  • 启动Spark,进行业务处理
  1. // 打印结果
  2. //warns.print();
  3. try {
  4. // 启动
  5. jssc.start();
  6. jssc.awaitTermination();
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }

3、将清洗后的数据保存到数据库中

  • 在上一步的代码中,解析Kafka数据流时,首先把流区分Topic,解析流中的value,其次根据不同Topic进入不同的分支,进行处理。例如:
  1. case Constants.TASK_PlANDATA:
  2. sparkUtil.TaskPlanData(jsonObject);
  3. break;

该分支是处理机场起降数据,这里会用到一个类SparkUtil.java,该类中定义了处理不同Topic数据的方法,其中sparkUtil.TaskPlanData(jsonObject)就是处理机场起降数据对应的方法。

主要任务是:对起降信息进行统计和更新、对航线信息进行统计和更新:

1)首先判断是否有该机场航班起降的统计信息,如果数据库中没有该机场数据,则在数据库中插入;如果有则根据条件进行更新数据;

2)对于航线信息也是如此,如果数据库中没有相应的航线数据,则在数据库中插入;否则根据条件进行更新。

  • 编写com.qrsoft.etl.spark.SparkUtil类,代码位置src/main/java/com/qrsoft/etl/spark/SparkUtil.java,在该类中添加一个方法TaskPlanData(jsonObject),用于处理“机场起降数据”对应的Topic中的数据。
  1. package com.qrsoft.etl.spark;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.qrsoft.etl.dao.PlanDataDao;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.stereotype.Component;
  7. import java.text.SimpleDateFormat;
  8. import java.util.Date;
  9. @Component
  10. public class SparkUtil {
  11. private final static Logger logger = LoggerFactory.getLogger(SparkUtil.class);
  12. // 初始化扇区
  13. private static Double[] sectionG = {38.716066,42.297914,114.648477,128.759203};
  14. private static Double[] sectionK = {35.519796,38.716066,114.648477,128.759203};
  15. private static Double[] sectionE = {32.519796,35.519796,114.648477,128.759203};
  16. /**
  17. * 业务处理
  18. * @param jsonObject 机场起降数据
  19. * @throws Exception
  20. */
  21. public void TaskPlanData(JSONObject jsonObject) throws Exception {
  22. //起飞机场
  23. String adep = jsonObject.getString("ADEP");
  24. //降落机场
  25. String ades = jsonObject.getString("ADES");
  26. //操作数据库,统计和更新机场航班数
  27. operationDB(adep);
  28. operationDB(ades);
  29. //航班号
  30. String acid = jsonObject.getString("ACID");
  31. //操作数据库,统计和更新航线信息
  32. operationDBBOLT(adep, ades, acid);
  33. }
  34. /**
  35. * 操作数据库(对航班起降数进行统计或更新)
  36. * @param code “起飞机场”或“降落机场”
  37. */
  38. public void operationDB(String code) {
  39. //根据机场代码获取目前数据库中已存在的航班数
  40. PlanDataDao pDao = new PlanDataDao();
  41. boolean bool;
  42. try {
  43. bool = pDao.isExistThisAir(code);
  44. if (bool) {
  45. //存在,在原来基础上+1,修改数据库中该机场的航班数
  46. pDao.updateAnAirMsg(code);
  47. } else {
  48. //不存在,在统计表中创建该机场的航班数(默认为1
  49. pDao.createAnAirMsg(code);
  50. }
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. /**
  56. * 操作数据库(对航线进行统计或更新)
  57. * @param adep 起飞机场
  58. * @param ades 降落机场
  59. * @param acid 航班号
  60. */
  61. public void operationDBBOLT(String adep, String ades, String acid) {
  62. boolean bool;
  63. PlanDataDao pDao = new PlanDataDao();
  64. if (pDao.isDomesticThisLine(adep) && pDao.isDomesticThisLine(ades)) {
  65. bool = pDao.isExistThisLine(acid);
  66. if (bool) {
  67. pDao.updateAnLineMsg(acid);
  68. } else {
  69. pDao.createAnLineMsg(acid, adep, ades);
  70. }
  71. }
  72. }
  73. // ... ...
  74. // ... 其他方法。因为当前要实现的是“机场起降数据”,所以其他可以只有方法体,没有方法实现及返回值。 ...
  75. // ... ...
  76. public void TaskRadarStr(String taskRadar) { }
  77. public void TaskWarnfLight(JSONObject jsonObject) { }
  78. public void TaskAtcduty(JSONObject jsonObject) { }
  79. public void TaskWarnsimilar(JSONObject jsonObject) { }
  80. public void TaskAftn(JSONObject jsonObject) { }
  81. public void TaskAtc(JSONObject jsonObject) { }
  82. public void TaskCallsaturation(JSONObject jsonObject) { }
  83. // ... ...
  84. }
  • 在上面的代码中会用到一个com.qrsoft.etl.dao.PlanDataDao类,代码位置src/main/java/com/qrsoft/etl/dao/PlanDataDao.java。该类是一个数据库操作类,会根据业务逻辑进行实际的数据库的操作,如增、删、改、查等。
  1. package com.qrsoft.etl.dao;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.sql.ResultSet;
  5. import java.sql.SQLException;
  6. public class PlanDataDao extends IBaseDao {
  7. private final static Logger logger = LoggerFactory.getLogger(PlanDataDao.class);
  8. // ... ...
  9. // ... 添加方法实现 ...
  10. // ... ...
  11. }
  • 对于“处理机场起降数据”会涉及到以下的方法:

查询该机场是否在国内:

  1. public boolean isDomesticThisLine(String code4){
  2. String sql = " SELECT COUNT(*) from airport_longlat where code4 ='"+code4+"';";
  3. Object[] params = {};
  4. ResultSet comRs = this.execute(sql, params);
  5. return getBool(comRs);
  6. }

根据机场代码查询是否有该机场的统计信息:

  1. public boolean isExistThisAir(String code) {
  2. String sql = " SELECT COUNT(*) from airport_number where flightcode='"+code+"';";
  3. Object[] params = {};
  4. ResultSet airRs = this.execute(sql, params);
  5. return getBool(airRs);
  6. }

如果根据机场代码查询,有该机场的统计信息,则在数据库中更新机场的起降航班数:

  1. public void updateAnAirMsg(String code) {
  2. String sql = "update airport_number set count=count+'1' where flightcode='"+code+"'; ";
  3. Object[] params = {};
  4. try {
  5. this.update(sql, params);
  6. } catch (SQLException e) {
  7. logger.info("修改指定机场的统计信息(统计数在原来基础上+1)失败! " + code);
  8. e.printStackTrace();
  9. }
  10. }

如果根据机场代码查询,有该机场的统计信息,则在统计表中创建该机场的航班数(默认为1):

  1. public void createAnAirMsg(String code) {
  2. String sql = "insert into airport_number (flightcode,cname,count) values ('"+code+"',(select airport_cname from kg_airport where AIRPORT_CODE4 = '"+code+"'),'1');";
  3. Object[] params = {};
  4. try {
  5. this.update(sql, params);
  6. } catch (SQLException e) {
  7. logger.info("创建新机场的统计信息失败! " + code);
  8. e.printStackTrace();
  9. }
  10. }

根据航班号查询是否有该航线存在:

  1. public boolean isExistThisLine(String acid){
  2. String sql = " SELECT COUNT(*) from airline_number where acid ='"+acid+"';";
  3. Object[] params = {};
  4. ResultSet comRs = this.execute(sql, params);
  5. return getBool(comRs);
  6. }

根据航班号查询,有该航线统计信息,则在统计表中修改指定航线的统计信息(统计数在原来的基础上+1):

  1. public void updateAnLineMsg(String acid) {
  2. String sql = "update airline_number set count=count+1 where acid='"+acid+"';";
  3. Object[] params = {};
  4. try {
  5. this.update(sql, params);
  6. } catch (SQLException e) {
  7. logger.info("修改指定航线统计信息(统计数在原来基础上+1)失败! 航班号:" + acid);
  8. e.printStackTrace();
  9. }
  10. }

根据航班号查询,没该航线统计信息,则创建新航线的统计信息:

  1. public void createAnLineMsg(String acid,String aDEP,String aDES) {
  2. String sql = "insert into airline_number (acid,adepcode,adescode,adepname,adesname,adeplong,adeplat,adeslong,adeslat,count) values ('"+acid+"','"+aDEP+"','"+aDES+"',(select airport_cname from kg_airport where airport_code4 = '"+aDEP+"'),(select airport_cname from kg_airport where airport_code4 = '"+aDES+"'),(select longitude from airport_longlat where code4 = '"+aDEP+"'),(select latitude from airport_longlat where code4 = '"+aDEP+"'),(select longitude from airport_longlat where code4 = '"+aDES+"'),(select latitude from airport_longlat where code4 = '"+aDES+"'),'1') ;";
  3. Object[] params = {};
  4. try {
  5. this.update(sql, params);
  6. } catch (SQLException e) {
  7. logger.info("创建新航线的统计信息失败! 航班号:" + acid);
  8. e.printStackTrace();
  9. }
  10. }

该PlanDataDao.java类是一个普通的数据库操作类,非常简单,主要是根据我们的业务需求对数据进行相应的操作。这里会涉及到前面我们创建的一些数据库访问的工具类和辅助类:

com.qrsoft.etl.dao.IBaseDao数据库访问的通用类,包括创建连接、执行更新等通用操作
com.qrsoft.etl.common.db.ConnectionPoolManager连接管理类
com.qrsoft.etl.common.db.IConnectionPool连接池管理类接口
com.qrsoft.etl.common.db.ConnectionPool连接池管理类接口实现类
com.qrsoft.etl.common.db.DBbean这是外部可以配置的连接池属性 可以允许外部配置,拥有默认值
com.qrsoft.etl.common.db.DBInitInfo初始化,模拟加载所有的配置文件
com.qrsoft.etl.common.Constants全局常量类
com.qrsoft.etl.util.ConfigUtil加载配置文件的工具类
com.qrsoft.etl.util.ConfigManager配置文件管理的工具类
com.qrsoft.etl.util.MapManager地图管理的工具类,例如:是否在矩形区域内
  • 添加或修改如下配置文件:

1)application.yml,服务器相关配置

  1. server:
  2. port: 8849
  3. spring:
  4. datasource:
  5. driver-class-name: com.mysql.jdbc.Driver
  6. username: root
  7. password: 123456
  8. url: jdbc:mysql://node3:3306/kongguan?serverTimezone=UTC
  9. redis:
  10. host: node3
  11. port: 6379
  12. database: 15
  13. mybatis-plus:
  14. configuration:
  15. log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  16. mapper-locations: classpath:/mapper/*.xml

2)config.properties,MySQL数据库相关配置

  1. jdbc.driver=com.mysql.jdbc.Driver
  2. jdbc.url=jdbc:mysql://node3:3306/kongguan?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai
  3. jdbc.username=root
  4. jdbc.password=123456
  5. jdbc.min=20
  6. jdbc.max=500

3)log4j.properties,日志文件相关配置

  1. #定义LOG输出级别
  2. #log4j.rootLogger=INFO,Console,File,stdout
  3. log4j.rootLogger=Console
  4. log4j.rootCategory = ERROR,Console
  5. #定义日志输出目的地为控制台
  6. log4j.appender.Console=org.apache.log4j.ConsoleAppender
  7. log4j.appender.Console.Target=System.out
  8. #可以灵活地指定日志输出格式,下面一行是指定具体的格式
  9. log4j.appender.Console.layout = org.apache.log4j.PatternLayout
  10. log4j.appender.Console.layout.ConversionPattern=[%c] - %m%n
  11. #文件大小到达指定尺寸的时候产生一个新的文件
  12. # log4j.appender.File = org.apache.log4j.RollingFileAppender
  13. log4j.appender.File.Append=true
  14. log4j.appender.File = org.apache.log4j.DailyRollingFileAppender
  15. #指定输出目录
  16. log4j.appender.File.File = /home/tmp/hbase
  17. log4j.appender.File.DatePattern = '_'yyyy-MM-dd'.log'
  18. #定义文件最大大小
  19. log4j.appender.File.MaxFileSize = 10MB
  20. # 输出所以日志,如果换成DEBUG表示输出DEBUG以上级别日志
  21. log4j.appender.File.Threshold = ERROR
  22. log4j.appender.File.layout = org.apache.log4j.PatternLayout
  23. log4j.appender.File.layout.ConversionPattern =[%p] [%d{yyyy-MM-dd HH\:mm\:ss}][%c]%m%n
  24. # mybatis日志输出
  25. log4j.logger.com.sarnath.ind.dao.IRoleDao.addPermission=TRACE
  26. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  27. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  28. log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

4)myconfig.properties,Zookeeper和Kafka相关配置

  1. brokers: node1:9092,node2:9092,node3:9092
  2. zkserver: node1:2181,node2:2181,node3:2181

注意,在上面的步骤中只处理了一个Topic分支的数据,其余Topic分支,参考TASK_PlANDATA方式处理,自行实现其他XXX统计(请参考源代码)。

4、修改项目启动文件BigDataEtlKongGuanApplication.java,内容如下:

  1. package com.qrsoft;
  2. import com.qrsoft.etl.spark.SparkStreamingApplication;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.ConfigurableApplicationContext;
  6. import org.springframework.scheduling.annotation.EnableScheduling;
  7. @SpringBootApplication
  8. @EnableScheduling
  9. public class BigDataEtlKongGuanApplication {
  10. public static void main(String[] args) {
  11. ConfigurableApplicationContext run = SpringApplication.run(BigDataEtlKongGuanApplication.class, args);
  12. SparkStreamingApplication bean = run.getBean(SparkStreamingApplication.class);
  13. bean.SparkEtlStart();
  14. }
  15. }

5、测试

  • 确保Hadoop、Spark、Kafka、MySQL等环境均已经启动,如果没有启动,可参考前面的安装部署任务,自行启动。
  • 启动BigData-KongGuan项目(如果没有启动)
  • 启动BigData-Etl-KongGuan项目

  • 在node3节点,进入MySQL数据库,查询 airline*相关的表是否有数据
[root@node3 ~]# mysql -uroot -p123456
mysql> show databases;
mysql> use kongguan;
mysql> select * from airline_number limit 20;

例如:统计airline_number表,记录数是递增的。

mysql> select count(*) from airline_number;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/365204
推荐阅读
相关标签
  

闽ICP备14008679号