赞
踩
哈喽,大家好!目前有有一个小项目功能落到自己手中,也是一个面试必考点。如何保证MySQL与Es、Redis等之间的数据一致性,带着大家的问题,我给提供一种解决方案(最终一致性)
代码如下:
- @Component
- @Slf4j
- @EnableAsync
- @AllArgsConstructor
- public class EsSynchronizeTasksJob {
-
-
- private final ISysUserAllESService sysUserAllESService;
-
- private final SysUserAllESDocMapper userAllESDocMapper;
-
- private final SysPositRequestDocMapper sysPositRequestDocMapper;
-
- private final ISysPositRequestService sysPositRequestService;
-
- // @Scheduled(fixedRate = 10000)
- // @Scheduled(cron = "0 0 3 * * ?")
- @Async("pushMysqlToEsExecutor")
- @Transactional(rollbackFor = Exception.class)
- public void syncDataToEsBySysUserAllES() throws Exception {
- log.info("<----异步执行数据同步开始---->");
- // 获取系统时间
- Long startTime = System.currentTimeMillis();
-
- process(sysPositRequestService, sysPositRequestDocMapper);
- process(sysUserAllESService, userAllESDocMapper);
-
-
- Long endTime = System.currentTimeMillis();
- log.info("<----异步执行数据同步结束---->");
- log.info("<----异步执行数据同步耗时---->" + (endTime - startTime) + "ms");
- }
-
-
- private <D extends BaseEntity, S extends IService<D>, M extends BaseEsMapper<D>> void process(IService service, BaseEsMapper mapper) throws Exception {
- Class<D> docClass = mapper.getEntityClass();
- String indexName = docClass.getAnnotation(Document.class).indexName();
- if (!mapper.existsIndex(indexName)) {
- boolean success = mapper.createIndex(indexName);
- if (success) {
- System.out.println("索引创建成功!indexName===>" + indexName);
- }
- } else {
- List<D> sysUserAllESDocs = mapper.selectList(new LambdaEsQueryWrapper<>());
- System.out.println("es中有" + sysUserAllESDocs.size());
-
- // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
- Integer delete = mapper.delete(new LambdaEsQueryWrapper<>());
- System.out.println("删除ES之前的数据" + delete);
- }
-
- // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
- List<D> list = service.lambdaQuery().getBaseMapper().selectList(new LambdaQueryWrapper<>());
- List<D> document = new ArrayList<>();
- for (D o : list) {
- D docInstance = docClass.getDeclaredConstructor().newInstance();
- BeanUtils.copyProperties(o, docInstance);
- Method setIdMethod = docClass.getMethod("setMysqlId", Long.class);
- Long idValue = (Long) o.getClass().getMethod("getId").invoke(o);
- setIdMethod.invoke(docInstance, idValue);
- document.add(docInstance);
- }
- int successCount = mapper.insertBatch(document);
- System.out.println("在MySQL表中有:" + list.size() + "条数据");
- System.out.println("向ES中成功添加:" + successCount + "条数据");
- if (list.size() == successCount) {
- System.out.println("同步成功");
- } else {
- int i = list.size() - successCount;
- System.out.println("同步失败,有" + i + "条数据未写入ES,请联系管理员!");
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
自定义线程池类:
- package com.ruoyi.web.es.conifg;
-
- import com.google.common.util.concurrent.ThreadFactoryBuilder;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 七大参数:
- * 1 corePoolSize:线程池核心线程数量,核心线程不会被回收,即使没有任务执行,也会保持空闲状态。如果线程池中的线程少于此数目,则在执行任务时创建。
- * 2 maximumPoolSize:池允许最大的线程数,当线程数量达到corePoolSize,且workQueue队列塞满任务了之后,继续创建线程。
- * 3 keepAliveTime:超过corePoolSize之后的“临时线程”的存活时间。
- * 4 unit:keepAliveTime的单位。
- * 5 workQueue:当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。
- * 6 threadFactory:创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。
- * 7 handler:线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。
- * 四大拒绝策略:
- * 1 AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
- * 2 DiscardPolicy:直接抛弃不处理。
- * 3 DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
- * 4 CallerRunsPolicy:使用当前调用的线程来执行此任务
- *
- * @author gao
- */
- @Configuration
- public class ThreadPoolTaskExecutorConfig {
-
- // 核心线程数
- private static final int corePoolSize = 4;
-
- // 最大线程数
- private static final int maxPoolSize = 4;
-
- // 允许线程空闲时间(单位:默认为秒)
- private static final int keepAliveTime = 10;
- // 缓冲队列数
- private static final int queueCapacity = 15;
- // 线程池名前缀
- private static final String threadNamePrefix = "Thread-PushMessage-Service-";
-
- @Bean("pushMysqlToEsExecutor")
- public ThreadPoolTaskExecutor pushMessageExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(corePoolSize);
- executor.setMaxPoolSize(maxPoolSize);
- executor.setQueueCapacity(queueCapacity);
- executor.setKeepAliveSeconds(keepAliveTime);
- executor.setThreadNamePrefix(threadNamePrefix);
- // setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
- executor.setWaitForTasksToCompleteOnShutdown(true);
- // setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
- executor.setAwaitTerminationSeconds(60);
- // 线程池对拒绝任务的处理策略
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
- // 初始化
- executor.initialize();
- return executor;
- }
-
-
- @Bean(name = "pushMysqlToEsScheduler")
- public ThreadPoolTaskScheduler taskScheduler() {
- ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
- ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
- corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(queueCapacity),
- new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build()
- );
- scheduler.setPoolSize(customThreadPool.getMaximumPoolSize());
- scheduler.setThreadNamePrefix(threadNamePrefix);
- return scheduler;
- }
-
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
大家知道有几种保证数据一致性的解决方案呢?
①通过读写锁来实现
②通过中间件canal伪装成MySQL的从节点来实现(参考地址:超详细的Canal入门,看这篇就够了!-CSDN博客)
③通过MQ异步消息推送实现,MySQL数据发生变化时向MQ发送消息
④通过定时任务获取MySQL中更新的数据
..........
最后:①②是强一致性的解决方案,③④是最终一致性的解决方案。项目实战是一个漫长的过程,山高路远,道阻且长!加油
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。