当前位置:   article > 正文

MySQL向Es数据同步策略_mysql同步es

mysql同步es

哈喽,大家好!目前有有一个小项目功能落到自己手中,也是一个面试必考点。如何保证MySQL与Es、Redis等之间的数据一致性,带着大家的问题,我给提供一种解决方案(最终一致性)

代码如下:

  1. @Component
  2. @Slf4j
  3. @EnableAsync
  4. @AllArgsConstructor
  5. public class EsSynchronizeTasksJob {
  6. private final ISysUserAllESService sysUserAllESService;
  7. private final SysUserAllESDocMapper userAllESDocMapper;
  8. private final SysPositRequestDocMapper sysPositRequestDocMapper;
  9. private final ISysPositRequestService sysPositRequestService;
  10. // @Scheduled(fixedRate = 10000)
  11. // @Scheduled(cron = "0 0 3 * * ?")
  12. @Async("pushMysqlToEsExecutor")
  13. @Transactional(rollbackFor = Exception.class)
  14. public void syncDataToEsBySysUserAllES() throws Exception {
  15. log.info("<----异步执行数据同步开始---->");
  16. // 获取系统时间
  17. Long startTime = System.currentTimeMillis();
  18. process(sysPositRequestService, sysPositRequestDocMapper);
  19. process(sysUserAllESService, userAllESDocMapper);
  20. Long endTime = System.currentTimeMillis();
  21. log.info("<----异步执行数据同步结束---->");
  22. log.info("<----异步执行数据同步耗时---->" + (endTime - startTime) + "ms");
  23. }
  24. private <D extends BaseEntity, S extends IService<D>, M extends BaseEsMapper<D>> void process(IService service, BaseEsMapper mapper) throws Exception {
  25. Class<D> docClass = mapper.getEntityClass();
  26. String indexName = docClass.getAnnotation(Document.class).indexName();
  27. if (!mapper.existsIndex(indexName)) {
  28. boolean success = mapper.createIndex(indexName);
  29. if (success) {
  30. System.out.println("索引创建成功!indexName===>" + indexName);
  31. }
  32. } else {
  33. List<D> sysUserAllESDocs = mapper.selectList(new LambdaEsQueryWrapper<>());
  34. System.out.println("es中有" + sysUserAllESDocs.size());
  35. // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
  36. Integer delete = mapper.delete(new LambdaEsQueryWrapper<>());
  37. System.out.println("删除ES之前的数据" + delete);
  38. }
  39. // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
  40. List<D> list = service.lambdaQuery().getBaseMapper().selectList(new LambdaQueryWrapper<>());
  41. List<D> document = new ArrayList<>();
  42. for (D o : list) {
  43. D docInstance = docClass.getDeclaredConstructor().newInstance();
  44. BeanUtils.copyProperties(o, docInstance);
  45. Method setIdMethod = docClass.getMethod("setMysqlId", Long.class);
  46. Long idValue = (Long) o.getClass().getMethod("getId").invoke(o);
  47. setIdMethod.invoke(docInstance, idValue);
  48. document.add(docInstance);
  49. }
  50. int successCount = mapper.insertBatch(document);
  51. System.out.println("在MySQL表中有:" + list.size() + "条数据");
  52. System.out.println("向ES中成功添加:" + successCount + "条数据");
  53. if (list.size() == successCount) {
  54. System.out.println("同步成功");
  55. } else {
  56. int i = list.size() - successCount;
  57. System.out.println("同步失败,有" + i + "条数据未写入ES,请联系管理员!");
  58. }
  59. }
  60. }

自定义线程池类:

  1. package com.ruoyi.web.es.conifg;
  2. import com.google.common.util.concurrent.ThreadFactoryBuilder;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  7. import java.util.concurrent.LinkedBlockingQueue;
  8. import java.util.concurrent.ThreadPoolExecutor;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * 七大参数:
  12. * 1 corePoolSize:线程池核心线程数量,核心线程不会被回收,即使没有任务执行,也会保持空闲状态。如果线程池中的线程少于此数目,则在执行任务时创建。
  13. * 2 maximumPoolSize:池允许最大的线程数,当线程数量达到corePoolSize,且workQueue队列塞满任务了之后,继续创建线程。
  14. * 3 keepAliveTime:超过corePoolSize之后的“临时线程”的存活时间。
  15. * 4 unit:keepAliveTime的单位。
  16. * 5 workQueue:当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。
  17. * 6 threadFactory:创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。
  18. * 7 handler:线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。
  19. * 四大拒绝策略:
  20. * 1 AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
  21. * 2 DiscardPolicy:直接抛弃不处理。
  22. * 3 DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
  23. * 4 CallerRunsPolicy:使用当前调用的线程来执行此任务
  24. *
  25. * @author gao
  26. */
  27. @Configuration
  28. public class ThreadPoolTaskExecutorConfig {
  29. // 核心线程数
  30. private static final int corePoolSize = 4;
  31. // 最大线程数
  32. private static final int maxPoolSize = 4;
  33. // 允许线程空闲时间(单位:默认为秒)
  34. private static final int keepAliveTime = 10;
  35. // 缓冲队列数
  36. private static final int queueCapacity = 15;
  37. // 线程池名前缀
  38. private static final String threadNamePrefix = "Thread-PushMessage-Service-";
  39. @Bean("pushMysqlToEsExecutor")
  40. public ThreadPoolTaskExecutor pushMessageExecutor() {
  41. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  42. executor.setCorePoolSize(corePoolSize);
  43. executor.setMaxPoolSize(maxPoolSize);
  44. executor.setQueueCapacity(queueCapacity);
  45. executor.setKeepAliveSeconds(keepAliveTime);
  46. executor.setThreadNamePrefix(threadNamePrefix);
  47. // setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
  48. executor.setWaitForTasksToCompleteOnShutdown(true);
  49. // setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
  50. executor.setAwaitTerminationSeconds(60);
  51. // 线程池对拒绝任务的处理策略
  52. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  53. // 初始化
  54. executor.initialize();
  55. return executor;
  56. }
  57. @Bean(name = "pushMysqlToEsScheduler")
  58. public ThreadPoolTaskScheduler taskScheduler() {
  59. ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
  60. ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
  61. corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
  62. new LinkedBlockingQueue<>(queueCapacity),
  63. new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build()
  64. );
  65. scheduler.setPoolSize(customThreadPool.getMaximumPoolSize());
  66. scheduler.setThreadNamePrefix(threadNamePrefix);
  67. return scheduler;
  68. }
  69. }

大家知道有几种保证数据一致性的解决方案呢?

①通过读写锁来实现

②通过中间件canal伪装成MySQL的从节点来实现(参考地址:超详细的Canal入门,看这篇就够了!-CSDN博客

③通过MQ异步消息推送实现,MySQL数据发生变化时向MQ发送消息

④通过定时任务获取MySQL中更新的数据

..........

最后:①②是强一致性的解决方案,③④是最终一致性的解决方案。项目实战是一个漫长的过程,山高路远,道阻且长!加油

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/882758
推荐阅读
相关标签