当前位置:   article > 正文

Mycat+druid+zk实现多租户_mycat与druid

mycat与druid

前言

Mycat的应用场景之一就是实现多租户,多租户应用,每个应用一个库,但应用程序只连接 Mycat,从而不改造程序本身,实现多租户化;接下来我们使用mycat,结合druid拦截sql添加注释头,利用zk修改mycat配置文件中的schema、dataNode节点等信息,来实现多租户。

三种实现方案

多租户在数据存储上存在三种主要的方案,分别是:

独立数据库

这种方案一个租户一个数据库,这种方案的用户数据隔离级别最高,安全性最好,但成本也高。

优点:

为不同的租户提供独立的数据库,有助于简化数据模型的扩展设计,满足不同租户的独特需求;

如果出现故障,恢复数据比较简单。

缺点:

增大了数据库的安装数量,随之带来维护成本和购置成本的增加。

共享数据库,独立Schema

这是方案即多个或所有租户共享 Database,但是每个租户一个 Schema。

优点:

为安全性要求较高的租户提供了一定程度的逻辑数据隔离,并不是完全隔离;

每个数据库可以支持更多的租 户数量。

缺点:

如果出现故障,数据恢复比较困难,因为恢复数据库将牵扯到其他租户的数据;

如果需要跨租户统计数据,存在一定困难。

共享数据库,共享Schema,共享数据表

这种方案即租户共享同一个 Database、同一个 Schema,但在表中通过 TenantID 区分租户的数据。 这是共享程度最高、隔离级别最低的模式。

优点:

三种方案比较,第三种方案的维护和购置成本最低,允许每个数据库支持的租户数量最多。

缺点:

隔离级别最低,安全性最低,需要在设计开发时加大对安全的开发量;

数据备份和恢复最困难,需要逐表逐条备份和还原;

如果希望以最少的服务器为最多的租户提供服务,并且租户接受以牺牲隔离级别换取降低成本,这种方案最 适合。

如何选择

首先,mysql数据库中没有schema的概念,所以对于mysql用户来说,只有两种方案:独立数据库和共享数据库,共享数据表。

独立数据库可以做到各租户数据互不影响,所以我们选择的是独立数据库。

而对于上面的三种方案来说:

如果想有较好的隔离性,选择方案一或方案二,但是设计和实现的难度和成本较高。

如果想有较好的共享性,选择方案三或方案二,这时运行成本较低,并且同一运行成本下支持的用户较多。

如果租户数量较多,可以选择方案三或方案二,这样可以降低成本

如果想针对每一租户提供附加的服务,比如数据备份和恢复等,可以选择方案一,这样有更好的隔离性

 

独立数据库架构实现多租户

选用技术

mycat + druid + zk

实现方式

mycat实现分库,拦截带有注释头的sql流向不同的数据库;druid拦截初始sql,添加注释头;程序通过zk修改mycat的配置文件,比如添加新租户时,通过zk添加新的schema和datanode节点,然后同步到mycat的schema.xml文件。

注解

MyCat对自身不支持的Sql语句提供了一种解决方案——在要执行的SQL语句前添加额外的一段由注解SQL组织的代码,这样Sql就能正确执行,这段代码称之为“注解”。

注解的使用相当于对mycat不支持的sql语句做了一层透明代理转发,直接交给目标的数据节点进行sql语句执行,其中注解SQL用于确定最终执行SQL的schema或DB。

注解格式

/*!mycat:schema=[schemaName] */

schema.xml配置文件

  1. <!DOCTYPE mycat:schema SYSTEM "schema.dtd">
  2. <mycat:schema xmlns:mycat="http://io.mycat/">
  3. //每个租户对应一个schema节点
  4. <schema name="667078BnaL7HrFBqYVsLude4MNVd" checkSQLschema="false" sqlMaxLimit="100" dataNode="auth_667078_integral_dev"/>
  5. <schema name="111222BnaL7HrFBqYVsLude4MNVd" checkSQLschema="false" sqlMaxLimit="100" dataNode="auth_111222_integral_dev"/>
  6. <schema name="667098BnaL7HrFBqYVsLude4MNVd" checkSQLschema="false" sqlMaxLimit="100" dataNode="auth_667098_integral_dev"/>
  7. <schema name="667108BnaL7HrFBqYVsLude4MNVd" checkSQLschema="false" sqlMaxLimit="100" dataNode="auth_667108_integral_dev"/>
  8. //auth服务器中的四个库,是四个租户的数据库
  9. <dataNode name="auth_667078_integral_dev" dataHost="auth" database="667078_integral_dev"/>
  10. <dataNode name="auth_111222_integral_dev" dataHost="auth" database="111222_integral_dev"/>
  11. <dataNode name="auth_667098_integral_dev" dataHost="auth" database="667098_integral_dev"/>
  12. <dataNode name="auth_667108_integral_dev" dataHost="auth" database="667108_integral_dev"/>
  13. <dataHost balance="0" maxCon="1000" minCon="10" name="auth" writeType="0" switchType="1" slaveThreshold="100" dbType="mysql" dbDriver="native">
  14. <heartbeat>select user()</heartbeat>
  15. <writeHost host="hostM1" url="192.168.25.54:3306" password="XXX" user="XXX"/>
  16. </dataHost>
  17. </mycat:schema>

server.xml

在server.xml配置文件的user用户配置节点,将所有的schema节点的name属性值配置在property属性中。

  1. <user name="root">
  2. <property name="password">123456</property>
  3. <property name="schemas">667108BnaL7HrFBqYVsLude4MNVd,667098BnaL7HrFBqYVsLude4MNVd,111222BnaL7HrFBqYVsLude4MNVd,667078BnaL7HrFBqYVsLude4MNVd</property>
  4. </user>

说明

对SQL加注解的实现则交由druid的插件功能完成,通过自定义druid的Interceptor类,拦截要执行的sql语句加上对应注解,实现方式见下边的  2.2、过滤sql语句。这样就实现了数据库的多租户改造。下面分几个部分来说明。

前提说明

公司旗下,有权限项目和多个其他项目,权限项目实现了多租户的配置,其他项目如果接入多租户,只需要按步骤配置即可实现多租户。

springBoot实现场景图

接下来的配置以springBoot的项目为例,使用场景图如下:

步骤1到步骤2,不涉及到服务间调用,需要配置两个过滤器,步骤1到步骤3到步骤4,需要服务间调用,除了配置两个过滤器外,还需要维护租户id的传递工作,接下来我们就看看如果一个项目想引入多租户,需要配置哪些方面。

项目引入多租户代码实现

1.引入依赖项

公司封装了工具服务itoo-tool,工具服务中包括多租户的一个文件,所以需要引入该服务,使用其中的配置文件,需要使用的主要配置文件就是场景图中的两个过滤器。

2、过滤器配置

2.1、过滤request请求

拦截浏览器发来的请求,获取租户id,并放到ThreadLocal中

  1. public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain) throws IOException, ServletException {
  2. HttpServletRequest request = (HttpServletRequest)servletRequest;
  3. String companyId = request.getHeader("companyId");
  4. TenancyContext.TenancyID.set(companyId);
  5. chain.doFilter(servletRequest, servletResponse);
  6. }

2.2、过滤sql语句

数据库连接池使用的是druid,所以过滤sql,添加mycat注释头,也用druid拦截实现。首先自定义DruidMycatFilter,继承durid的WallFilter,重写check方法,拦截sql,添加注释头,然后流到数据库的sql就是带着注释头的。然后配置mycatFilter Bean,实例化自定义的DruidMycatFilter,注册到spring容器。

  1. @Configuration
  2. public class DruidMycatConfig {
  3. private static final String FILTER_MYCAT_PREFIX = "spring.datasource.druid.filter.mycat";
  4. @Bean
  5. @ConfigurationProperties(FILTER_MYCAT_PREFIX)
  6. @ConditionalOnProperty(prefix = FILTER_MYCAT_PREFIX, name = "enabled", matchIfMissing = true)
  7. @ConditionalOnMissingBean
  8. public DruidMycatFilter mycatFilter(){
  9. //指向第一步中依赖项中的DruidMycatFilter文件
  10. return new DruidMycatFilter();
  11. }
  12. }

DruidMycatFilter文件主要是重写了Druid的拦截器方法,拦截到sql后,在sql前边添加了上边提到的注解,用于指定相应的逻辑库。

3、服务间调用配置

服务间调用大致分两种情况,一种就是前段请求后端,后端连接mycat;另一种就是设计服务A服务B,这时就需要服务A将租户id传给服务B,然后服务B进行连接mycat。

但服务A调用服务B时,首先我们会在服务A的@EnableFeignClient注解的请求头中加入服务B的api,

@EnableFeignClients(basePackages ={"com.dmsdbj.integral.training.api"})

然后通过拦截器拦截请求,获取租户idcompanyId,然后将其放到ThreadLocal中。

  1. public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
  2. this.initUserInfo((HttpServletRequest)request);
  3. chain.doFilter(request, response);
  4. }
  5. private void initUserInfo(HttpServletRequest request) {
  6. //拦截请求,获取租户id
  7. String companyId = request.getHeader("companyId");
  8. String token = request.getHeader("Authorization");
  9. try {
  10. UserInfoModel userInfoModel = new UserInfoModel();
  11. if (StringUtils.isNotBlank(companyId)) {
  12. companyId = URLDecoder.decode(companyId, "UTF-8");
  13. //将租户id放到userInfoModel
  14. userInfoModel.setCompanyId(companyId);
  15. }
  16. if (StringUtils.isNotBlank(token)) {
  17. token = URLDecoder.decode(token, "UTF-8");
  18. userInfoModel.setToken(token);
  19. }
  20. //将userInfoModel放到TheadLocal中
  21. UserInfoContext.setUser(userInfoModel);
  22. } catch (UnsupportedEncodingException var5) {
  23. log.error("init userInfo error", var5);
  24. }
  25. }

4、前端拦截器配置

项目是前后端分离的,需要在前端连接器中配置租户id,然后传到后端。

5、最后就是数据库连接

如果之前该接入多租户的项目没有用到mycat,那么只需要把之前连接musql的地方换成mycat,端口由mysql默认的3306编程mycat的8066即可。

权限项目实现多租户

公司的其他项目如果想引入多租户,需要在权限项目提供的审批界面发申请,然后权限项目在审批申请的时候,如果申请通过,项目会做两件事:一件是创建该租户的数据库、表结构;另一件事修改mycat的配置文件,主要是schemal.xml。接下来我们主要介绍这两件事儿,如何自动实现。

创建租户的数据库及表结构

  1. 首先创建一个租户id
  2. 接着利用租户id+项目id+环境,拼接层数据库名
  3. 然后创建数据库及数据库表,项目数据表sql在权限项目中都有存储,读取相应的sql文件,然后执行就可以
  1. //1.创建租户id
  2. if (StringUtils.isEmpty(companyProjectModel.getSchoolNo())) {
  3. schoolNo = companyService.queryMaxDbNum();
  4. if (schoolNo == 0) {
  5. schoolNo = dbSchoolNo;
  6. } else {
  7. schoolNo = schoolNo + 10;
  8. }
  9. }else {
  10. schoolNo = Integer.parseInt(companyProjectModel.getSchoolNo());
  11. }
  12. String dataBaseName = schoolNo + "_" + companyProjectModel.getProjectNameEn()+"_"+env;
  13. //2.查询该注册用户的详细信息,方便以后存储
  14. CompanyExample example = new CompanyExample();
  15. CompanyCriteria criteria = example.createCriteria();
  16. criteria.andIsDeleteEqualTo((byte) 0);
  17. criteria.andIdEqualTo(id);
  18. List<CompanyEntity> companyEntities = companyService.selectByExample(example);
  19. CompanyEntity companyEntity = new CompanyEntity();
  20. if (!CollectionUtils.isEmpty(companyEntities)) {
  21. companyEntity = companyEntities.get(0);
  22. }
  23. //3.根据生成的schoolNo创建数据库并执行SQL创建数据表等
  24. String userId = BaseUuidUtils.base58Uuid();
  25. if (createDatabase(dataBaseName)){
  26. if (createDbTable(dataBaseName, companyEntity, userId,companyProjectModel.getProjectNameEn())){
  27. ..........
  28. }
  29. }

创建数据库表的代码如下:

  1. private boolean createDbTable(String dataBaseName, CompanyEntity companyEntity, String userId,String projectNameEn) {
  2. boolean flag=true;
  3. Connection conn = null;
  4. PreparedStatement pstmt = null;
  5. try {
  6. //执行SQL脚本创建数据库表
  7. Class.forName(dbDriver);
  8. String newDbUrl = dbUrl + dataBaseName + "?useUnicode=true&characterEncoding=UTF-8";
  9. conn = DriverManager.getConnection(newDbUrl, dbUserName, dbPassWord);
  10. ClassPathResource rc = new ClassPathResource("sqlfile/"+projectNameEn+dbenv);
  11. EncodedResource er = new EncodedResource(rc, "utf-8");
  12. boolean exists = rc.exists();
  13. if (exists==true) {
  14. try {
  15. ScriptUtils.executeSqlScript(conn, er);
  16. } catch (Exception e) {
  17. logger.error("创建数据表失败!", e);
  18. flag=false;
  19. }
  20. }else{
  21. flag=false;
  22. }
  23. } catch (Exception e) {}

项目各环境对应的数据库表sql

修改mycat的schemal.xml配置文件

执行完上边的步骤,表示该租户已创建成功。然后我们需要修改mycat的schemal.xml文件,添加新的schemal,我们选择的是通过zookeeper来添加节点,为什么选择zk呢 ?因为mycat支持通过zk来修改配置文件信息,配置简单,使用方便。

配置zk管理mycat集群

只需要配置myid.properties文件,zk即可管理mycat。

  1. loadZk=true
  2. zkURL=192.168.152.130:2181
  3. clusterId=mycat-cluster-1
  4. myid=mycat_fz_01
  5. clusterNodes=mycat_fz_01,mycat_fz_02,mycat_fz_04
  6. #server booster ; booster install on db same server,will reset all minCon to 1
  7. type=server
  8. boosterDataHosts=dn2,dn3
  9. //myid.properties配置说明:
  10. //loadZk:默认值false。代表mycat集群是否使用ZK,true表示使用
  11. //zkURL:zk集群的地址
  12. //clusterId:mycat集群名字
  13. //myid:当前的mycat服务器名称
  14. //clusterNodes:把所有集群中的所有mycat服务器罗列进行以逗号隔开,比如:clusterNodes=mycat_fz_01,mycat_fz_02,mycat_fz_03

 使用zookeeper的客户端工具ZooInspector连接zookeeper,即可看到mycat的树节点和mycat的conf目录下的schema.xml和rule.xml文件是对应的,这样就可以通过修改zk中的mycat配置文件中的schema、dataNode节点信息。

代码实现:

  1. //修改zk的配置:租户id,数据库名称
  2. if (zkOperation.createSchema(companyId,database)){
  3. //记录发送邮件应该成功的次数标志
  4. sendFlag=true;
  5. }
  1. package com.dmsdbj.itoo.tool.zkoperation;
  2. @Component
  3. public class ZkOperation {
  4. private static final Logger logger = LoggerFactory.getLogger(ZkOperation.class);
  5. @Value("${ZK_IP}")
  6. private String zkIp;
  7. @Value("${ZK_Nodes}")
  8. private String zkNodes;
  9. @Value("${dataHost}")
  10. private String dataHost;
  11. private static final String SCHEMA_JSON = "schemaJson";
  12. private static final String USER_JSON = "userJson";
  13. private static final String DATA_NODE_JSON = "dataNodeJson";
  14. public ZkOperation() {
  15. }
  16. //传入租户id和数据库名
  17. public boolean createSchema(String companyId, String databaseName) {
  18. boolean flag = false;
  19. List<String> nodesList = this.getZkNodes();
  20. Iterator var5 = nodesList.iterator();
  21. while(var5.hasNext()) {
  22. String aNodesList = (String)var5.next();
  23. flag = this.create(companyId, databaseName, aNodesList);
  24. if (!flag) {
  25. break;
  26. }
  27. }
  28. return flag;
  29. }
  30. //创建新的节点信息
  31. private boolean create(String companyId, String databaseName, String path) {
  32. String dnName1 = this.dataHost;
  33. //读取配置文件已有的信息
  34. Map<String, String> jsonMap = this.readSchemaData(path);
  35. if (jsonMap.isEmpty()) {
  36. return false;
  37. } else {
  38. //获取配置文件中的schema、datanode、user三部分信息
  39. String schemaJson = (String)jsonMap.get("schemaJson");
  40. List<eSchema> schemasList = JSON.parseArray(schemaJson, eSchema.class);
  41. String userJson = (String)jsonMap.get("userJson");
  42. List<eUser> userList = JSON.parseArray(userJson, eUser.class);
  43. String dataNodeJson = (String)jsonMap.get("dataNodeJson");
  44. List<eDataNode> dataNodeList = JSON.parseArray(dataNodeJson, eDataNode.class);
  45. eProperty prope = new eProperty();
  46. StringBuilder sbSchema = new StringBuilder();
  47. eUser user = this.getUser(userList);
  48. if (user != null) {
  49. prope = this.getProperty(user.getProperty());
  50. }
  51. //将新的节点信息插入
  52. String dnName = dnName1 + "_" + databaseName;
  53. eSchema schema = new eSchema();
  54. schema.setName(companyId);
  55. schema.setDataNode(dnName);
  56. schemasList.add(schema);
  57. eDataNode node = new eDataNode();
  58. node.setName(dnName);
  59. node.setDataHost(this.dataHost);
  60. node.setDatabase(databaseName);
  61. dataNodeList.add(node);
  62. sbSchema.append(companyId).append(",");
  63. if (prope != null) {
  64. prope.setValue(sbSchema + prope.getValue());
  65. }
  66. Map<String, String> newJsonMap = new HashMap(16);
  67. newJsonMap.put("userJson", JSON.toJSON(userList).toString());
  68. newJsonMap.put("schemaJson", JSON.toJSON(schemasList).toString());
  69. newJsonMap.put("dataNodeJson", JSON.toJSON(dataNodeList).toString());
  70. boolean flag = this.writeSchemaData(newJsonMap, path);
  71. return flag;
  72. }
  73. }
  74. private List<String> getDataBase() {
  75. List<String> list = new ArrayList();
  76. String str = this.getProperties("DataBases");
  77. String[] arr = str.split(",");
  78. list.addAll(Arrays.asList(arr));
  79. return list;
  80. }
  81. private List<String> getZkNodes() {
  82. List<String> list = new ArrayList();
  83. String str = this.zkNodes;
  84. String[] arr = str.split(",");
  85. list.addAll(Arrays.asList(arr));
  86. return list;
  87. }
  88. private String getProperties(String key) {
  89. Properties pro = new Properties();
  90. try {
  91. System.out.println((new File(".")).getAbsolutePath());
  92. FileInputStream in = new FileInputStream("src/profiles/conf.properties");
  93. pro.load(in);
  94. } catch (FileNotFoundException var5) {
  95. logger.error("ZkOperation.getProperties():path.properties文件未找到");
  96. } catch (IOException var6) {
  97. logger.error("ZkOperation.getProperties():文件加载异常");
  98. }
  99. return pro.getProperty(key);
  100. }
  101. //读取配置文件中已有信息
  102. private Map<String, String> readSchemaData(String path) {
  103. Map<String, String> jsonMap = new HashMap(16);
  104. ZkClient zc = this.getZkConnection();
  105. if (zc == null) {
  106. return jsonMap;
  107. } else {
  108. String schemaJson = (String)zc.readData(path + "/schema/schema");
  109. String dataNodeJson = (String)zc.readData(path + "/schema/dataNode");
  110. String userJson = (String)zc.readData(path + "/server/user");
  111. jsonMap.put("schemaJson", schemaJson);
  112. jsonMap.put("dataNodeJson", dataNodeJson);
  113. jsonMap.put("userJson", userJson);
  114. return jsonMap;
  115. }
  116. }
  117. public boolean writeSchemaData(Map<String, String> newJsonMap, String path) {
  118. boolean flag;
  119. try {
  120. ZkClient zc = this.getZkConnection();
  121. if (zc == null) {
  122. return false;
  123. }
  124. zc.writeData(path + "/schema/schema", newJsonMap.get("schemaJson"));
  125. zc.writeData(path + "/schema/dataNode", newJsonMap.get("dataNodeJson"));
  126. zc.writeData(path + "/server/user", newJsonMap.get("userJson"));
  127. flag = true;
  128. } catch (Exception var5) {
  129. flag = false;
  130. }
  131. return flag;
  132. }
  133. private ZkClient getZkConnection() {
  134. String ipAddress = this.zkIp;
  135. ZkClient zc = null;
  136. try {
  137. zc = new ZkClient(ipAddress, 10000, 10000, new ItooZkClient());
  138. } catch (Exception var4) {
  139. logger.error("ZkOperation.getZkConnection():连接zk失败");
  140. }
  141. return zc;
  142. }
  143. private eUser getUser(List<eUser> userList) {
  144. Iterator var2 = userList.iterator();
  145. eUser user;
  146. do {
  147. if (!var2.hasNext()) {
  148. logger.debug(" the method of getUser don't get 'eUser' to return");
  149. return null;
  150. }
  151. user = (eUser)var2.next();
  152. } while(!"root".equals(user.getName()));
  153. return user;
  154. }
  155. private eProperty getProperty(List<eProperty> propList) {
  156. Iterator var2 = propList.iterator();
  157. eProperty p;
  158. do {
  159. if (!var2.hasNext()) {
  160. logger.debug(" the method of getProperty don't get 'eProperty' to return");
  161. return null;
  162. }
  163. p = (eProperty)var2.next();
  164. } while(!e_enum_user.schemas.equals(p.getName()));
  165. return p;
  166. }
  167. }

结语

到此,多租户利用mycat+druid+zk就可以实现多租户了,有不正确之处望各位指正。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/407400
推荐阅读
相关标签
  

闽ICP备14008679号