当前位置:   article > 正文

Springboot+shardingsphere实现一库多表分表(对多个表进行分表操作)_shardingspheredatasourcefactory.createdatasource

shardingspheredatasourcefactory.createdatasource

        Springboot+shardingsphere实现对一个数据库里面的多个表进行分表操作。本文针对mysql数据库中的两个表进行分表,实现根据分表字段自动创建不存在的表,以及根据分表字段自动路由查询操作。所使用的主要技术栈为shardingsphere + SPI自定义分表算法。

maven依赖

<!-- ShardingSphere JDBC core: embedded sharding engine (rule config + SQL routing) -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-jdbc-core</artifactId>
    <version>5.0.0</version>
</dependency>
<!-- Apache Velocity 1.x: renders the CREATE TABLE DDL templates used by the sharding
     algorithms to auto-create monthly tables.
     NOTE(review): 'velocity:velocity' are the legacy pre-1.5 coordinates; the relocated
     groupId is 'org.apache.velocity' — confirm which one your repository resolves. -->
<dependency>
    <groupId>velocity</groupId>
    <artifactId>velocity</artifactId>
    <version>1.4</version>
</dependency>

config文件

  1. @Slf4j
  2. @Configuration
  3. public class ShardingJdbcConfig {
  4. private final DataSourceProperties dataSourceProperties;
  5. public static HikariDataSource hikariDataSource;
  6. static {
  7. Velocity.setProperty("resource.loader", "class");
  8. Velocity.setProperty("class.resource.loader.class",
  9. ClasspathResourceLoader.class.getName());
  10. try {
  11. Velocity.init();
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. public ShardingJdbcConfig(DataSourceProperties dataSourceProperties) {
  17. this.dataSourceProperties = dataSourceProperties;
  18. }
  19. public HikariDataSource jdbcDatasource() {
  20. HikariDataSource dataSource = new HikariDataSource();
  21. BeanUtils.copyProperties(dataSourceProperties,dataSource);
  22. dataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
  23. dataSource.setJdbcUrl(dataSourceProperties.getUrl());
  24. dataSource.setUsername(dataSourceProperties.getUsername());
  25. dataSource.setPassword(dataSourceProperties.getPassword());
  26. dataSource.setConnectionTimeout(1000 * 60);
  27. hikariDataSource = dataSource;
  28. return dataSource;
  29. }
  30. @Bean
  31. @Primary
  32. @Qualifier("ds")
  33. public DataSource dataSource() throws SQLException {
  34. Map<String, DataSource> dataSourceMap = new HashMap<>();
  35. HikariDataSource dataSource = new HikariDataSource();
  36. dataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
  37. dataSource.setJdbcUrl(dataSourceProperties.getUrl());
  38. dataSource.setUsername(dataSourceProperties.getUsername());
  39. dataSource.setPassword(dataSourceProperties.getPassword());
  40. dataSourceMap.put("ds", dataSource);
  41. // 配置 record 表规则
  42. ShardingTableRuleConfiguration recordTableRuleConfiguration = new ShardingTableRuleConfiguration("t_test_table", getActualDataNodes());
  43. // 配置分表策略
  44. recordTableRuleConfiguration.setTableShardingStrategy(new StandardShardingStrategyConfiguration("create_time", KafkaTestTableStandardShardingAlgorithm.TYPE));
  45. // 配置分片规则
  46. ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
  47. shardingRuleConfig.getTables().add(recordTableRuleConfiguration);
  48. // 配置分表算法
  49. Properties tableShardingAlgorithmProps = new Properties();
  50. tableShardingAlgorithmProps.put("jdbcDatasource", jdbcDatasource());
  51. shardingRuleConfig.getShardingAlgorithms().put(KafkaTestTableStandardShardingAlgorithm.TYPE,
  52. new ShardingSphereAlgorithmConfiguration(KafkaTestTableStandardShardingAlgorithm.TYPE, tableShardingAlgorithmProps)
  53. );
  54. // 配置 t_redp_face_match_event 表规则
  55. ShardingTableRuleConfiguration recordTableRuleConfigurationRedp = new ShardingTableRuleConfiguration("t_kafka_test", getActualDataNodesRedp());
  56. // 配置分表策略
  57. recordTableRuleConfigurationRedp.setTableShardingStrategy(new StandardShardingStrategyConfiguration("create_time", KafkaTestTableStandardShardingAlgorithm.TYPE)
  58. );
  59. // 配置分片规则
  60. shardingRuleConfig.getTables().add(recordTableRuleConfigurationRedp);
  61. // 配置分表算法
  62. shardingRuleConfig.getShardingAlgorithms().put(KafkaTestTableStandardShardingAlgorithm.TYPE, new ShardingSphereAlgorithmConfiguration(KafkaTestTableStandardShardingAlgorithm.TYPE, tableShardingAlgorithmProps));
  63. List listConfig = new ArrayList<ShardingRuleConfiguration>(){
  64. {
  65. add(shardingRuleConfig);
  66. }
  67. };
  68. // 创建 ShardingSphereDataSource
  69. return ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, listConfig, tableShardingAlgorithmProps);
  70. }
  71. private String getActualDataNodes() {
  72. return "ds.t_test_table${2021..2099}0${1..9},ds.t_test_table${2021..2099}1${0..2}";
  73. }
  74. private String getActualDataNodesRedp() {
  75. return "ds.t_kafka_test${2021..2099}0${1..9},ds.t_kafka_test${2021..2099}1${0..2}";
  76. }
  77. }

两个表的分表算法逻辑

  1. @Slf4j
  2. public class KafkaTestTableStandardShardingAlgorithm implements StandardShardingAlgorithm<Date> {
  3. public static final String TYPE = "KafkaTestTableStandardShardingAlgorithm";
  4. private List<DataSource> dataSources;
  5. private final Set<String> TABLES = new ConcurrentHashSet<>();
  6. protected Properties props;
  7. protected String initDate;
  8. private static ThreadLocal<DateFormat> dateformat = new ThreadLocal<>();
  9. @Override
  10. public String doSharding(Collection<String> collection, PreciseShardingValue<Date> shardingValue) {
  11. StringBuilder tableName = new StringBuilder();
  12. Date value = shardingValue.getValue();
  13. tableName.append(shardingValue.getLogicTableName()).append(getDateFormatter().format(value));
  14. log.info("执行操作的表名{}",tableName.toString());
  15. if (!TABLES.contains(tableName.toString())) {
  16. createTable(tableName.toString());
  17. }
  18. return tableName.toString();
  19. }
  20. @Override
  21. public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
  22. Collection<String> result = new LinkedHashSet<>();
  23. Range<Date> shardingKey = rangeShardingValue.getValueRange();
  24. Date startTime = shardingKey.lowerEndpoint();
  25. Date endTime = shardingKey.upperEndpoint();
  26. // 获取起始,终止时间范围
  27. Date now = new Date();
  28. if (startTime.after(now)) {
  29. startTime = now;
  30. }
  31. if (endTime.after(now)) {
  32. endTime = now;
  33. }
  34. Collection<String> tables = getRoutTable(rangeShardingValue.getLogicTableName(), startTime, endTime);
  35. if (tables != null && tables.size() > 0) {
  36. result.addAll(tables);
  37. }
  38. return result;
  39. }
  40. @Override
  41. public void init() {
  42. this.dataSources = new ArrayList<DataSource>(){
  43. {
  44. add(ShardingJdbcConfig.hikariDataSource);
  45. }
  46. };
  47. this.syncTable();
  48. }
  49. @Override
  50. public String getType() {
  51. return TYPE;
  52. }
  53. @Override
  54. public Properties getProps() {
  55. return props;
  56. }
  57. @Override
  58. public void setProps(Properties props) {
  59. this.props = props;
  60. }
  61. private void syncTable() {
  62. String sql = "select TABLE_NAME from information_schema.TABLES " +
  63. "where TABLE_NAME like 't_kafka_test%'";
  64. for (DataSource dataSource : this.dataSources) {
  65. try (PreparedStatement preparedStatement = dataSource.getConnection().prepareStatement(sql);
  66. ResultSet rs = preparedStatement.executeQuery()) {
  67. while (rs.next()) {
  68. TABLES.add(rs.getString(1));
  69. }
  70. } catch (SQLException e) {
  71. log.error("sync table error:{}", e.getMessage(), e);
  72. }
  73. }
  74. }
  75. private void createTable(String tableName) {
  76. try {
  77. Template template =
  78. Velocity.getTemplate("template/ddlKafkaTest.template");
  79. VelocityContext context = new VelocityContext();
  80. context.put("tableName", tableName);
  81. StringWriter sw = new StringWriter();
  82. template.merge(context, sw);
  83. Resource resource = new ByteArrayResource(sw.toString().getBytes(StandardCharsets.UTF_8));
  84. for (DataSource dataSource : this.dataSources) {
  85. try (Connection connection = dataSource.getConnection()) {
  86. ScriptUtils.executeSqlScript(connection, resource);
  87. }
  88. }
  89. TABLES.add(tableName);
  90. } catch (Exception e) {
  91. log.error("create table({}) error:{}", tableName, e.getMessage(), e);
  92. }
  93. }
  94. private static DateFormat getDateFormatter(){
  95. DateFormat dateFormat = dateformat.get();
  96. if (dateFormat == null) {
  97. dateFormat = new SimpleDateFormat("yyyyMM");
  98. dateformat.set(dateFormat);
  99. }
  100. return dateFormat;
  101. }
  102. private Collection<String> getRoutTable(String logicTableName, Date startTime, Date endTime) {
  103. Set<String> rouTables = new HashSet<>();
  104. if (startTime != null && endTime != null) {
  105. List<String> rangeNameList = getRangeNameList(startTime, endTime);
  106. for (String YearMonth : rangeNameList) {
  107. String tableName = logicTableName + YearMonth;
  108. if (!TABLES.contains(tableName)) {
  109. createTable(tableName);
  110. }
  111. rouTables.add(tableName);
  112. }
  113. }
  114. return rouTables;
  115. }
  116. private static List<String> getRangeNameList(Date startTime, Date endTime) {
  117. List<String> result = Lists.newArrayList();
  118. DateTime beginOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(startTime);
  119. DateTime endOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(endTime);
  120. while (beginOfMonth.getTime() <= endOfMonth.getTime()) {
  121. result.add(getDateFormatter().format(beginOfMonth.getTime()));
  122. // 进行当前日期按月份 + 1
  123. beginOfMonth = beginOfMonth.offset(DateField.MONTH, 1);
  124. }
  125. return result;
  126. }
  127. }
  1. @Slf4j
  2. public class TestTableStandardShardingAlgorithm implements StandardShardingAlgorithm<Date> {
  3. public static final String TYPE = "TestTableStandardShardingAlgorithm";
  4. private List<DataSource> dataSources;
  5. private final Set<String> TABLES = new ConcurrentHashSet<>();
  6. protected Properties props;
  7. protected String initDate;
  8. private static ThreadLocal<DateFormat> dateformat = new ThreadLocal<>();
  9. @Override
  10. public String doSharding(Collection<String> collection, PreciseShardingValue<Date> shardingValue) {
  11. StringBuilder tableName = new StringBuilder();
  12. Date startTime = shardingValue.getValue();
  13. log.info("执行操作的表名{}", shardingValue.getLogicTableName() + getDateFormatter().format(startTime));
  14. tableName.append(shardingValue.getLogicTableName()).append(getDateFormatter().format(startTime));
  15. if (!TABLES.contains(tableName.toString())) {
  16. createTable(tableName.toString());
  17. }
  18. return tableName.toString();
  19. }
  20. @Override
  21. public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
  22. Collection<String> result = new LinkedHashSet<>();
  23. Range<Date> shardingKey = rangeShardingValue.getValueRange();
  24. Date startTime = shardingKey.lowerEndpoint();
  25. Date endTime = shardingKey.upperEndpoint();
  26. // 获取起始,终止时间范围
  27. Date now = new Date();
  28. if (startTime.after(now)) {
  29. startTime = now;
  30. }
  31. if (endTime.after(now)) {
  32. endTime = now;
  33. }
  34. Collection<String> tables = getRoutTable(rangeShardingValue.getLogicTableName(), startTime, endTime);
  35. if (tables != null && tables.size() > 0) {
  36. result.addAll(tables);
  37. }
  38. return result;
  39. }
  40. @Override
  41. public void init() {
  42. this.dataSources = new ArrayList<DataSource>(){
  43. {
  44. add(ShardingJdbcConfig.hikariDataSource);
  45. }
  46. };
  47. this.syncTable();
  48. }
  49. @Override
  50. public String getType() {
  51. return TYPE;
  52. }
  53. @Override
  54. public Properties getProps() {
  55. return props;
  56. }
  57. @Override
  58. public void setProps(Properties props) {
  59. this.props = props;
  60. }
  61. private void syncTable() {
  62. String sql = "select TABLE_NAME from information_schema.TABLES " +
  63. "where TABLE_NAME like 't_test_table%'";
  64. for (DataSource dataSource : this.dataSources) {
  65. try (PreparedStatement preparedStatement =
  66. dataSource.getConnection().prepareStatement(sql);
  67. ResultSet rs = preparedStatement.executeQuery()) {
  68. while (rs.next()) {
  69. TABLES.add(rs.getString(1));
  70. }
  71. } catch (SQLException e) {
  72. log.error("sync table error:{}", e.getMessage(), e);
  73. }
  74. }
  75. }
  76. private void createTable(String tableName) {
  77. try {
  78. Template template =
  79. Velocity.getTemplate("template/ddl.template");
  80. VelocityContext context = new VelocityContext();
  81. context.put("tableName", tableName);
  82. StringWriter sw = new StringWriter();
  83. template.merge(context, sw);
  84. Resource resource =
  85. new ByteArrayResource(sw.toString().getBytes(StandardCharsets.UTF_8));
  86. for (DataSource dataSource : this.dataSources) {
  87. try (Connection connection = dataSource.getConnection()) {
  88. ScriptUtils.executeSqlScript(connection, resource);
  89. }
  90. }
  91. TABLES.add(tableName);
  92. } catch (Exception e) {
  93. log.error("create table({}) error:{}", tableName, e.getMessage(), e);
  94. }
  95. }
  96. private static DateFormat getDateFormatter(){
  97. DateFormat dateFormat = dateformat.get();
  98. if (dateFormat == null) {
  99. dateFormat = new SimpleDateFormat("yyyyMM");
  100. dateformat.set(dateFormat);
  101. }
  102. return dateFormat;
  103. }
  104. private Collection<String> getRoutTable(String logicTableName, Date startTime, Date endTime) {
  105. Set<String> rouTables = new HashSet<>();
  106. if (startTime != null && endTime != null) {
  107. List<String> rangeNameList = getRangeNameList(startTime, endTime);
  108. for (String YearMonth : rangeNameList) {
  109. String tableName = logicTableName + YearMonth;
  110. if (!TABLES.contains(tableName)) {
  111. createTable(tableName);
  112. }
  113. rouTables.add(tableName);
  114. }
  115. }
  116. return rouTables;
  117. }
  118. private static List<String> getRangeNameList(Date startTime, Date endTime) {
  119. List<String> result = Lists.newArrayList();
  120. DateTime beginOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(startTime);
  121. DateTime endOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(endTime);
  122. while (beginOfMonth.getTime() <= endOfMonth.getTime()) {
  123. result.add(getDateFormatter().format(beginOfMonth.getTime()));
  124. // 进行当前日期按月份 + 1
  125. beginOfMonth = beginOfMonth.offset(DateField.MONTH, 1);
  126. }
  127. return result;
  128. }
  129. }

数据库表模板文件

模板文件放在 resources/template/ 目录下,文件名需与代码中 Velocity.getTemplate 加载的路径一致:t_test_table 使用 template/ddl.template,t_kafka_test 使用 template/ddlKafkaTest.template。

 模板sql

-- Velocity DDL template: ${tableName} is substituted with the physical monthly table
-- name (e.g. t_test_table202401) before ScriptUtils executes this script.
-- "if not exists" makes concurrent creation attempts harmless.
CREATE TABLE if not exists ${tableName} (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`order_id` varchar(200) NOT NULL,
`name` varchar(255) DEFAULT NULL COMMENT '收货人',
`address` varchar(255) DEFAULT NULL COMMENT '收货人地址',
`phone` varchar(255) DEFAULT NULL COMMENT '联系电话',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `order_id_index` (`order_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

SPI配置

自定义分表算法通过 Java SPI 机制加载:在 resources/META-INF/services/ 目录下创建名为 org.apache.shardingsphere.sharding.spi.ShardingAlgorithm 的文件,内容为两个算法类的全限定类名(每行一个)。算法的 init() 和 getType() 即为该 SPI 的回调:ShardingSphere 按 getType() 返回的类型名匹配配置中注册的算法,并在加载后调用 init() 完成已有分表的同步。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/757119
推荐阅读
相关标签
  

闽ICP备14008679号