当前位置:   article > 正文

springCloud+canal.deploy实现同步数据和多级缓存_springcloud集成canal

springcloud集成canal

1.打开mysql C:\Program Files\MySQL\MySQL Server 5.5\my.ini添加三行代码

  1. log-bin=C:/Program Files/MySQL/MySQL Server 5.5/data/mysql-bin //自己mysql路径
  2. binlog-format=ROW
  3. server_id=1

2.将canal.deploy中 E:\fwq\canal.deployer-1.1.5-SNAPSHOT\conf\example\instance.properties修改为自己数据库账号密码

 3.编写lua脚本

          查询数据库的数据然后存入redis(lua脚本)

  1. --从数据库取并存入redis(用于同步信息)
  2. ngx.header.content_type="application/json;charset=utf8"
  3. local function close_redis(red)
  4. if not red then
  5. return
  6. end
  7. -- 释放连接(连接池实现),毫秒
  8. local pool_max_idle_time = 10000
  9. -- 连接池大小
  10. local pool_size = 100
  11. local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
  12. local log = ngx_log
  13. if not ok then
  14. log(ngx_ERR, "set redis keepalive error : ", err)
  15. end
  16. end
  17. local uri_args = ngx.req.get_uri_args()
  18. local cid = uri_args['cid']
  19. local mysqlModel = require("resty.mysql")
  20. local db = mysqlModel:new()
  21. db:set_timeout(1000)
  22. local ok = db:connect{
  23. host="127.0.0.1",
  24. port=3306,
  25. database="shop_content",
  26. user="root",
  27. password="ok"
  28. }
  29. if not ok then
  30. ngx.say('链接失败')
  31. db:close()
  32. return false;
  33. end
  34. res = db:query("SELECT * FROM `tb_content` WHERE category_id="..cid)
  35. local cjson = require("cjson")
  36. --ngx.say(cjson.encode(res))
  37. db:close()
  38. local redisModel = require("resty.redis")
  39. local redis = redisModel.new()
  40. redis:set_timeout(1000)
  41. local ok = redis:connect('127.0.0.1',6379)
  42. if not ok then
  43. ngx.say('链接redis失败')
  44. return close_redis(redis)
  45. end
  46. redis:select(0)
  47. redis:set("content:"..cid,cjson.encode(res))
  48. close_redis(redis)
  49. ngx.say("{'flag':'success'}")

        使用java代码实现redis数据同步

依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.alibaba.otter</groupId>
  4. <artifactId>canal.client</artifactId>
  5. <version>1.1.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>com.alibaba.otter</groupId>
  9. <artifactId>canal.client-adapter</artifactId>
  10. <version>1.1.2</version>
  11. <type>pom</type>
  12. </dependency>

代码结构

        AsyncProccess工具类通过restTemplate请求调用lua脚本实现

  1. package com.zb.util;
  2. import com.alibaba.otter.canal.protocol.CanalEntry;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.scheduling.annotation.Async;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.web.client.RestTemplate;
  7. import java.util.List;
  8. @Component
  9. public class AsyncProccess {
  10. @Autowired
  11. private RestTemplate restTemplate;
  12. //多线程
  13. @Async
  14. public void updateContentSync(List<CanalEntry.Column> columns) {
  15. for (CanalEntry.Column column : columns) {
  16. if (column.getName().equals("category_id")) {//根据id查询
  17. System.out.println("图片数据同步");
  18. try {
  19. Thread.sleep(5000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. String cid = column.getValue();
  24. String url = "http://localhost:9098/mysave?cid=" + cid;//lua请求地址
  25. String data = restTemplate.getForObject(url, String.class);
  26. System.out.println(data);
  27. }
  28. }
  29. }
  30. }

ClientSample工具类

  1. package com.zb.util;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.common.utils.AddressUtils;
  5. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.scheduling.annotation.Async;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.web.client.RestTemplate;
  11. import java.net.InetSocketAddress;
  12. import java.util.List;
  13. @Component
  14. public class ClientSample {
  15. @Autowired
  16. private AsyncProccess asyncProccess;
  17. public void main() {
  18. System.out.println("开启同步");
  19. // 创建链接
  20. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
  21. 11111), "example", "", "");
  22. int batchSize = 1000;
  23. try {
  24. //创建连接
  25. connector.connect();
  26. //监听mysql所有的库和表
  27. connector.subscribe(".*\\..*");
  28. //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
  29. connector.rollback();
  30. boolean flag = true;
  31. while (flag) {
  32. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  33. long batchId = message.getId();
  34. int size = message.getEntries().size();
  35. //用户没有更改数据库中的数据
  36. if (batchId == -1 || size == 0) {
  37. try {
  38. Thread.sleep(1000);
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. } else {
  43. //获取修改的每一条记录
  44. printEntry(message.getEntries());
  45. }
  46. connector.ack(batchId); // 提交确认
  47. }
  48. } finally {
  49. connector.disconnect();
  50. }
  51. }
  52. private void printEntry(List<Entry> entrys) {
  53. for (Entry entry : entrys) {
  54. //检查到当前执行的代码是事物操作, 跳转下次
  55. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  56. continue;
  57. }
  58. //代码固定,获取rowChage对象
  59. RowChange rowChage = null;
  60. try {
  61. rowChage = RowChange.parseFrom(entry.getStoreValue());
  62. } catch (Exception e) {
  63. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  64. e);
  65. }
  66. //rowChage getEventType 获取事件类型对象
  67. EventType eventType = rowChage.getEventType();
  68. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  69. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  70. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  71. eventType));
  72. if (entry.getHeader().getSchemaName().equals("shop_content") && entry.getHeader().getTableName().equals("tb_content")) {
  73. for (RowData rowData : rowChage.getRowDatasList()) {
  74. if (eventType == EventType.DELETE) {
  75. //rowData.getBeforeColumnsList()获取删除之前的数据
  76. printColumn(rowData.getBeforeColumnsList());
  77. } else if (eventType == EventType.INSERT) {
  78. //rowData.getAfterColumnsList()获取添加之后的数据
  79. asyncProccess.updateContentSync(rowData.getBeforeColumnsList());
  80. } else {
  81. System.out.println("1---");
  82. asyncProccess.updateContentSync(rowData.getBeforeColumnsList());
  83. System.out.println("3---");
  84. }
  85. }
  86. }
  87. }
  88. }
  89. private void printColumn(List<Column> columns) {
  90. for (Column column : columns) {
  91. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  92. }
  93. }
  94. }

4.启动类

  1. package com.zb;
  2. import com.zb.util.ClientSample;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
  6. import org.springframework.context.ConfigurableApplicationContext;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.scheduling.annotation.EnableAsync;
  9. import org.springframework.web.client.RestTemplate;
  10. @SpringBootApplication
  11. @EnableDiscoveryClient
  12. @EnableAsync //开启异步
  13. public class ContentApplication {
  14. public static void main(String[] args) {
  15. ConfigurableApplicationContext run = SpringApplication.run(ContentApplication.class, args);
  16. ClientSample bean = run.getBean(ClientSample.class);
  17. bean.main();
  18. }
  19. @Bean
  20. public RestTemplate createRestTemplate() {
  21. return new RestTemplate();
  22. }
  23. }

以上就是数据同步的全部步骤

多级缓存(lua脚本)

首先会先从redis中取若没有会从数据库取并放到redis中最后存到缓存中

  1. --多级缓存
  2. ngx.header.content_type="application/json;charset=utf8"
  3. local uri_args = ngx.req.get_uri_args()
  4. local cid = uri_args['cid']
  5. --获取nginx缓存对象
  6. local ngxCache = ngx.shared.my_cache
  7. --获取缓存数据
  8. local ngxData = ngxCache:get('content:'..cid)
  9. if ngxData=="" or ngxData==nil then
  10. local redisModel = require("resty.redis")
  11. local redis = redisModel.new()
  12. redis:set_timeout(1000)
  13. local ok = redis:connect('127.0.0.1',6379)
  14. redis:select(0)
  15. local redisData = redis:get("content:"..cid);
  16. if ngx.null==redisData then
  17. local mysqlModel = require("resty.mysql")
  18. local db = mysqlModel:new()
  19. db:set_timeout(1000)
  20. local ok = db:connect{
  21. host="127.0.0.1",
  22. port=3306,
  23. database="shop_content",
  24. user="root",
  25. password="ok"
  26. }
  27. if not ok then
  28. ngx.say('链接失败')
  29. db:close()
  30. return false;
  31. end
  32. res = db:query("SELECT * FROM `tb_content` WHERE category_id="..cid)
  33. local cjson = require("cjson")
  34. local jsonVal = cjson.encode(res)
  35. ngx.say("================>DB")
  36. ngx.say(jsonVal)
  37. redis:set("content:"..cid,jsonVal)
  38. else
  39. ngx.say("================>redis")
  40. ngx.say(redisData)
  41. --redis数据存储到缓存中
  42. ngxCache:set('content:'..cid,redisData,60)//缓存存储时间60s
  43. end
  44. else
  45. ngx.say("================>nginx")
  46. ngx.say(ngxData)
  47. end

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/85629
推荐阅读
相关标签
  

闽ICP备14008679号