赞
踩
公司产品部收到了一些重要客户的需求,他们希望能够依赖独立的数据库存储来支持他们的业务数据。与此同时,仍有许多中小客户,可以继续使用公共库以满足其需求。技术实现方面,此前持久层框架使用的Mybatis-plus,部分业务场景使用到了Sharding-JDBC用于分表,另外,我们的数据库版本控制工具使用的是Flyway。
这里将方案进行简要说明,配置统一通过Nacos管理(有需要的可以自行定义租户配置页面)。
需要在Nacos提前维护租户与数据源关系配置。
<dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.flywaydb</groupId> <artifactId>flyway-core</artifactId> <version>7.15.0</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.2.6</version> </dependency>
spring: flyway: #关闭flyway自动配置,自定义实现 enabled: false datasource: dynamic: #默认数据源 primary: ds0 datasource: ds0: type: com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver url: jdbc:postgresql://127.0.0.1:5432/ds0 username: ds0 password: ds0123 ds1: type: com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver url: jdbc:postgresql://127.0.0.1:5432/ds1 username: ds1 password: ds1123
Java @Slf4j @Configuration @EnableTransactionManagement public class FlywayConfig { @Value("${spring.application.name}") private String appName; @Autowired private DataSource dataSource; @Bean public void migrate() { log.info("flyway开始逐数据源执行脚本"); DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource; Map<String, DataSource> dataSources = ds.getDataSources(); dataSources.forEach((k, v) -> { if (!"sharding".equals(k)) { // Flyway相关参数建议通过配置管理,以下代码仅供参考 Flyway flyway = Flyway.configure() .dataSource(v) .table("t_" + k + "_" + appName + "_version") .baselineOnMigrate(true) .outOfOrder(true) .baselineVersion("1.0.0") .baselineDescription(k + "初始化") .locations(CommonConstant.SQL_BASE_LOCATION + (CommonConstant.DEFAULT_DS_NAME.equals(k) ? CommonConstant.MASTER_DB : CommonConstant.TENANT_DB)) .load(); flyway.migrate(); log.info("flyway在 {} 数据源执行脚本成功", k); } }); } }
@Slf4j @Component @WebFilter(filterName = "dynamicDatasourceFilter", urlPatterns = {"/*"}) public class DynamicDatasourceFilter implements Filter { // 构建演示用租户与数据源关系配置 private static Map<String, String> tenantDsMap = new HashMap<>(); static { tenantDsMap.put("tenant123", "ds0"); tenantDsMap.put("tenant456", "ds0"); tenantDsMap.put("tenant789", "ds1"); } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest httpRequest = (HttpServletRequest) request; // 从请求头获取租户ID String tenantId = httpRequest.getHeader(CommonConstant.TENANT_HEADER); try { // 设置数据源 if (tenantDsMap.get(tenantId) == null) { // 如果根据租户ID未找到租户数据源配置,默认走主库 DynamicDataSourceContextHolder.push(CommonConstant.DEFAULT_DS_NAME); } else { //注意,如果是分片表,那么需要在分片表Service类或方法上加@DS("sharding")注解,最终由sharding的库分片策略决定SQL在哪个库执行。而这里的设置将会被@DS注解配置覆盖 DynamicDataSourceContextHolder.push(tenantDsMap.get(tenantId)); } // 执行 chain.doFilter(request, response); } catch (Exception e) { log.error("切换数据源失败,tenantId={},请求接口uri={},异常原因:{}", tenantId, httpRequest.getRequestURI(), ExceptionUtils.getStackTrace(e)); } finally { // 清空当前线程数据源 DynamicDataSourceContextHolder.poll(); } }
如果微服务还需要使用Sharding分片,那么还需要引入sharding-jdbc组件依赖,并配置sharding数据源和分片规则。如果是多个服务共用数据库,那么建议将Sharding数据源配置做为公共配置在Nacos管理,而Sharding分片规则则做为服务个性化配置单独维护(分片规则基本不需要动态变更),这样当有新租户需要申请开通独立租户库的时候,直接变更Sharding数据源公共配置,服务在监听到公共配置变更后,即可重新构建新的Sharding数据源实例和动态数据源更新,无需重启服务。
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-core</artifactId>
<version>4.1.1</version>
</dependency>
# sharding数据源配置 dataSources: ds0: !!com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver url: jdbc:postgresql://127.0.0.1:5432/ds0 username: ds0 password: ds0123 ds1: !!com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver url: jdbc:postgresql://127.0.0.1:5432/ds1 username: ds1 password: ds1123 ds2: !!com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver url: jdbc:postgresql://127.0.0.1:5432/ds2 username: ds2 password: ds2123 # sharding分片规则配置 shardingRule: tables: t_order: actualDataNodes: ds$->{0..2}.t_order$->{0..1} tableStrategy: inline: shardingColumn: order_no algorithmExpression: t_order$->{order_no.toBigInteger() % 2} defaultDataSourceName: ds0 # 默认库分片策略 defaultDatabaseStrategy: standard: shardingColumn: tenant_id # 自定义精确分片策略 preciseAlgorithmClassName: cn.xtstu.demo.config.CustomDataSourcePreciseShardingAlgorithm #hint: # # algorithmClassName: cn.xtstu.demo.config.CustomHintShardingAlgorithm defaultTableStrategy: none: props: sql.show: true
public class CustomDataSourcePreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> { // 构建演示用租户与数据源关系配置 private static Map<String, String> tenantDsMap = new HashMap<>(); static { tenantDsMap.put("tenant123", "ds0"); tenantDsMap.put("tenant456", "ds0"); tenantDsMap.put("tenant789", "ds1"); } @Override public String doSharding(Collection<String> dataSourceNames, PreciseShardingValue<String> shardingValue) { // 库分片策略配置的分片键是字段tenant_id,根据分片键查询配置的数据源 String dsName = tenantDsMap.get(shardingValue.getValue()); // 如果如前文所属,Sharding子数据源key与dynamic数据源key保持一致的话,这里直接返回就行了 return dsName; // TODO 需要处理未匹配到数据源的情况 } }
public class CustomHintShardingAlgorithm implements HintShardingAlgorithm<Integer> { // 构建演示用租户与数据源关系配置 private static Map<String, String> tenantDsMap = new HashMap<>(); static { tenantDsMap.put("tenant123", "ds0"); tenantDsMap.put("tenant456", "ds0"); tenantDsMap.put("tenant789", "ds1"); } @Override public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Integer> hintShardingValue) { Collection<String> result = new ArrayList<>(); // 从请求头取到当前租户ID HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); result.add(tenantDsMap.get(request.getHeader("tenantId"))); // TODO 需要处理未匹配到数据源的情况 return result; } }
@Slf4j @Configuration public class CustomDynamicDataSourceConfig { @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}") private String dataId; @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}") private String group; @Resource private DynamicDataSourceProperties properties; @Resource private NacosHelper nacosHelper; /** * 启动时通过查询Nacos上sharding数据源及分片规则yaml配置初始化sharding-jdbc数据源 * * @return */ @Bean public ShardingDataSource shardingDataSource() { ConfigService configService = nacosHelper.getConfigService(); if (configService == null) { log.error("连接nacos失败"); } String configInfo = null; try { configInfo = configService.getConfig(dataId, group, 5000); } catch (NacosException e) { log.error("获取{}配置失败,异常原因:{}", dataId, ExceptionUtils.getStackTrace(e)); } if (StringUtils.isBlank(configInfo)) { log.error("{}配置为空,启动失败", dataId); throw new NullPointerException(dataId + "配置为空"); } try { // 通过工厂类和yaml配置创建Sharding数据源 return (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8)); } catch (Exception e) { log.error("创建sharding-jdbc数据源异常:{}", ExceptionUtils.getStackTrace(e)); throw new NullPointerException("sharding-jdbc数据源为空"); } } /** * 将动态数据源设置为首选的 * 当spring存在多个数据源时, 自动注入的是首选的对象 * 设置为主要的数据源之后,就可以支持shardingJdbc原生的配置方式了 */ @Primary @Bean public DataSource dataSource() { DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource(); dataSource.setPrimary(properties.getPrimary()); dataSource.setStrict(properties.getStrict()); dataSource.setStrategy(properties.getStrategy()); dataSource.setP6spy(properties.getP6spy()); dataSource.setSeata(properties.getSeata()); return dataSource; } /** * 初始化动态数据源 * * @return */ @Bean public DynamicDataSourceProvider dynamicDataSourceProvider(ShardingDataSource shardingDataSource) { return new AbstractDataSourceProvider() { @Override public Map<String, DataSource> loadDataSources() { Map<String, DataSource> dataSourceMap = new HashMap<>(); // 将sharding数据源整体添加到动态数据源里 dataSourceMap.put(CommonConstant.SHARDING_DS_NAME, shardingDataSource); // 同时把sharding内部管理的子数据源也添加到动态数据源里 Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap(); dataSourceMap.putAll(shardingInnerDataSources); return dataSourceMap; } }; } }
@Slf4j @Configuration public class NacosShardingConfigListener { @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}") private String dataId; @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}") private String group; @Value("${spring.application.name}") private String appName; @Autowired private DataSource dataSource; @Autowired private NacosHelper nacosHelper; @PostConstruct public void shardingConfigListener() throws Exception { ConfigService configService = nacosHelper.getConfigService(); if (configService == null) { return; } configService.addListener(dataId, group, new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configInfo) { log.info("configInfo:\n{}", configInfo); if (StringUtils.isBlank(configInfo)) { log.warn("sharding-jdbc配置为空,不会刷新数据源"); return; } try { if (StringUtils.isNotBlank(configInfo)) { // 通过yaml配置创建sharding数据源(注意:如果分片规则是独立配置文件,那么需要提前合并数据源和分片规则配置) ShardingDataSource shardingDataSource = (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8)); Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap(); DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource; // 遍历sharding子数据源 for (String poolName : shardingInnerDataSources.keySet()) { // TODO 这里还有个细节,如果yaml配置删减了数据源,对应数据源应该要从ds中remove掉,且主数据源不能被remove。另外其实只有新增的数据源才需要执行flyway脚本 // 将sharding子数据源逐个添加到动态数据源 ds.addDataSource(poolName, shardingInnerDataSources.get(poolName)); // 通过代码完成数据源Flyway配置,并执行迁移操作 Flyway flyway = Flyway.configure() .dataSource(dataSource) .table("t_" + poolName + "_" + appName + "_version") .baselineOnMigrate(true) .outOfOrder(true) .baselineVersion("1.0.0") .baselineDescription(poolName + "初始化") .locations(CommonConstant.SQL_BASE_LOCATION + CommonConstant.TENANT_DB) .load(); flyway.migrate(); } // 将sharding数据源自身也添加到动态数据源 ds.addDataSource(CommonConstant.SHARDING_DS_NAME, shardingDataSource); log.info("动态数据源刷新完成,现有数据源:{}", JSONUtil.toJsonStr(ds.getDataSources().keySet())); } } catch (Exception e) { log.error("创建sharding-jdbc数据源异常:{}", ExceptionUtils.getStackTrace(e)); } } }); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。