当前位置:   article > 正文

Flink(62):Flink中通用MySQLUtil工具类_flink 数据库连接池

flink 数据库连接池

目录

0. 相关文章链接

1. 开发目的

2. 导入依赖

3. 代码

3.1. 方法说明

3.2. 具体实现

4. 如何使用

5. Hikari连接池各配置说明

6. 注意点

7. 静态MySQLUtil工具类 


0. 相关文章链接

Flink文章汇总

1. 开发目的

        在使用SpringBoot后端开发中,我们如果需要对MySQL进行增删查改,可以很方便的使用Mybatis进行操作。但是在大数据中,如果想要对MySQL进行操作,就没有那么方便,特别当flink新一代流式计算框架兴起后,在老版本中没有读取和写入MySQL的连接源,虽然在后续新版本中有以及社区开发的其他补充项目中有source源和sink源了(比如flink-cdc和写入MySQL的sink方法),但当中间需要读取维度数据时,还是不方便。此时一个较为方便的工具类就能很方便的使用,能达到节省开发时间、减小开发难度等目的。

2. 导入依赖

以MySQL8.x版本为例

  1. <!--MySQL驱动包 mysql8版本-->
  2. <dependency>
  3. <groupId>mysql</groupId>
  4. <artifactId>mysql-connector-java</artifactId>
  5. </dependency>
  6. <!-- 日志打印的jar包 -->
  7. <dependency>
  8. <groupId>log4j</groupId>
  9. <artifactId>log4j</artifactId>
  10. <scope>provided</scope>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.slf4j</groupId>
  14. <artifactId>slf4j-api</artifactId>
  15. <scope>provided</scope>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.slf4j</groupId>
  19. <artifactId>slf4j-log4j12</artifactId>
  20. <scope>provided</scope>
  21. </dependency>
  22. <!-- json解析包,fastjson包 -->
  23. <dependency>
  24. <groupId>com.alibaba</groupId>
  25. <artifactId>fastjson</artifactId>
  26. </dependency>
  27. <!--commons-beanutils 是 Apache 开源组织提供的用于操作 JAVA BEAN 的工具包。使用 commons-beanutils,我们可以很方便的对 bean 对象的属性进行操作-->
  28. <dependency>
  29. <groupId>commons-beanutils</groupId>
  30. <artifactId>commons-beanutils</artifactId>
  31. <scope>provided</scope>
  32. </dependency>
  33. <!--Guava 工程包含了若干被 Google 的 Java 项目广泛依赖的核心库,方便开发-->
  34. <dependency>
  35. <groupId>com.google.guava</groupId>
  36. <artifactId>guava</artifactId>
  37. <scope>provided</scope>
  38. </dependency>
  39. <!-- 数据库连接池和jdbc操作模板 -->
  40. <dependency>
  41. <groupId>org.springframework.boot</groupId>
  42. <artifactId>spring-boot-starter-jdbc</artifactId>
  43. <version>${springboot.version}</version>
  44. </dependency>
  45. <dependency>
  46. <groupId>com.zaxxer</groupId>
  47. <artifactId>HikariCP</artifactId>
  48. <version>2.6.1</version>
  49. </dependency>

对应版本号:

  1. <properties>
  2. <maven.compiler.source>8</maven.compiler.source>
  3. <maven.compiler.target>8</maven.compiler.target>
  4. <scala.binary.version>2.11</scala.binary.version>
  5. <scala.version>2.11.8</scala.version>
  6. <flink.binary.version>1.10</flink.binary.version>
  7. <flink.version>1.10.0</flink.version>
  8. <alink.version>1.4.0</alink.version>
  9. <log4j.version>1.2.17</log4j.version>
  10. <slf4j.version>1.7.21</slf4j.version>
  11. <mysql.version>8.0.21</mysql.version>
  12. <fastjson.version>1.2.75</fastjson.version>
  13. <huaweicloud.dws.jdbc.version>8.1.0</huaweicloud.dws.jdbc.version>
  14. <commons.beanutils.version>1.9.4</commons.beanutils.version>
  15. <guava.version>29.0-jre</guava.version>
  16. <okhttp.version>3.6.0</okhttp.version>
  17. <springboot.version>2.0.2.RELEASE</springboot.version>
  18. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  19. <avro.version>1.10.0</avro.version>
  20. </properties>

3. 代码

注意:下述代码中使用了自定义的ModelUtil工具类,该工具类的具体介绍可以参考博主的另一篇博文:Flink(60):Flink中通用ModelUtil工具类

3.1. 方法说明

  • 构造方法:
    • MySQLUtil(String url, String username, String password, int maxConnect, int minConnect) :根据传入的MySQL各项信息和连接池连接的数量来构建连接池
    • MySQLUtil(String url, String username, String password) :根据传入的MySQL各项信息,使用默认的连接池配置(最大连接2个,最小连接1个)

  • 初始化JdbcTemplate:
    • initJdbcTemplate(String url, String username, String password, int maxConnect, int minConnect) :在构造方法中调用,使用HikariPool连接池初始化JdbcTemplate,并赋值给成员变量,后续使用

  • 处理特殊字符:
    • disposeSpecialCharacter(Object object) :对传入的数据中的特殊字符进行处理,比如当我们拼接SQL时,使用的是单引号拼接,但当数据中有单引号就会报错,可以对这些特殊字符进行处理

  • 查询方法:
    • List<T> queryList(String sql, Class<T> clz, boolean underScoreToCamel) :通过输入的SQL语句查询MySQL表中的数据,并将数据转换成传入的clz对应的对象

  • 插入方法:
    • insert(String tableName, boolean underScoreToCamel, Object object) : 通过输入的表名,以及是否对字段名进行下划线驼峰命名转换,将传入的数据(object对象,可以是bean对象,也可以是JSONObject对象)插入到对应的表中

  • 删除方法:
    • delete(String tableName, Map<String, Object> fieldNameAndValue) : 通过传入的表名以及删除的条件(Map集合),删除对应表中对应的数据
    • delete(String tableName, boolean underScoreToCamel, Object object, String... fields) : 通过传入的表名,以及是否对字段名进行下划线驼峰命名转换,并使用传入对象中给定的字段进行匹配,删除匹配到的数据

  • 更新方法:
    • update(String tableName, boolean underScoreToCamel, Object object, Map<String, Object> fieldNameAndValue) :通过传入的表名,以及是否对字段名进行下划线驼峰命名转换,将对应的object对象中的数据更新到表中,使用传入Map集合作为匹配条件
    • update(String tableName, boolean underScoreToCamel, Object object, String... fields) : 通过传入的表名,以及是否对字段名进行下划线驼峰命名转换,将对应的object对象中的数据更新到表中,使用传入的字段集合作为更新条件

  • upsert方法:
    • upsertByPrimaryKey(String tableName, boolean underScoreToCamel, Object object, Map<String, Object> fieldNameAndValue) : 根据主键使用ON DUPLICATE KEY UPDATE语法对对应的表中的数据进行更新,也可以对传入的数据中的字段是否下划线驼峰命名转换,并将传入的Map集合作为update条件
    • upsertByPrimaryKey(String tableName, boolean underScoreToCamel, Object object, String... fields) : 根据主键使用ON DUPLICATE KEY UPDATE语法对对应的表中的数据进行更新,也可以对传入的数据中的字段是否下划线驼峰命名转换,并将传入的字段集合作为update条件(字段集合必须在object中有数据)
    • upsert(String tableName, boolean underScoreToCamel, Object object, String... fields) :先使用上述update方法,如果返回变更的条数等于0,就执行上述的insert方法,如果大于0就不执行
    • upsert(String tableName, boolean underScoreToCamel, Object object, Map<String, Object> fieldNameAndValue) : 先使用上述update方法,如果返回变更的条数等于0,就执行上述的insert方法,如果大于0就不执行

3.2. 具体实现

  1. package com.yishou.bigdata.common.utils;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.google.common.base.CaseFormat;
  5. import com.google.common.collect.Lists;
  6. import com.zaxxer.hikari.HikariDataSource;
  7. import org.apache.commons.beanutils.BeanUtils;
  8. import org.apache.commons.lang3.StringUtils;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.jdbc.core.JdbcTemplate;
  12. import java.util.*;
  13. /**
  14. * @date: 2022/5/17
  15. * @author: yangshibiao
  16. * @desc: MySQLUtil
  17. */
  18. public class MySQLUtil {
  19. static Logger logger = LoggerFactory.getLogger(MySQLUtil.class);
  20. /**
  21. * jdbcTemplate
  22. */
  23. private JdbcTemplate jdbcTemplate;
  24. /**
  25. * 通过传入的参数创建MySQLUtil对象
  26. *
  27. * @param url mysql的url
  28. * @param username mysql的username
  29. * @param password mysql的password
  30. * @param maxConnect 连接池中最大连接数
  31. * @param minConnect 连接池中最小连接数
  32. */
  33. public MySQLUtil(String url, String username, String password, int maxConnect, int minConnect) {
  34. initJdbcTemplate(url, username, password, maxConnect, minConnect);
  35. }
  36. /**
  37. * 通过传入的参数创建MySQLUtil对象
  38. *
  39. * @param url mysql的url
  40. * @param username mysql的username
  41. * @param password mysql的password
  42. */
  43. public MySQLUtil(String url, String username, String password) {
  44. initJdbcTemplate(url, username, password, 2, 1);
  45. }
  46. /**
  47. * 初始化MySQL的jdbcTemplate
  48. *
  49. * @param url mysql的url
  50. * @param username mysql的username
  51. * @param password mysql的password
  52. * @param maxConnect 连接池中最大连接数
  53. * @param minConnect 连接池中最小连接数
  54. */
  55. public void initJdbcTemplate(String url, String username, String password, int maxConnect, int minConnect) {
  56. try {
  57. HikariDataSource ds = new HikariDataSource();
  58. Thread.sleep(1000);
  59. ds.setDriverClassName("com.mysql.cj.jdbc.Driver");
  60. ds.setJdbcUrl(url);
  61. ds.setUsername(username);
  62. ds.setPassword(password);
  63. ds.setMaximumPoolSize(maxConnect);
  64. ds.setMinimumIdle(minConnect);
  65. jdbcTemplate = new JdbcTemplate(ds);
  66. logger.info(
  67. "使用HikariPool连接池初始化JdbcTemplate成功,使用的URL为:{} , 其中最大连接大小为:{} , 最小连接大小为:{} ;",
  68. ds.getJdbcUrl(),
  69. ds.getMaximumPoolSize(),
  70. ds.getMinimumIdle()
  71. );
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. throw new RuntimeException("创建MySQL数据库的jdbcTemplate失败,抛出的异常信息为:" + e.getMessage());
  75. }
  76. }
  77. /**
  78. * 获取对应的 JdbcTemplate
  79. *
  80. * @return JdbcTemplate
  81. */
  82. public JdbcTemplate getJdbcTemplate() {
  83. return jdbcTemplate;
  84. }
  85. /**
  86. * 处理传入数据中的特殊字符(例如: 单引号),并将其中数据为空的过滤
  87. *
  88. * @param object 传入的数据对象
  89. * @return 返回的结果
  90. */
  91. public String disposeSpecialCharacter(Object object) {
  92. // 根据传入的情况,将数据转换成json格式(如果传入为string,那就本来是json格式,不需要转)
  93. String data;
  94. if (object instanceof String) {
  95. data = object.toString();
  96. } else {
  97. data = JSON.parseObject(JSON.toJSONString(object)).toString();
  98. }
  99. // 处理传入数据中的特殊字符(例如: 单引号)
  100. data = data.replace("'", "''");
  101. // 将其中为空值的去掉(注意:如果不能转换成json并从中获取数据的,就是从delete中传过来的,只有单纯的value值)
  102. try {
  103. JSONObject result = new JSONObject();
  104. for (Map.Entry<String, Object> entry : JSON.parseObject(data).entrySet()) {
  105. if (StringUtils.isNotEmpty(entry.getValue().toString())) {
  106. result.put(entry.getKey(), entry.getValue());
  107. }
  108. }
  109. data = result.toJSONString();
  110. } catch (Exception exception) {
  111. exception.printStackTrace();
  112. logger.warn("传入的数据为:{};该数据是从delete中传入的,不能转换成json值", object);
  113. }
  114. // 返回数据
  115. return data;
  116. }
  117. /**
  118. * 如果传入的clz中的属性又包含对象,会报错,此时传入JSONObject对象即可
  119. *
  120. * @param sql 执行的查询语句
  121. * @param clz 返回的数据类型
  122. * @param underScoreToCamel 是否将下划线转换为驼峰命名法
  123. * @param <T> 样例类
  124. * @return 样例类集合
  125. */
  126. public <T> List<T> queryList(String sql, Class<T> clz, boolean underScoreToCamel) {
  127. try {
  128. List<Map<String, Object>> mapList = jdbcTemplate.queryForList(sql);
  129. List<T> resultList = new ArrayList<>();
  130. for (Map<String, Object> map : mapList) {
  131. Set<String> keys = map.keySet();
  132. // 当返回的结果中存在数据,通过反射将数据封装成样例类对象
  133. T result = clz.newInstance();
  134. for (String key : keys) {
  135. String propertyName = underScoreToCamel ? CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, key) : key;
  136. BeanUtils.setProperty(
  137. result,
  138. propertyName,
  139. map.get(key)
  140. );
  141. }
  142. resultList.add(result);
  143. }
  144. return resultList;
  145. } catch (Exception exception) {
  146. exception.printStackTrace();
  147. throw new RuntimeException(
  148. "\r\n从MySQL数据库中 查询 数据失败," +
  149. "\r\n抛出的异常信息为:" + exception.getMessage() +
  150. "\r\n查询的SQL为:" + sql
  151. );
  152. }
  153. }
  154. /**
  155. * 将传入的数据插入到对应的MySQL的表中
  156. *
  157. * @param tableName 表名
  158. * @param underScoreToCamel 是否将驼峰转换为下划线
  159. * @param object 数据对象
  160. * INSERT INTO customer_t1 (c_customer_sk, c_first_name) VALUES (3769, 'Grace');
  161. */
  162. public void insert(String tableName, boolean underScoreToCamel, Object object) {
  163. // 将传入的对象转换成JSONObject格式(并将其中的特殊字符进行替换)
  164. JSONObject data = JSON.parseObject(disposeSpecialCharacter(object));
  165. // 从传入的数据中获取出对应的key和value,因为要一一对应,所以使用list
  166. ArrayList<String> fieldList = Lists.newArrayList(data.keySet());
  167. ArrayList<String> valueList = new ArrayList<>();
  168. for (String field : fieldList) {
  169. valueList.add(data.getString(field));
  170. }
  171. // 拼接SQL
  172. StringBuilder sql = new StringBuilder();
  173. sql.append(" INSERT INTO ").append(tableName);
  174. sql.append(" ( ");
  175. for (String field : fieldList) {
  176. if (underScoreToCamel) {
  177. sql.append(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field)).append(",");
  178. } else {
  179. sql.append(field).append(",");
  180. }
  181. }
  182. sql.deleteCharAt(sql.length() - 1);
  183. sql.append(" ) ");
  184. sql.append(" values ('").append(StringUtils.join(valueList, "','")).append("')");
  185. // 执行插入操作
  186. try {
  187. jdbcTemplate.execute(sql.toString());
  188. } catch (Exception exception) {
  189. exception.printStackTrace();
  190. throw new RuntimeException(
  191. "\r\n向MySQL数据库中 插入 数据失败," +
  192. "\r\n抛出的异常信息为:" + exception.getMessage() +
  193. "\r\n执行的SQL为:" + sql
  194. );
  195. }
  196. }
  197. /**
  198. * 根据主键删除对应数据
  199. * 注意:传入的字段名要和数据库中一一匹配,即数据库中有下划线,那传入的字段名也要有下划线
  200. *
  201. * @param tableName 表名
  202. * @param fieldNameAndValue 更新时匹配的字段(key)和值(value)(注意:传入的字段名要和数据库中一一匹配,即数据库中有下划线,那传入的字段名也要有下划线)
  203. * @return 删除时影响的条数
  204. */
  205. public int delete(String tableName, Map<String, Object> fieldNameAndValue) {
  206. // 拼接SQL
  207. StringBuilder sql = new StringBuilder();
  208. sql.append(" delete from ").append(tableName);
  209. if (fieldNameAndValue.size() > 0) {
  210. sql.append(" WHERE ");
  211. for (Map.Entry<String, Object> fieldNameAndValueEntry : fieldNameAndValue.entrySet()) {
  212. sql
  213. .append(fieldNameAndValueEntry.getKey())
  214. .append(" = ")
  215. .append("'")
  216. .append(disposeSpecialCharacter(fieldNameAndValueEntry.getValue()))
  217. .append("'")
  218. .append(" AND ");
  219. }
  220. sql.delete(sql.length() - 4, sql.length() - 1);
  221. } else {
  222. throw new RuntimeException("从MySQL中删除数据异常,输入的删除条件没有指定字段名和对应的值,会进行全表删除, 拼接的SQL为:" + sql);
  223. }
  224. // 执行删除操作
  225. try {
  226. return jdbcTemplate.update(sql.toString());
  227. } catch (Exception exception) {
  228. exception.printStackTrace();
  229. throw new RuntimeException(
  230. "\r\n向MySQL数据库中 删除 数据失败," +
  231. "\r\n抛出的异常信息为:" + exception.getMessage() +
  232. "\r\n执行的SQL为:" + sql
  233. );
  234. }
  235. }
  236. /**
  237. * 根据传入的表名、数据、字段名,删除表中对应的数据
  238. *
  239. * @param tableName 表名
  240. * @param underScoreToCamel 是否将驼峰转换为下划线
  241. * @param object 数据对象
  242. * @param fields 更新时匹配的字段名
  243. */
  244. public int delete(String tableName, boolean underScoreToCamel, Object object, String... fields) {
  245. // 将传入的对象转换成JSONObject格式
  246. JSONObject data = JSON.parseObject(disposeSpecialCharacter(object));
  247. // 根据传入的字段,获取要更新的主键值
  248. HashMap<String, Object> fieldNameAndValue = new HashMap<>();
  249. for (String field : fields) {
  250. if (underScoreToCamel) {
  251. // data中的均为驼峰,获取数据时需要使用驼峰;但是将数据写入到fieldNameAndValue中时,需要全部转换成下划线
  252. fieldNameAndValue.put(
  253. field.contains("_") ? field : CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field),
  254. data.getString(field.contains("_") ? CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field) : field)
  255. );
  256. } else {
  257. // data中均为下划线,field中也是下划线
  258. fieldNameAndValue.put(field, data.getString(field));
  259. }
  260. }
  261. // 调用重载函数,删除数据
  262. return delete(tableName, fieldNameAndValue);
  263. }
  264. /**
  265. * 将传入的数据 更新 到对应的MySQL的表中
  266. *
  267. * @param tableName 表名
  268. * @param underScoreToCamel 是否将驼峰转换为下划线
  269. * @param object 数据对象(既可以包含更新的主键,也可以不包含)
  270. * @param fieldNameAndValue 更新时匹配的字段和对应的值
  271. * @return 返回更新的条数
  272. */
  273. public int update(String tableName, boolean underScoreToCamel, Object object, Map<String, Object> fieldNameAndValue) {
  274. // 将传入的对象转换成JSONObject格式,并判断输入的数据是否符合更新条件
  275. JSONObject data = JSON.parseObject(disposeSpecialCharacter(object));
  276. if (fieldNameAndValue == null || fieldNameAndValue.size() == 0) {
  277. throw new RuntimeException("向MySQL中更新数据异常,输入的更新条件没有指定数据,不能更新(这样更新会全表更新),传入的数据为:" + data);
  278. }
  279. // 拼接SQL
  280. StringBuilder sql = new StringBuilder();
  281. sql.append(" UPDATE ").append(tableName);
  282. sql.append(" SET ");
  283. if (underScoreToCamel) {
  284. // 删除传入对象中要更新的数据
  285. for (String key : fieldNameAndValue.keySet()) {
  286. data.remove(key.contains("_") ? CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, key) : key);
  287. }
  288. // 拼接要更新的结果值
  289. for (Map.Entry<String, Object> entry : data.entrySet()) {
  290. sql
  291. .append(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey()))
  292. .append(" = ")
  293. .append("'")
  294. .append(entry.getValue())
  295. .append("'")
  296. .append(",");
  297. }
  298. sql.deleteCharAt(sql.length() - 1);
  299. // 拼接判断条件
  300. sql.append(" WHERE ");
  301. for (Map.Entry<String, Object> fieldNameAndValueEntry : fieldNameAndValue.entrySet()) {
  302. String key = fieldNameAndValueEntry.getKey();
  303. Object value = fieldNameAndValueEntry.getValue();
  304. sql
  305. .append(key.contains("_") ? key : CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, key))
  306. .append(" = ")
  307. .append("'")
  308. .append(value)
  309. .append("'")
  310. .append(" AND ");
  311. }
  312. } else {
  313. // 删除传入对象中要更新的数据
  314. for (String key : fieldNameAndValue.keySet()) {
  315. data.remove(key);
  316. }
  317. // 拼接要更新的结果值
  318. for (Map.Entry<String, Object> entry : data.entrySet()) {
  319. sql
  320. .append(entry.getKey())
  321. .append(" = ")
  322. .append("'")
  323. .append(entry.getValue())
  324. .append("'")
  325. .append(",");
  326. }
  327. sql.deleteCharAt(sql.length() - 1);
  328. // 拼接判断条件
  329. sql.append(" WHERE ");
  330. for (Map.Entry<String, Object> fieldNameAndValueEntry : fieldNameAndValue.entrySet()) {
  331. String key = fieldNameAndValueEntry.getKey();
  332. Object value = fieldNameAndValueEntry.getValue();
  333. sql
  334. .append(key)
  335. .append(" = ")
  336. .append("'")
  337. .append(value)
  338. .append("'")
  339. .append(" AND ");
  340. }
  341. }
  342. sql.delete(sql.length() - 4, sql.length() - 1);
  343. // 执行更新操作
  344. try {
  345. return jdbcTemplate.update(sql.toString());
  346. } catch (Exception exception) {
  347. exception.printStackTrace();
  348. throw new RuntimeException(
  349. "\r\n向MySQL数据库中 更新 数据失败," +
  350. "\r\n抛出的异常信息为:" + exception.getMessage() +
  351. "\r\n执行的SQL为:" + sql
  352. );
  353. }
  354. }
  355. /**
  356. * 将传入的数据 更新 到对应的MySQL的表中
  357. *
  358. * @param tableName 表名
  359. * @param underScoreToCamel 是否将驼峰转换为下划线
  360. * @param object 数据对象
  361. * @param fields 更新时匹配的字段名(如果underScoreToCamel为true,传入的字段为驼峰,如果underScoreToCamel为false,传入的字段为下划线)
  362. * @return 返回更新的条数
  363. */
  364. public int update(String tableName, boolean underScoreToCamel, Object object, String... fields) {
  365. // 将传入的对象转换成JSONObject格式
  366. JSONObject data = JSON.parseObject(disposeSpecialCharacter(object));
  367. // 根据传入的字段,获取要更新的主键值
  368. HashMap<String, Object> fieldNameAndValue = new HashMap<>();
  369. for (String field : fields) {
  370. if (underScoreToCamel) {
  371. field = field.contains("_") ? CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field) : field;
  372. }
  373. fieldNameAndValue.put(field, data.getString(field));
  374. }
  375. // 调用重载函数,更新数据
  376. return update(tableName, underScoreToCamel, object, fieldNameAndValue);
  377. }
  378. /**
  379. * 将传入的数据 upsert 到对应的MySQL的表中
  380. * 会根据MySQL表中的主键进行更新,如果该主键在表中有对应数据,就更新,没有就插入
  381. * 传入的数据中必须有主键
  382. * <p>
  383. * mysql中的upsert语法:
  384. * INSERT INTO Student(Stud_ID, Name, Email, City) VALUES (4, 'John', 'john@lidihuo.com', 'New York') ON DUPLICATE KEY UPDATE City = 'California';
  385. *
  386. * @param tableName 表名
  387. * @param underScoreToCamel 是否将驼峰转换为下划线
  388. * @param object 数据对象
  389. * @param fieldNameAndValue 更新时匹配的字段名(如果underScoreToCamel为true,传入的字段为驼峰,如果underScoreToCamel为false,传入的字段为下划线)
  390. * @return 返回更改的条数
  391. */
  392. public int upsertByPrimaryKey(String tableName, boolean underScoreToCamel, Object object, Map<String, Object> fieldNameAndValue) {
  393. // 判断输入的数据是否符合更新条件
  394. if (fieldNameAndValue == null || fieldNameAndValue.size() == 0) {
  395. throw new RuntimeException("向MySQL中更新数据异常,输入的更新条件没有指定数据,不能更新(这样更新会全表更新),传入的数据为:" + object);
  396. }
  397. // 将传入的object转换成json类型,并将传入的更新匹配字段和值(即fieldNameAndValue),添加到数据对象中(即data)
  398. JSONObject data = JSON.parseObject(JSON.toJSONString(object));
  399. for (Map.Entry<String, Object> entry : fieldNameAndValue.entrySet()) {
  400. data.put(entry.getKey(), entry.getValue());
  401. }
  402. data = JSON.parseObject(disposeSpecialCharacter(data));
  403. // 拼接SQL
  404. StringBuilder sql = new StringBuilder();
  405. sql.append(" INSERT INTO ").append(tableName);
  406. if (underScoreToCamel) {
  407. sql.append(" ( ");
  408. for (String key : data.keySet()) {
  409. sql.append(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, key)).append(",");
  410. }
  411. sql.deleteCharAt(sql.length() - 1);
  412. sql.append(" ) ");
  413. sql.append(" values ");
  414. sql.append(" ( ");
  415. for (Object value : data.values()) {
  416. sql
  417. .append("'")
  418. .append(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, value.toString()))
  419. .append("'")
  420. .append(",");
  421. }
  422. sql.deleteCharAt(sql.length() - 1);
  423. sql.append(" ) ");
  424. sql.append(" ON DUPLICATE KEY UPDATE ");
  425. for (Map.Entry<String, Object> entry : fieldNameAndValue.entrySet()) {
  426. sql
  427. .append(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey()))
  428. .append(" = ")
  429. .append("'")
  430. .append(entry.getValue())
  431. .append("'")
  432. .append(",");
  433. }
  434. sql.deleteCharAt(sql.length() - 1);
  435. } else {
  436. sql.append(" ( ");
  437. for (String key : data.keySet()) {
  438. sql.append(key).append(",");
  439. }
  440. sql.deleteCharAt(sql.length() - 1);
  441. sql.append(" ) ");
  442. sql.append(" values ");
  443. sql.append(" ( ");
  444. for (Object value : data.values()) {
  445. sql
  446. .append("'")
  447. .append(value.toString())
  448. .append("'")
  449. .append(",");
  450. }
  451. sql.deleteCharAt(sql.length() - 1);
  452. sql.append(" ) ");
  453. sql.append(" ON DUPLICATE KEY UPDATE ");
  454. for (Map.Entry<String, Object> entry : fieldNameAndValue.entrySet()) {
  455. sql
  456. .append(entry.getKey())
  457. .append(" = ")
  458. .append("'")
  459. .append(entry.getValue())
  460. .append("'")
  461. .append(",");
  462. }
  463. sql.deleteCharAt(sql.length() - 1);
  464. }
  465. // 执行upsert操作
  466. try {
  467. return jdbcTemplate.update(sql.toString());
  468. } catch (Exception exception) {
  469. exception.printStackTrace();
  470. throw new RuntimeException(
  471. "\r\n向MySQL数据库中 upsert 数据失败," +
  472. "\r\n抛出的异常信息为:" + exception.getMessage() +
  473. "\r\n执行的SQL为:" + sql
  474. );
  475. }
  476. }
  477. /**
  478. * 将传入的数据 upsert 到对应的MySQL的表中
  479. * 会根据MySQL表中的主键进行更新,如果该主键在表中有对应数据,就更新,没有就插入
  480. * 传入的数据中必须有主键
  481. *
  482. * @param tableName 表名
  483. * @param underScoreToCamel 是否将驼峰转换为下划线
  484. * @param object 数据对象
  485. * @param fields 更新时匹配的字段名(如果underScoreToCamel为true,传入的字段为驼峰,如果underScoreToCamel为false,传入的字段为下划线)
  486. * @return 返回更改的条数
  487. */
  488. public int upsertByPrimaryKey(String tableName, boolean underScoreToCamel, Object object, String... fields) {
  489. // 将传入的对象转换成JSONObject格式
  490. JSONObject data = JSON.parseObject(disposeSpecialCharacter(object));
  491. // 根据传入的字段,获取要更新的主键值
  492. HashMap<String, Object> fieldNameAndValue = new HashMap<>();
  493. for (String field : fields) {
  494. if (underScoreToCamel) {
  495. field = field.contains("_") ? CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field) : field;
  496. }
  497. fieldNameAndValue.put(field, data.getString(field));
  498. }
  499. // 调用重载函数,更新数据
  500. return upsertByPrimaryKey(tableName, underScoreToCamel, object, fieldNameAndValue);
  501. }
  502. /**
  503. * 将传入的数据 upsert 到对应的MySQL数据库的表中
  504. * 使用的是先用update进行数据更新,如果更新的条数为0,就进行插入
  505. * 如果表中包含主键,那在传入的数据中也必须要有主键
  506. *
  507. * @param tableName 表名
  508. * @param underScoreToCamel 是否将驼峰转换为下划线
  509. * @param object 数据对象
  510. * @param fields 更新时匹配的字段名(如果underScoreToCamel为true,传入的字段为驼峰,如果underScoreToCamel为false,传入的字段为下划线)
  511. * @return 返回更改的条数
  512. */
  513. public int upsert(String tableName, boolean underScoreToCamel, Object object, String... fields) {
  514. int updateNum = update(tableName, underScoreToCamel, object, fields);
  515. if (updateNum == 0) {
  516. insert(tableName, underScoreToCamel, object);
  517. updateNum = 1;
  518. }
  519. return updateNum;
  520. }
  521. /**
  522. * 将传入的数据 upsert 到对应的MySQL数据库的表中
  523. * 使用的是先用update进行数据更新,如果更新的条数为0,就进行插入
  524. * 如果表中包含主键,那在传入的数据中也必须要有主键
  525. *
  526. * @param tableName 表名
  527. * @param underScoreToCamel 是否将驼峰转换为下划线
  528. * @param object 数据对象
  529. * @param fieldNameAndValue 更新时匹配的字段名(如果underScoreToCamel为true,传入的字段为驼峰,如果underScoreToCamel为false,传入的字段为下划线)
  530. * @return 返回更改的条数
  531. */
  532. public int upsert(String tableName, boolean underScoreToCamel, Object object, Map<String, Object> fieldNameAndValue) {
  533. int updateNum = update(tableName, underScoreToCamel, object, fieldNameAndValue);
  534. if (updateNum == 0) {
  535. insert(tableName, underScoreToCamel, object);
  536. updateNum = 1;
  537. }
  538. return updateNum;
  539. }
  540. }

4. 如何使用

以在sink中使用该MySQL进行举例。

该代码实现的需求是:前序使用cdc功能从MySQL中采集到了binlog数据,现在将数据写入到另一个MySQL数据库中(该数据库中需要有对应的表,并且表结构和原数据库的表一模一样)

  1. dwsStream
  2. .keyBy(new KeySelector<JSONObject, String>() {
  3. @Override
  4. public String getKey(JSONObject element) throws Exception {
  5. // 根据传入的表名和主键进行分组
  6. StringBuilder result = new StringBuilder();
  7. result.append(element.getString("table")).append(" : ");
  8. for (Map.Entry<String, Object> entry : element.getJSONObject("primary_key").getInnerMap().entrySet()) {
  9. result.append("key=>").append(entry.getKey()).append(" ; value=>").append(entry.getValue());
  10. }
  11. return result.toString();
  12. }
  13. })
  14. .addSink(new RichSinkFunction<JSONObject>() {
  15. // 声明MySQLUtil变量
  16. MySQLUtil mySQLUtil;
  17. @Override
  18. public void open(Configuration parameters) throws Exception {
  19. // 在open方法中创建MySQLUtil工具类的对象
  20. mySQLUtil = new MySQLUtil(
  21. ModelUtil.getConfigValue("mysql.yishou.data.url"),
  22. ModelUtil.getConfigValue("mysql.yishou.data.username"),
  23. ModelUtil.getConfigValue("mysql.yishou.data.password"),
  24. 2,
  25. 1
  26. );
  27. }
  28. @Override
  29. public void invoke(JSONObject element, Context context) throws Exception {
  30. long start = 0L;
  31. long end = 0L;
  32. try {
  33. switch (element.getString("type")) {
  34. // 对传入数据进行判断,如果是insert或者update就使用upsert系列方法更新数据
  35. case "insert":
  36. case "update":
  37. start = System.currentTimeMillis();
  38. mySQLUtil.upsertByPrimaryKey(
  39. element.getString("data_output_topic"),
  40. false,
  41. element.getJSONObject("data"),
  42. element.getJSONObject("primary_key").getInnerMap()
  43. );
  44. end = System.currentTimeMillis();
  45. logger.info("向MySQL中upsert数据,耗时:{}, 操作的表名为:{}, 数据中的时间戳为:{}", (end - start), element.getString("data_output_topic"), element.getString("ts"));
  46. break;
  47. // 如果是delete类型数据,就使用delete方法删除数据
  48. case "delete":
  49. start = System.currentTimeMillis();
  50. mySQLUtil.delete(element.getString("data_output_topic"), element.getJSONObject("primary_key").getInnerMap());
  51. end = System.currentTimeMillis();
  52. logger.info("向MySQL中delete数据,耗时:{}, 操作的表名为:{}, 数据中的时间戳为:{}", (end - start), element.getString("data_output_topic"), element.getString("ts"));
  53. break;
  54. // 当是其他类型的数据,不进行操作
  55. default:
  56. logger.error("无匹配,跳过,传入的数据为:{}", element.toJSONString());
  57. break;
  58. }
  59. } catch (Exception e) {
  60. throw new RuntimeException(" ==> " + element);
  61. }
  62. }
  63. })

5. Hikari连接池各配置说明

  1. # Hikari will use the above plus the following to setup connection pooling
  2. spring.datasource.type=com.zaxxer.hikari.HikariDataSource
  3. #最小空闲连接,默认值10,小于0或大于maximum-pool-size,都会重置为maximum-pool-size
  4. spring.datasource.hikari.minimum-idle=5
  5. #最大连接数,小于等于0会被重置为默认值10;大于零小于1会被重置为minimum-idle的值
  6. spring.datasource.hikari.maximum-pool-size=15
  7. #自动提交从池中返回的连接,默认值为true
  8. spring.datasource.hikari.auto-commit=true
  9. #空闲连接超时时间,默认值600000(10分钟),大于等于max-lifetime且max-lifetime>0,会被重置为0;不等于0且小于10秒,会被重置为10秒。
  10. #只有空闲连接数大于最大连接数且空闲时间超过该值,才会被释放
  11. spring.datasource.hikari.idle-timeout=30000
  12. #连接池名称,默认HikariPool-1
  13. spring.datasource.hikari.pool-name=Hikari
  14. #连接最大存活时间.不等于0且小于30秒,会被重置为默认值30分钟.设置应该比mysql设置的超时时间短;单位ms
  15. spring.datasource.hikari.max-lifetime=55000
  16. #连接超时时间:毫秒,小于250毫秒,会被重置为默认值30秒
  17. spring.datasource.hikari.connection-timeout=30000
  18. #连接测试查询
  19. spring.datasource.hikari.connection-test-query=SELECT 1

6. 注意点

        在使用上述方法时,因为是手动拼接SQL,并且会过滤值为空的字段,这样的话,需要在创建MySQL表时,不能设置该字段not null,或者设置该字段为not null但给该字段默认值。

        但是在MySQL8.x版本中,对一下类型,比如json类型等不能设置默认值,此时可以在建表的时候先设置 sql_mode 为 空,然后在建表,就可以给这些字段设置默认值了。

默认的 sql_mode 的值:

设置 sql_mode 为 空(注意:不修改配置文件的话,只在此会话中生效):

7. 静态MySQLUtil工具类 

        在上述的工具类中,每次创建都需要进行new MySQLUtil()操作,来设置需要操作的MySQL的具体属性以及连接属性,这在将数据写入到MySQL库中是比较方便的;但当我们需要进行维度匹配时,一个作业中可能很多算子都是用的一个库,这样我们每个算子中都创建一个MySQL连接客户端就比较麻烦了,而且可能也会浪费资源。

        这时我们就可以将这个库的MySQL创建成一个静态的工具类,这样在所有作业中就不需要创建了,可以直接使用;而且因为静态的特性,在每台机器上只会创建一个连接池,这样也比较能节省资源(当然,连接池的大小不能跟上述设置成最小为1,最大为2一样,博主在生产环境中一般设置为最小为5,最大为50,这样在一台机器上设置4个并发,经过博主的压测,最高查询次数为1500次/秒)。

具体代码如下所示(因为主要是维度匹配,所以全部都是查询方法):

  1. package com.yishou.bigdata.common.utils;
  2. import com.google.common.base.CaseFormat;
  3. import com.zaxxer.hikari.HikariDataSource;
  4. import org.apache.commons.beanutils.BeanUtils;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.jdbc.core.JdbcTemplate;
  8. import java.util.*;
  9. import java.util.stream.Collectors;
  10. /**
  11. * @date: 2021/7/5
  12. * @author: yangshibiao
  13. * @desc: 对数据库R7(MySQL)的工具类
  14. */
  15. public class MySQLR7Util {
  16. static Logger logger = LoggerFactory.getLogger(MySQLR7Util.class);
  17. /**
  18. * jdbcTemplate
  19. */
  20. private static JdbcTemplate jdbcTemplate;
  21. /**
  22. * 使用单例模式获取MySQL数据库R7的连接
  23. *
  24. * @return jdbc连接
  25. */
  26. public static JdbcTemplate getJdbcTemplate() {
  27. if (jdbcTemplate == null) {
  28. synchronized (MySQLR7Util.class) {
  29. if (jdbcTemplate == null) {
  30. try {
  31. HikariDataSource ds = new HikariDataSource();
  32. ds.setDriverClassName("com.mysql.cj.jdbc.Driver");
  33. ds.setJdbcUrl(ModelUtil.getConfigValue("mysql.r7.yishou.url"));
  34. ds.setUsername(ModelUtil.getConfigValue("mysql.r7.username"));
  35. ds.setPassword(ModelUtil.getConfigValue("mysql.r7.password"));
  36. ds.setMaximumPoolSize(50);
  37. ds.setMinimumIdle(5);
  38. jdbcTemplate = new JdbcTemplate(ds);
  39. logger.info(
  40. "##### 使用HikariPool连接池初始化JdbcTemplate成功,使用的URL为:{} , 其中最大连接大小为:{} , 最小连接大小为:{} ;",
  41. ds.getJdbcUrl(),
  42. ds.getMaximumPoolSize(),
  43. ds.getMinimumIdle()
  44. );
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. throw new RuntimeException("创建 MySQL R7 数据库连接失败");
  48. }
  49. }
  50. }
  51. }
  52. return jdbcTemplate;
  53. }
  54. /**
  55. * 处理传入数据中的特殊字符(例如: 单引号)
  56. *
  57. * @param data 传入的数据
  58. * @return 返回的结果
  59. */
  60. public static String disposeSpecialCharacter(String data) {
  61. // 处理其中的单引号
  62. data = data.replace("'", "''");
  63. // 返回结果
  64. return data;
  65. }
  66. /**
  67. * 通过传入的表名,主键key,主键value,样例类 和 需要的值,获取对应的数据
  68. *
  69. * @param tableName 表名
  70. * @param primaryKey 主键字段
  71. * @param primaryValue 主键值
  72. * @param clz 返回的数据类型
  73. * @param fields 需要的字段名
  74. * @param <T> 样例类
  75. * @return 样例类集合
  76. */
  77. public static <T> List<T> queryListByKey(String tableName, String primaryKey, String primaryValue, Class<T> clz, String... fields) {
  78. // 拼接SQL
  79. String sql = " select " +
  80. Arrays.stream(fields).map(String::valueOf).collect(Collectors.joining(",")) +
  81. " from " + tableName +
  82. " where " + primaryKey + " = '" + disposeSpecialCharacter(primaryValue) + "'";
  83. // 执行SQL并返回结果
  84. return queryList(sql, clz);
  85. }
  86. /**
  87. * 如果传入的clz中的属性又包含对象,会报错,此时传入JSONObject对象即可
  88. *
  89. * @param sql 执行的查询语句
  90. * @param clz 返回的数据类型
  91. * @param <T> 样例类
  92. * @return 样例类集合
  93. */
  94. public static <T> List<T> queryList(String sql, Class<T> clz) {
  95. try {
  96. List<Map<String, Object>> mapList = MySQLR7Util.getJdbcTemplate().queryForList(sql);
  97. List<T> resultList = new ArrayList<>();
  98. for (Map<String, Object> map : mapList) {
  99. Set<String> keys = map.keySet();
  100. // 当返回的结果中存在数据,通过反射将数据封装成样例类对象
  101. T result = clz.newInstance();
  102. for (String key : keys) {
  103. BeanUtils.setProperty(
  104. result,
  105. key,
  106. map.get(key)
  107. );
  108. }
  109. resultList.add(result);
  110. }
  111. return resultList;
  112. } catch (Exception exception) {
  113. exception.printStackTrace();
  114. throw new RuntimeException(
  115. "\r\n从 MySQL R7 数据库中 查询 数据失败," +
  116. "\r\n抛出的异常信息为:" + exception.getMessage() +
  117. "\r\n查询的SQL为:" + sql
  118. );
  119. }
  120. }
  121. /**
  122. * 如果传入的clz中的属性又包含对象,会报错,此时传入JSONObject对象即可
  123. *
  124. * @param sql 执行的查询语句
  125. * @param underScoreToCamel 是否将下划线转换为驼峰命名法
  126. * @param clz 返回的数据类型
  127. * @param <T> 样例类
  128. * @return 样例类集合
  129. */
  130. public static <T> List<T> queryList(String sql, boolean underScoreToCamel, Class<T> clz) {
  131. try {
  132. List<Map<String, Object>> mapList = MySQLR7Util.getJdbcTemplate().queryForList(sql);
  133. List<T> resultList = new ArrayList<>();
  134. for (Map<String, Object> map : mapList) {
  135. Set<String> keys = map.keySet();
  136. // 当返回的结果中存在数据,通过反射将数据封装成样例类对象
  137. T result = clz.newInstance();
  138. for (String key : keys) {
  139. String propertyName = underScoreToCamel ? CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, key) : key;
  140. BeanUtils.setProperty(
  141. result,
  142. propertyName,
  143. map.get(key)
  144. );
  145. }
  146. resultList.add(result);
  147. }
  148. return resultList;
  149. } catch (Exception exception) {
  150. exception.printStackTrace();
  151. throw new RuntimeException(
  152. "\r\n从 MySQL R7 数据库中 查询 数据失败," +
  153. "\r\n抛出的异常信息为:" + exception.getMessage() +
  154. "\r\n查询的SQL为:" + sql
  155. );
  156. }
  157. }
  158. }

注:其他相关文章链接由此进 -> Flink文章汇总


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/692567
推荐阅读
相关标签
  

闽ICP备14008679号