- log-bin=C:/Program Files/MySQL/MySQL Server 5.5/data/mysql-bin   # adjust to your own MySQL data path
- binlog-format=ROW
- server_id=1
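Canal reads these row-based binlog events through a canal server; the Java client shown later connects to it on port 11111 with destination "example". A hedged sketch of the matching conf/example/instance.properties on the canal deployer side, reusing the MySQL account from the Lua script below (all values are assumptions, not from the original):

- # canal deployer, conf/example/instance.properties (values are assumptions)
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.dbUsername=root
- canal.instance.dbPassword=ok
- canal.instance.filter.regex=.*\\..*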
- --fetch data from the database and store it in redis (used for synchronization)
-
- ngx.header.content_type="application/json;charset=utf8"
- local function close_redis(red)
- if not red then
- return
- end
- -- return the connection to the pool; max idle time is in milliseconds
- local pool_max_idle_time = 10000
- -- connection pool size
- local pool_size = 100
- local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
- if not ok then
- ngx.log(ngx.ERR, "set redis keepalive error : ", err)
- end
- end
-
- local uri_args = ngx.req.get_uri_args()
- local cid = uri_args['cid']
-
- local mysqlModel = require("resty.mysql")
- local db = mysqlModel:new()
- db:set_timeout(1000)
- local ok, err = db:connect{
- host="127.0.0.1",
- port=3306,
- database="shop_content",
- user="root",
- password="ok"
- }
-
- if not ok then
- ngx.say('failed to connect to mysql: ', err)
- db:close()
- return false;
- end
-
- -- cid comes straight from the query string; in production it should be validated (e.g. tonumber) before being concatenated into SQL
- local res = db:query("SELECT * FROM `tb_content` WHERE category_id="..cid)
-
- local cjson = require("cjson")
- --ngx.say(cjson.encode(res))
-
- db:close()
-
-
- local redisModel = require("resty.redis")
- local redis = redisModel:new()
- redis:set_timeout(1000)
- local ok, err = redis:connect('127.0.0.1',6379)
-
- if not ok then
- ngx.say('failed to connect to redis: ', err)
- return close_redis(redis)
- end
-
-
- redis:select(0)
-
- redis:set("content:"..cid,cjson.encode(res))
-
-
- close_redis(redis)
- ngx.say('{"flag":"success"}')
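The script above is meant to be reached over HTTP: the Java client later calls http://localhost:9098/mysave?cid=..., so the Lua file has to be wired into OpenResty at that path. A minimal sketch of the nginx.conf wiring, assuming the script is saved as mysave.lua (the port comes from the URL used in the Java code; the file path and name are illustrative):

- # hypothetical nginx.conf excerpt: file path and name are assumptions
- server {
-     listen 9098;
-     location /mysave {
-         content_by_lua_file conf/lua/mysave.lua;   # the sync script above
-     }
- }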
Dependencies
- <dependencies>
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.2</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client-adapter</artifactId>
- <version>1.1.2</version>
- <type>pom</type>
- </dependency>
- </dependencies>
Code structure
- package com.zb.util;
-
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import org.springframework.web.client.RestTemplate;
-
- import java.util.List;
-
- @Component
- public class AsyncProccess {
-
- @Autowired
- private RestTemplate restTemplate;
-
- //runs on a separate thread (@Async)
- @Async
- public void updateContentSync(List<CanalEntry.Column> columns) {
- for (CanalEntry.Column column : columns) {
- if (column.getName().equals("category_id")) {//only react to the category_id column
- System.out.println("image data sync");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- String cid = column.getValue();
- String url = "http://localhost:9098/mysave?cid=" + cid;//address of the Lua sync endpoint
- String data = restTemplate.getForObject(url, String.class);
- System.out.println(data);
- }
- }
- }
-
- }
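updateContentSync runs on whatever executor backs @Async (enabled by @EnableAsync in ContentApplication below). If the 5-second sleeps and HTTP calls should run on a dedicated pool instead of the default one, a configuration class like the following can be added; the package, bean name and pool sizes here are illustrative assumptions, not part of the original project:

- package com.zb.config; // hypothetical package, for illustration only
-
- import java.util.concurrent.Executor;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
- @Configuration
- public class AsyncConfig {
-
-     // dedicated pool for the cache-sync calls; reference it with @Async("syncExecutor")
-     @Bean("syncExecutor")
-     public Executor syncExecutor() {
-         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-         executor.setCorePoolSize(4);
-         executor.setMaxPoolSize(8);
-         executor.setQueueCapacity(100);
-         executor.setThreadNamePrefix("content-sync-");
-         executor.initialize();
-         return executor;
-     }
- }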
- package com.zb.util;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.CanalEntry.*;
- import com.alibaba.otter.canal.protocol.Message;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import org.springframework.web.client.RestTemplate;
-
- import java.net.InetSocketAddress;
- import java.util.List;
-
- @Component
- public class ClientSample {
-
- @Autowired
- private AsyncProccess asyncProccess;
-
- public void main() {
- System.out.println("starting sync");
- // build the connector for the canal server
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
- 11111), "example", "", "");
- int batchSize = 1000;
- try {
- //open the connection
- connector.connect();
- //subscribe to all mysql databases and tables
- connector.subscribe(".*\\..*");
- //roll back to the last un-acked position so the next fetch resumes from there
- connector.rollback();
- boolean flag = true;
- while (flag) {
- Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
- long batchId = message.getId();
- int size = message.getEntries().size();
- //no new changes in the database
- if (batchId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- //process each changed entry
- printEntry(message.getEntries());
- }
- connector.ack(batchId); // confirm the batch
- }
- } finally {
- connector.disconnect();
- }
- }
-
-
- private void printEntry(List<Entry> entrys) {
- for (Entry entry : entrys) {
- //skip transaction begin/end events
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
-
- //boilerplate: parse the RowChange object out of the stored value
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
-
- //getEventType() tells whether this is an INSERT, UPDATE or DELETE
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
- if (entry.getHeader().getSchemaName().equals("shop_content") && entry.getHeader().getTableName().equals("tb_content")) {
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- //getBeforeColumnsList() holds the row as it was before the delete
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT) {
- //getAfterColumnsList() holds the newly inserted row (the before list is empty for inserts)
- asyncProccess.updateContentSync(rowData.getAfterColumnsList());
- } else {
- System.out.println("1---");
- //UPDATE: refresh the cache from the new (after) values
- asyncProccess.updateContentSync(rowData.getAfterColumnsList());
- System.out.println("3---");
- }
- }
- }
-
- }
- }
-
-
- private void printColumn(List<Column> columns) {
- for (Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
- }
- package com.zb;
-
- import com.zb.util.ClientSample;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
- import org.springframework.context.ConfigurableApplicationContext;
- import org.springframework.context.annotation.Bean;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.web.client.RestTemplate;
-
- @SpringBootApplication
- @EnableDiscoveryClient
- @EnableAsync //enable async method execution
- public class ContentApplication {
- public static void main(String[] args) {
- ConfigurableApplicationContext run = SpringApplication.run(ContentApplication.class, args);
- ClientSample bean = run.getBean(ClientSample.class);
- bean.main();
- }
-
- @Bean
- public RestTemplate createRestTemplate() {
- return new RestTemplate();
- }
-
-
- }
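Note that ClientSample.main() loops forever, so calling it right after SpringApplication.run() keeps the application's main thread inside the canal loop. That works here, but if you prefer the main thread to finish startup normally, one possible alternative (a sketch, not part of the original code) is to launch the loop from a CommandLineRunner on its own thread and drop the explicit bean.main() call:

- package com.zb.config; // hypothetical class, for illustration only
-
- import com.zb.util.ClientSample;
- import org.springframework.boot.CommandLineRunner;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class CanalRunnerConfig {
-
-     // starts the blocking canal loop on its own thread once the context is up
-     @Bean
-     public CommandLineRunner canalRunner(ClientSample clientSample) {
-         return args -> new Thread(clientSample::main, "canal-listener").start();
-     }
- }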
Multi-level cache lookup: the request first checks the Nginx shared-dict cache; on a miss it falls back to Redis; if Redis has nothing either, the data is read from MySQL and written to Redis, and a Redis hit is in turn stored in the Nginx cache.
- --multi-level cache
-
- ngx.header.content_type="application/json;charset=utf8"
- local uri_args = ngx.req.get_uri_args()
- local cid = uri_args['cid']
- --get the nginx shared-dict cache (declared as lua_shared_dict my_cache in nginx.conf)
- local ngxCache = ngx.shared.my_cache
- --look up the cached value
- local ngxData = ngxCache:get('content:'..cid)
- if ngxData=="" or ngxData==nil then
- local redisModel = require("resty.redis")
- local redis = redisModel.new()
- redis:set_timeout(1000)
- local ok, err = redis:connect('127.0.0.1',6379)
- if not ok then
- ngx.say('failed to connect to redis: ', err)
- return
- end
- redis:select(0)
- local redisData = redis:get("content:"..cid);
- if ngx.null==redisData then
- local mysqlModel = require("resty.mysql")
- local db = mysqlModel:new()
- db:set_timeout(1000)
- local ok, err = db:connect{
- host="127.0.0.1",
- port=3306,
- database="shop_content",
- user="root",
- password="ok"
- }
-
- if not ok then
- ngx.say('failed to connect to mysql: ', err)
- db:close()
- return false;
- end
-
- local res = db:query("SELECT * FROM `tb_content` WHERE category_id="..cid)
-
- local cjson = require("cjson")
- local jsonVal = cjson.encode(res)
- ngx.say("================>DB")
- ngx.say(jsonVal)
- redis:set("content:"..cid,jsonVal)
- else
- ngx.say("================>redis")
- ngx.say(redisData)
- --store the redis value into the nginx cache
- ngxCache:set('content:'..cid, redisData, 60) -- cache TTL: 60 seconds
- end
- else
- ngx.say("================>nginx")
- ngx.say(ngxData)
- end
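For ngx.shared.my_cache to exist, the shared dictionary has to be declared in the http block of nginx.conf, and this script needs its own location just like the sync script did. A minimal sketch of those additions, with the dictionary size, location path and file name as assumptions:

- # hypothetical nginx.conf additions (size, path and file name are assumptions)
- # in the http block:
- lua_shared_dict my_cache 128m;      # backs ngx.shared.my_cache used above
-
- # inside the server block from the earlier sketch:
- location /content {
-     content_by_lua_file conf/lua/content_cache.lua;   # this multi-level cache script
- }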