赞
踩
ShardingJDBC的整体框架如图所示,主要分为以下5个阶段。
解析过程分为词法解析和语法解析。 词法解析器用于将SQL拆解为不可再分的原子符号,称为Token。并根据不同数据库方言所提供的字典,将其归类为关键字,表达式,字面量和操作符。 再使用语法解析器将SQL转换为抽象语法树(简称AST, Abstract Syntax Tree)。
根据解析上下文匹配数据库和表的分片策略,并生成路由路径。对于携带分片键的 SQL,根据分片键的不同可以划分为单片路由(分片键的操作符是等号)、多片路由(分片键的操作符是 IN)和范围路由(分片键的操作符是 BETWEEN)。不携带分片键的 SQL 则采用广播路由。
分片策略通常可以采用由数据库内置或由用户方配置。数据库内置的方案较为简单,内置的分片策略大致可分为尾数取模、哈希、范围、标签、时间等。由用户方配置的分片策略则更加灵活,可以根据使用方需求定制复合分片策略。
首先,在数据方言方面。Apache ShardingSphere 提供了 SQL 方言翻译的能力,能否实现数据库方言之间的自动转换。例如,用户可以使用 MySQL 客户端连接 ShardingSphere 并发送基于 MySQL 方言的 SQL,ShardingSphere 能自动识别用户协议与存储节点类型自动完成 SQL 方言转换,访问 PostgreSQL 等异构存储节点。
ShardingSphere 采用一套自动化的执行引擎,负责将路由和改写完成之后的真实 SQL 安全且高效发送到底层数据源执行。它不是简单地将 SQL 通过 JDBC 直接发送至数据源执行;也并非直接将执行请求放入线程池去并发执行。它更关注平衡数据源连接创建以及内存占用所产生的消耗,以及最大限度地合理利用并发等问题。执行引擎的目标是自动化的平衡资源控制与执行效率。
将从各个数据节点获取的多数据结果集,组合成为一个结果集并正确的返回至请求客户端,称为结果归并。
用于分片的数据库字段,是将数据库(表)水平拆分的关键字段。例:将订单表中的订单主键的尾数取模分片,则订单主键为分片字段。 SQL中如果无分片字段,将执行全路由,性能较差。 除了对单分片字段的支持,ShardingSphere也支持根据多个字段进行分片。
通过分片算法将数据分片,支持通过=、BETWEEN和IN分片。
目前提供4种分片算法。由于分片算法和业务实现紧密相关,因此并未提供内置分片算法,而是通过分片策略将各种场景提炼出来,提供更高层级的抽象,并提供接口让应用开发者自行实现分片算法。
对应PreciseShardingAlgorithm,用于处理使用单一键作为分片键的=与IN进行分片的场景。需要配合StandardShardingStrategy使用。
对应RangeShardingAlgorithm,用于处理使用单一键作为分片键的BETWEEN AND进行分片的场景。需要配合StandardShardingStrategy使用。
对应ComplexKeysShardingAlgorithm,用于处理使用多键作为分片键进行分片的场景,包含多个分片键的逻辑较复杂,需要应用开发者自行处理其中的复杂度。需要配合ComplexShardingStrategy使用。
对应HintShardingAlgorithm,用于处理使用Hint行分片的场景。需要配合HintShardingStrategy使用。
包含分片键和分片算法,由于分片算法的独立性,将其独立抽离。真正可用于分片操作的是分片键 + 分片算法,也就是分片策略。目前提供5种分片策略。
对应StandardShardingStrategy。提供对SQL语句中的=, IN和BETWEEN AND的分片操作支持。StandardShardingStrategy只支持单分片键,提供PreciseShardingAlgorithm和RangeShardingAlgorithm两个分片算法。PreciseShardingAlgorithm是必选的,用于处理=和IN的分片。RangeShardingAlgorithm是可选的,用于处理BETWEEN AND分片,如果不配置RangeShardingAlgorithm,SQL中的BETWEEN AND将按照全库路由处理。
对应ComplexShardingStrategy。复合分片策略。提供对SQL语句中的=, IN和BETWEEN AND的分片操作支持。ComplexShardingStrategy支持多分片键,由于多分片键之间的关系复杂,因此并未进行过多的封装,而是直接将分片键值组合以及分片操作符透传至分片算法,完全由应用开发者实现,提供最大的灵活度。
对应InlineShardingStrategy。使用Groovy的表达式,提供对SQL语句中的=和IN的分片操作支持,只支持单分片键。对于简单的分片算法,可以通过简单的配置使用,从而避免繁琐的Java代码开发,如: t_user_$->{u_id % 8} 表示t_user表根据u_id模8,而分成8张表,表名称为t_user_0到t_user_7。
对应HintShardingStrategy。通过Hint而非SQL解析的方式分片的策略。
对应NoneShardingStrategy。不分片的策略。
ShardingJDBC的本质其实是生成一个带有分库分表功能的ShardingSphereDatasource。他就是我们经常使用的DataSource接口的实现类。因此,我们也可以尝试构建一个JDBC的应用,作为源码阅读的起点。
import com.zaxxer.hikari.HikariDataSource; import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory; import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration; import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.strategy.keygen.KeyGenerateStrategyConfiguration; import org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration; import javax.sql.DataSource; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * @author yangnk * @desc * @date 2023/10/12 08:51 **/ public class ShardingJDBCDemo { public static void main(String[] args) throws SQLException { //=======一、配置数据库 Map<String, DataSource> dataSourceMap = new HashMap<>(2);//为两个数据库的datasource // 配置第一个数据源 HikariDataSource dataSource0 = new HikariDataSource(); dataSource0.setDriverClassName("com.mysql.cj.jdbc.Driver"); dataSource0.setJdbcUrl("jdbc:mysql://localhost:3306/coursedb?serverTimezone=GMT%2B8&useSSL=false"); dataSource0.setUsername("root"); dataSource0.setPassword("root"); dataSourceMap.put("m0", dataSource0); // 配置第二个数据源 HikariDataSource dataSource1 = new HikariDataSource(); dataSource1.setDriverClassName("com.mysql.cj.jdbc.Driver"); dataSource1.setJdbcUrl("jdbc:mysql://localhost:3306/coursedb2?serverTimezone=GMT%2B8&useSSL=false"); dataSource1.setUsername("root"); dataSource1.setPassword("root"); dataSourceMap.put("m1", dataSource1); //=======二、配置分库分表策略 ShardingRuleConfiguration shardingRuleConfig = createRuleConfig(); //三、配置属性值 Properties properties = new Properties(); //打开日志输出 4.x版本是sql.show,5.x版本变成了sql-show properties.setProperty("sql-show", "true"); //K1 创建ShardingSphere的数据源 ShardingDataSource DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, Collections.singleton(shardingRuleConfig), properties); //-------------测试部分-----------------// ShardingJDBCDemo test = new ShardingJDBCDemo(); //建表 // test.droptable(dataSource); // test.createtable(dataSource); //插入数据 // test.addcourse(dataSource); //K1 调试的起点 查询数据 test.querycourse(dataSource); } private static ShardingRuleConfiguration createRuleConfig(){ ShardingRuleConfiguration result = new ShardingRuleConfiguration(); //spring.shardingsphere.rules.sharding.tables.course.actual-data-nodes=m$->{0..1}.course_$->{1..2} ShardingTableRuleConfiguration courseTableRuleConfig = new ShardingTableRuleConfiguration("course", "m$->{0..1}.course_$->{1..2}"); //spring.shardingsphere.rules.sharding.key-generators.alg_snowflake.type=SNOWFLAKE //spring.shardingsphere.rules.sharding.key-generators.alg_snowflake.props.worker.id=1 Properties snowflakeprop = new Properties(); snowflakeprop.setProperty("worker.id", "123"); result.getKeyGenerators().put("alg_snowflake", new AlgorithmConfiguration("SNOWFLAKE", snowflakeprop)); //spring.shardingsphere.rules.sharding.tables.course.key-generate-strategy.column=cid //spring.shardingsphere.rules.sharding.tables.course.key-generate-strategy.key-generator-name=alg_snowflake courseTableRuleConfig.setKeyGenerateStrategy(new KeyGenerateStrategyConfiguration("cid","alg_snowflake")); //spring.shardingsphere.rules.sharding.tables.course.database-strategy.standard.sharding-column=cid //spring.shardingsphere.rules.sharding.tables.course.database-strategy.standard.sharding-algorithm-name=course_db_alg courseTableRuleConfig.setDatabaseShardingStrategy(new StandardShardingStrategyConfiguration("cid","course_db_alg")); //spring.shardingsphere.rules.sharding.sharding-algorithms.course_db_alg.type=MOD //spring.shardingsphere.rules.sharding.sharding-algorithms.course_db_alg.props.sharding-count=2 Properties modProp = new Properties(); modProp.put("sharding-count",2); result.getShardingAlgorithms().put("course_db_alg",new AlgorithmConfiguration("MOD",modProp)); //spring.shardingsphere.rules.sharding.tables.course.table-strategy.standard.sharding-column=cid //spring.shardingsphere.rules.sharding.tables.course.table-strategy.standard.sharding-algorithm-name=course_tbl_alg courseTableRuleConfig.setTableShardingStrategy(new StandardShardingStrategyConfiguration("cid","course_tbl_alg")); //#spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.type=INLINE //#spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.props.algorithm-expression=course_$->{cid%2+1} Properties inlineProp = new Properties(); inlineProp.setProperty("algorithm-expression", "course_$->{((cid+1)%4).intdiv(2)+1}"); result.getShardingAlgorithms().put("course_tbl_alg",new AlgorithmConfiguration("INLINE",inlineProp)); result.getTables().add(courseTableRuleConfig); return result; } //添加10条课程记录 public void addcourse(DataSource dataSource) throws SQLException { for (int i = 1; i < 10; i++) { long orderId = executeAndGetGeneratedKey(dataSource, "INSERT INTO course (cname, user_id, cstatus) VALUES ('java'," + i + ", '1')"); System.out.println("添加课程成功,课程ID:" + orderId); } } public void querycourse(DataSource dataSource) throws SQLException { Connection conn = null; try { //ShardingConnectioin conn = dataSource.getConnection(); //ShardingStatement Statement statement = conn.createStatement(); String sql = "SELECT cid,cname,user_id,cstatus from course where cid=851198093910081536"; //ShardingResultSet ResultSet result = statement.executeQuery(sql); while (result.next()) { System.out.println("result:" + result.getLong("cid")); } } catch (SQLException e) { e.printStackTrace(); } finally { if (null != conn) { conn.close(); } } } private void execute(final DataSource dataSource, final String sql) throws SQLException { try ( Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) { statement.execute(sql); } } private long executeAndGetGeneratedKey(final DataSource dataSource, final String sql) throws SQLException { long result = -1; try ( Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) { statement.executeUpdate(sql, Statement.RETURN_GENERATED_KEYS); ResultSet resultSet = statement.getGeneratedKeys(); if (resultSet.next()) { result = resultSet.getLong(1); } } return result; } /** * -----------------------------表初始化-------------------------------- */ public void droptable(DataSource dataSource) throws SQLException { execute(dataSource, "DROP TABLE IF EXISTS course_1"); execute(dataSource, "DROP TABLE IF EXISTS course_2"); } public void createtable(DataSource dataSource) throws SQLException { execute(dataSource, "CREATE TABLE course_1 (cid BIGINT(20) PRIMARY KEY,cname VARCHAR(50) NOT NULL,user_id BIGINT(20) NOT NULL,cstatus varchar(10) NOT NULL);"); execute(dataSource, "CREATE TABLE course_2 (cid BIGINT(20) PRIMARY KEY,cname VARCHAR(50) NOT NULL,user_id BIGINT(20) NOT NULL,cstatus varchar(10) NOT NULL);"); } }
pom依赖:
<dependencies> <!-- shardingJDBC核心依赖 --> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId> <version>5.2.1</version> <exclusions> <exclusion> <artifactId>snakeyaml</artifactId> <groupId>org.yaml</groupId> </exclusion> </exclusions> </dependency> <!-- 坑爹的版本冲突 --> <dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> <version>1.33</version> </dependency> <!--XA 分布式事务 --> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>shardingsphere-transaction-xa-core</artifactId> <version>5.2.1</version> <exclusions> <exclusion> <artifactId>transactions-jdbc</artifactId> <groupId>com.atomikos</groupId> </exclusion> <exclusion> <artifactId>transactions-jta</artifactId> <groupId>com.atomikos</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.20</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- <dependency>--> <!-- <groupId>org.apache.shardingsphere</groupId>--> <!-- <artifactId>sharding-jdbc-spring-boot-starter</artifactId>--> <!-- <version>4.0.0-RC1</version>--> <!-- </dependency>--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.0.5</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- <dependency>--> <!-- <groupId>org.apache.shardingsphere</groupId>--> <!-- <artifactId>shardingsphere-jdbc-core</artifactId>--> <!-- <version>5.2.1</version>--> <!-- </dependency>--> </dependencies>
application.properties配置文件:
# sharding-jdbc 水平分表策略 # 配置数据源,给数据源起别名 spring.shardingsphere.datasource.names=m1 # 一个实体类对应两张表,覆盖 spring.main.allow-bean-definition-overriding=true # 配置数据源的具体内容,包含连接池,驱动,地址,用户名,密码 spring.shardingsphere.datasource.m1.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.m1.driver-class-name=com.mysql.cj.jdbc.Driver spring.shardingsphere.datasource.m1.url=jdbc:mysql://xxxx:3306/test?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true spring.shardingsphere.datasource.m1.username=root spring.shardingsphere.datasource.m1.password=xxxx # 指定course表分布的情况,配置表在哪个数据库里,表的名称都是什么 m1.course_1,m1.course_2 spring.shardingsphere.sharding.tables.course.actual-data-nodes=m1.course_$->{1..2} # 指定 course 表里面主键 cid 的生成策略 SNOWFLAKE spring.shardingsphere.sharding.tables.course.key-generator.column=cid spring.shardingsphere.sharding.tables.course.key-generator.type=SNOWFLAKE # 配置分表策略 约定 cid 值偶数添加到 course_1 表,如果 cid 是奇数添加到 course_2 表 spring.shardingsphere.sharding.tables.course.table-strategy.inline.sharding-column=cid spring.shardingsphere.sharding.tables.course.table-strategy.inline.algorithm-expression=course_$->{cid % 2 + 1} # 打开 sql 输出日志 spring.shardingsphere.props.sql.show=true spring.shardingsphere.mode.type=Standalone spring.shardingsphere.mode.repository.type=File spring.shardingsphere.mode.overwrite=true orithms.course_tbl_alg.props.algorithm-expression=course_$->{cid%2+1}
在application.properties配置文件中配置主键生成规则:
spring.shardingsphere.rules.sharding.key-generators.course_cid_alg.type=MYKEY
以下是主键生成策略的核心代码,调用newInstance(final AlgorithmConfiguration keyGenerateAlgorithmConfig) 会生成对应的主键生成策略。
/** * Key generate algorithm factory. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class KeyGenerateAlgorithmFactory { static { ShardingSphereServiceLoader.register(KeyGenerateAlgorithm.class); } /** * Create new instance of key generate algorithm. * * @return created instance */ public static KeyGenerateAlgorithm newInstance() { return RequiredSPIRegistry.getRegisteredService(KeyGenerateAlgorithm.class); } /** * Create new instance of key generate algorithm. * * @param keyGenerateAlgorithmConfig key generate algorithm configuration * @return created instance */ public static KeyGenerateAlgorithm newInstance(final AlgorithmConfiguration keyGenerateAlgorithmConfig) { return ShardingSphereAlgorithmFactory.createAlgorithm(keyGenerateAlgorithmConfig, KeyGenerateAlgorithm.class); } /** * Judge whether contains key generate algorithm. * * @param keyGenerateAlgorithmType key generate algorithm type * @return contains key generate algorithm or not */ public static boolean contains(final String keyGenerateAlgorithmType) { return TypedSPIRegistry.findRegisteredService(KeyGenerateAlgorithm.class, keyGenerateAlgorithmType).isPresent(); } }
在application.properties配置文件中配置分片策略:
spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.type=MYCOMPLEX
以下是生成分片策略的工厂类,核心是调用newInstance()方法创建分片策略。
import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration; import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory; import org.apache.shardingsphere.sharding.spi.ShardingAlgorithm; import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader; import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry; /** * Sharding algorithm factory. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ShardingAlgorithmFactory { static { ShardingSphereServiceLoader.register(ShardingAlgorithm.class); } /** * Create new instance of sharding algorithm. * * @param shardingAlgorithmConfig sharding algorithm configuration * @return created instance */ public static ShardingAlgorithm newInstance(final AlgorithmConfiguration shardingAlgorithmConfig) { return ShardingSphereAlgorithmFactory.createAlgorithm(shardingAlgorithmConfig, ShardingAlgorithm.class); } /** * Judge whether contains sharding algorithm. * * @param shardingAlgorithmType sharding algorithm type * @return contains sharding algorithm or not */ public static boolean contains(final String shardingAlgorithmType) { return TypedSPIRegistry.findRegisteredService(ShardingAlgorithm.class, shardingAlgorithmType).isPresent(); } }
以上的分布式主键生成和分片策略生成都可以自定义实现,该自定义实现是通过SPI机制实现的。
SPI机制指系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。
Java SPI 的具体约定为:当服务的提供者,提供了服务接口的一种实现之后,在jar包的 META-INF/services/ 目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。
而当外部程序装配这个模块的时候,就能通过该jar包META-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。
基于这样一个约定就能很好的找到服务接口的实现类,而不需要再代码里制定。jdk提供服务实现查找的一个工具类:java.util.ServiceLoader。在主键生成策略和分片策略中的源码中都能看到ServiceLoader.load(serviceInterface)
这样的方法,这就是通过SPI来加载自定义策略。
import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.SneakyThrows; import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.Map; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; /** * ShardingSphere service loader. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ShardingSphereServiceLoader { private static final Map<Class<?>, Collection<Object>> SERVICES = new ConcurrentHashMap<>(); /** * Register service. * * @param serviceInterface service interface */ public static void register(final Class<?> serviceInterface) { if (!SERVICES.containsKey(serviceInterface)) { SERVICES.put(serviceInterface, load(serviceInterface)); } } private static <T> Collection<Object> load(final Class<T> serviceInterface) { Collection<Object> result = new LinkedList<>(); for (T each : ServiceLoader.load(serviceInterface)) { result.add(each); } return result; }
ShardingSphere 核心原理精讲-完:https://learn.lianglianglee.com/%E4%B8%93%E6%A0%8F/ShardingSphere%20%E6%A0%B8%E5%BF%83%E5%8E%9F%E7%90%86%E7%B2%BE%E8%AE%B2-%E5%AE%8C/(非常详细)
ShardingSphere内核原理及核心源码剖析:https://blog.csdn.net/qq_43631716/article/details/120390636
ShardingJDBC核心源码以及内核解析:https://note.youdao.com/ynoteshare/index.html?id=84be7bce3683baa7b953e50d67259417&type=note&_time=1697027698834
本文由博客一文多发平台 OpenWrite 发布!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。