当前位置:   article > 正文

分布式集群架构场景化解决方案相关_分布式集群方案

分布式集群方案


前言

在我们日常工作中,分布式和集群几乎天天都会听到,那么什么是分布式?什么是集群呢?这两是一个东西吗?
分布式和集群是不一样的

看图看图:
在这里插入图片描述
分布式一定是集群,但是集群不一定是分布式(因为集群就是多个实例一起工作(只要满足有多个实例就是集群),而分布式将一个系统拆分之后那就是多个实例;集群并不一定是分布式,因为复制型的集群只是进行了复制,没有进行拆分,而分布式的关键是拆分)

一、一致性Hash算法

Hash算法:比如说在安全加密领域MD5、SHA等加密算法,在数据存储和查找方面有Hash表等
以上,都应用到了Hash算法。

为什么要使用Hash?
Hash算法较多的应用在数据存储和查找领域,最经典的就是Hash表,它的查询效率非常之高,其中的 哈希算法如果设计的比较ok的话,那么Hash表的数据查询时间复杂度可以接近于O(1)

示例:提供一组数据 1,5,7,6,3,4,8,对这组数据进行存储,然后随便给定一个数n,请你判断n是否存在于刚才的数据集中?

怎么解呢?一般就是几种方法
顺序查找法:通过循环判断实现
list:List[1,5,7,6,3,4,8]
for(int element:list){
if (n==element) {
如果相等,说明n存在于数据集中
}
}
通过循环完成比较原始,虽然能完成,但是效率不高

二分查找法,排序后折半查找,效率也只是比循环查询好了一些而已

那能不能不循环也不二分,通过一次查询就把数据n从数据集中查询处理呢???

定义一个数组,数组长度大于数据集长度,这里的长度是9,数据1就存在下标为1的位置,3就存在下标为3的地方…依次类推

这个时候,比如n=5,我想查一下5是否存在,那我只需要判断数组下标为5的元素是否存在,如果为空表示5不存在,如果不为空表示5在数据集中,通过一次查询就找到了,时间复杂度为O(1)
在这里插入图片描述
这种就是直接寻址法:直接把数据和数组的下标绑定到一起,查找的时候,直接array[n]就取出了数据

优点:查询速度快,一次查询就得到了结果
缺点:
1)浪费空间,比如 1,5,7,6,3,4,8,12306 ,最大值12306 ,按照上述方式需要定义一个比如⻓度为12307的数组,但是只存储零星的几个数据,其他位置空间都浪费着
2)数据如:1,5,7,6,3,4,8,12,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2最大值12,比如开辟13个空间,存储不了这么多内容

现在,换一种设计,如果数据是3,5,7,12306,一共4个数据,我们开辟任意个空间,比如5个,那 么具体数据存储到哪个位置呢,我们可以对数据进行求模(对空间位置数5),根据求模余数确定存储 位置的下标,比如3%5=3,就可以把3这个数据放到下标为3的位置上,12306%5=1,就把12306这个数据存储到下标为1的位置上

在这里插入图片描述
上面对数据求模 (数据%空间位置数) 他就是一个hash算法,只不过这是一种比较普通又简单的hash 算法,这种构造Hash算法的方式叫做除留余数法

但是还有问题:如果数据是1,6,7,8,把这4个数据存储到上面的数组中,就会出现hash冲突

在这里插入图片描述
那继续想办法:在此基础上采用开放寻址法
开放寻址法:1放进去了,6再来的时候,向前或者向后找空闲位置存放,
这种方法问题很明显:如果数组⻓度定义好了比如10,⻓度不能扩展,来了11个数据,不管Hash冲突不冲突,肯定存不下这么多数据

那很明显,开放寻址法不能用,怎么办呢?
我们可以使用拉链法
拉链法:数据⻓度定义好了,怎么存储更多内容呢,算好Hash值,在数组元素存储位置放了一个链表
在这里插入图片描述
如果Hash算法设计的比较好的话,那么查询效率会更接近于O(1),如果Hash算法设计的比较low,那么查询效率就会很低了

比如有一组数据:1,2,3,3,4…但是经过设计的Hash算法后都集中放到同一个位置了,那就会导致效率很差

在这里插入图片描述

所以,Hash表的查询效率高不高取决于Hash算法,hash算法能够让数据平均分布,既能够节省空间又 能提高查询效率。Hash算法的研究是很深的一⻔学问,比较复杂,⻓久以来,Hash表内部的Hash算法 也一直在更新,很多数学家也在研究

1.1 Hash算法应用场景

Hash算法在分布式集群架构中的应用场景
Hash算法在很多分布式集群产品中都有应用,比如分布式集群架构Redis、Hadoop、ElasticSearch,Mysql分库分表,Nginx负载均衡等

主要的应用场景归纳起来两个
1、请求的负载均衡(比如nginx的ip_hash策略)
Nginx的IP_hash策略可以在客户端ip不变的情况下,将其发出的请求始终路由到同一个目标服务器上,实现会话粘滞,避免处理session共享问题
如果没有IP_hash策略,那么如何实现会话粘滞?
可以维护一张映射表,存储客户端IP或者sessionid与具体目标服务器的映射关系 <ip,tomcat1>
缺点
1)那么,在客户端很多的情况下,映射表非常大,浪费内存空间
2)客户端上下线,目标服务器上下线,都会导致重新维护映射表,映射表维护成本很大
如果使用哈希算法,事情就简单很多,我们可以对ip地址或者sessionid进行计算哈希值,哈希值与服务器数量进行取模运算,得到的值就是当前请求应该被路由到的服务器编号,如此,同一个客户端ip发送过来的请求就可以路由到同一个目标服务器,实现会话粘滞。
在这里插入图片描述
2、分布式存储
以分布式内存数据库Redis为例,集群中有redis1,redis2,redis3 三台Redis服务器
那么,在进行数据存储时,<key1,value1>数据存储到哪个服务器当中呢?针对key进行hash处理,hash(key1)%3=index, 使用余数index锁定存储的具体服务器节点

1.2 普通Hash算法存在的问题

普通Hash算法存在一个问题,以ip_hash为例,假定下载用户ip固定没有发生改变,现在tomcat3出现了问题,down机了,服务器数量由3个变为了2个,之前所有的求模都需要重新计算。
在这里插入图片描述
如果在真实生产情况下,后台服务器很多台,客户端也有很多,那么影响是很大的,缩容和扩容都会存在这样的问题,大量用户的请求会被路由到其他的目标服务器处理,用户在原来服务器中的会话都会丢失。

1.3 一致性Hash算法

一致性哈希算法思路如下:
首先有一条直线,直线开头和结尾分别定为为1和2的32次方减1,每一个点都相当于一个地址
在这里插入图片描述
对于这样一条线,弯过来构成一个圆环形成闭环,这样的一个圆环称为hash环。我们把服务器的ip或者主机名求 hash值然后对应到hash环上,那么针对客户端用户,也根据它的ip进行hash求值,对应到环上某个位置
然后如何确定一个客户端路由到哪个服务器处理呢?----按照顺时针方向找最近的服务器节点
在这里插入图片描述
假如将服务器3下线,服务器3下线后,原来路由到3的客户端重新路由到服务器4,对于其他客户端没有影响只是这一小部分受影响(请求的迁移达到了最小,这样的算法对分布式集群来说非常合适的,避免了大量请求迁移 )
在这里插入图片描述

增加服务器5之后,原来路由到3的部分客户端路由到新增服务器5上,对于其他客户端没有影响只是这 一小部分受影响(请求的迁移达到了最小,这样的算法对分布式集群来说非常合适的,避免了大量请求迁移 )

在这里插入图片描述
如前所述,每一台服务器负责一段,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。但是,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。例如系统中只有两台服务器,其环分布如下图左,节点2只能负责非常小的一段,大量的客户端请求落在了节点1上,这就是数据(请求)倾斜问题
在这里插入图片描述
为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点
具体做法可以在服务器ip或主机名的后面增加编号来实现。比如,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “节点1的ip#1”、“节点1的ip#2”、“节点1的ip#3”、“节点2的ip#1”、“节点2的 ip#2”、“节点2的ip#3”的哈希值,于是形成六个虚拟节点,当客户端被路由到虚拟节点的时候其实是被 路由到该虚拟节点所对应的真实节点

二、集群时钟同步问题

2.1 时钟不同步导致的问题

时钟此处指服务器时间,如果集群中各个服务器时钟不一致势必导致一系列问题,试想 “集群是各个服务器一起团队化作战,大家工作都不在一个点上,岂不乱了套!”
举一个例子,电商网站业务中,新增一条订单,那么势必会在订单表中增加了一条记录,该条记录中应 该会有“下单时间”这样的字段,往往我们会在程序中获取当前系统时间插入到数据库或者直接从数据库服务器获取时间。那我们的订单子系统是集群化部署,或者我们的数据库也是分库分表的集群化部署, 然而他们的系统时钟缺不一致,比如有一台服务器的时间是昨天,那么这个时候下单时间就成了昨天, 那我们的数据将会混乱!如下
在这里插入图片描述

2.2 集群时钟同步配置

2.2.1 场景一:分布式集群中各个服务器节点都能访问互联网

这种场景的时钟同步思路如下:
在这里插入图片描述
操作方式:

#使用 ntpdate 网络时间同步命令
ntpdate -u ntp.api.bz #从一个时间服务器同步时间(这个是上海的一个服务器集群)
  • 1
  • 2

我们尝试修改下远程服务器时间,然后再校准一下看看效果:
改之前是正确的,如下图:
在这里插入图片描述
然后我们把时间改成早上八点:
在这里插入图片描述
可以看到时间被我们手动改坏了,已经按照上午八点的时间继续走了,接下来我们校准一次看看
在这里插入图片描述
可以看到校准成功了,并且还给了一个偏差时长
当然实际生产中我们不可能每次发现有问题了再上服务器去改,所以我们可以使用定时任务,不论是windows服务器还是linux服务器,都是支持定时任务的,我们可以每十分钟或者每半小时甚至每天都行

2.2.2 场景二:分布式集群中某一个服务器节点可以访问互联网或者所有节点都不能够访问互联网

在这里插入图片描述
操作方式:
选取集群中的一个服务器节点A(172.17.0.17)作为时间服务器(整个集群时间从这台服务器同步,如果这台服务器能够访问互联网,可以让这台服务器和网络时间保持同步,如果不能就手动设置一个时间)

首先设置好A的时间
把A配置为时间服务器(修改/etc/ntp.conf文件)

1、如果有 restrict default ignore,注释掉它 
2、添加如下几行内容
restrict 172.17.0.0 mask 255.255.255.0 nomodify notrap # 放开局 域网同步功能,172.17.0.0是你的局域网网段
server 127.127.1.0 # local clock
fudge 127.127.1.0 stratum 10 
3、重启生效并配置ntpd服务开机自启动
service ntpd restart
chkconfig ntpd on
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

集群中其他节点就可以从A服务器同步时间了

ntpdate 172.17.0.17
  • 1

三、分布式ID解决方案

分布式Id是什么?:分布式集群环境下的全局唯一ID

为什么需要分布式Id?:分表后ID不能够重复,因此不能继续用主键自增,所以需要一种机制来提供一个全局唯一ID
在这里插入图片描述

3.1 UUID

UUID 是指Universally Unique Identifier,翻译为中文是通用唯一识别码
产生重复 UUID 并造成错误的情况非常低,是故大可不必考虑此问题。

Java中得到一个UUID,可以使用java.util包提供的方法

public class UuidTest {
        public static void main(String[] args) {
            System.out.println(java.util.UUID.randomUUID().toString());
        }
}
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述
优点:使用方便,jdk自带使用方法

UUID的缺点:
1)字符较长,不好作为索引
2)没有什么规律,实际使用中可能会有些不方便

3.2 独立数据库的自增ID(基本不用)

比如A表分表为A1表和A2表,那么肯定不能让A1表和A2表的ID自增,那么ID怎么获取呢?我们可以单独的创建一个Mysql数据库,在这个数据库中创建一张表,这张表的ID设置为自增,其他地方需要全局唯一ID的时候,就模拟向这个Mysql数据库的这张表中模拟插入一条记录,此时ID会自增,然后我们可以通过Mysql的select last_insert_id() 获取到刚刚这张表中自增生成的ID

缺点:性能和可靠性都不够好,因为你需要代码连接到数据库才能获取到id,性能无法保障,另外mysql数据库实例挂掉了,那么就无法获取分布式id了

3.3 SnowFlake 雪花算法

雪花算法是Twitter推出的一个用于生成分布式ID的策略。
雪花算法是一个算法,基于这个算法可以生成ID,生成的ID是一个long型,那么在Java中一个long 型是8个字节,算下来是64bit,如下是使用雪花算法生成的一个ID的二进制形式示意:
在这里插入图片描述
所以,雪花算法的意思就是一台机器的一个毫秒可以产生四千多个唯一ID

另外,一些互联网公司也基于上述的方案封装了一些分布式ID生成器,比如滴滴的tinyid(基于数据库实现)、百度的uidgenerator(基于SnowFlake)和美团的leaf(基于数据库和SnowFlake) 等。

3.4 借助Redis的Incr命令获取全局唯一ID

Redis Incr 命令将 key 中储存的数字值增一。如果 key 不存在,那么 key 的值会先被初始化为 0,然后再执行 INCR 操作。
<key,value>
<id,>
.incr(id) 1 2 3 4

在这里插入图片描述
安装好redis客户端后测试下:

Jedis jedis = new Jedis("127.0.0.1",6379);
try {
  long id = jedis.incr("id");
System.out.println("从redis中获取的分布式id为:" + id); } finally {
  if (null != jedis) {
    jedis.close();
} }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

四、分布式调度问题(定时任务的分布式)

实际工作中大部分有经验的同学都使用过定时任务

4.1 定时任务的场景

定时任务形式:每隔一定时间/特定某一时刻执行
例如:
订单审核、出库
订单超时自动取消、支付退款
礼券同步、生成、发放作业
物流信息推送、抓取作业、退换货处理作业
数据积压监控、日志监控、服务可用性探测作业
定时备份数据
金融系统每天的定时结算
数据归档、清理作业
报表、离线数据分析作业

4.2 什么是分布式调度

有两层含义
1)运行在分布式集群环境下的调度任务(同一个定时任务程序部署多份,只应该有一个定时任务在执行)
2)定时任务的分布式(定时任务拆分):即为把一个大的作业任务拆分为多个小的作业任务,同时执行

分布式调度—>定时任务的分布式—>定时任务的拆分
在这里插入图片描述

4.3 定时任务与消息队列的对比

区别:本质区别是定时任务是时间驱动,而消息队列是事件驱动

共同点:
1)都是异步处理:比如注册、下单事件
2)应用解耦:不论是定时任务还是消息队列都可以实现两个应用间的解耦
3)流量削封:双十一的时候,任务作业和MQ都可以用来扛流量,后端系统根据服务能力定时处理订单或者从MQ抓取订单抓取到一个订单到来事件的话触发处理,对于前端用户来说看到的结果是已经下单成功了,下单是不受任何影响的

所以,定时任务更偏向于批处理,消息队列更偏向于逐条处理

4.4 定时任务的实现方式

定时任务的实现方式有多种。早期没有定时任务框架的时候,我们会使用JDK中的Timer机制和多线程机制(Runnable+线程休眠)来实现定时或者间隔一段时间执行某一段程序;后来有了定时任务框架,比如大名鼎鼎的Quartz任务调度框架,使用时间表达式(包括:秒、分、时、日、周、年)配置某一个任务什么时间去执行:

我们来回顾一下任务调度框架Quartz的简单使用:
引入相关的jar包:

<!--任务调度框架quartz-->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

具体使用的代码如下:

package quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

/*
 * @since 2022/5/2 22:55
 * @description
 */
public class QuartzMain {

    // 1、创建任务调度器(好比公交调度站)
    private static Scheduler createScheduler() throws SchedulerException {
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        return schedulerFactory.getScheduler();
    }

    // 2、创建一个任务(好比某一个公交车的出行)
    private static JobDetail createJob() {
        JobBuilder jobBuilder = JobBuilder.newJob(MyJob.class);
        jobBuilder.withIdentity("jobName", "myJob");
        return jobBuilder.build();
    }

    // 3、创建任务的时间触发器(好比这个公交车的出行时间表)
    /**
     * cron表达式由七个位置组成,空格分隔
     * 示例:
     * 0 0 11 * * ? 每天的11点触发执行一次
     * 0 30 10 1 * ? 每月1号上午10点半触发执行一次
     * 1、Seconds(秒) 0~59
     * 2、Minutes(分) 0~59
     * 3、Hours(小时) 0~23
     * 4、Day of Month(天)1~31,注意有的月份不足31天 * 5、Month(月) 0~11,或者
     * JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC
     * 6、Day of Week(周) 1~7,1=SUN或者 SUN,MON,TUE,WEB,THU,FRI,SAT
     * 7、Year(年)1970~2099 可选项
     */
    private static Trigger createTrigger() {
        // 定义cron表达式 每隔两秒执行一次
        String cronStr = "*/2 * * * * ?";
        // 创建时间触发器
        return TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "myTrigger")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule(cronStr)).build();
    }


    /**
     * main函数中开启定时任务
     */
    public static void main(String[] args) throws SchedulerException {
        // 1、创建任务调度器(好比公交调度站)
        Scheduler scheduler = QuartzMain.createScheduler();
        // 2、创建一个任务(好比某一个公交车的出行)
        JobDetail jobDetail = QuartzMain.createJob();
        // 3、创建任务的时间触发器(好比这个公交车的出行时间表)
        Trigger trigger = QuartzMain.createTrigger();
        // 4、使用任务调度器根据时间触发器来执行我们的任务(根据出行事件表发车)
        scheduler.scheduleJob(jobDetail,trigger);
        scheduler.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
package quartz;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/*
 * @since 2022/5/2 23:29
 * @description
 */
public class MyJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        System.out.println("定时任务执行啦~~~~~");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

效果如下:
在这里插入图片描述

4.5 分布式调度框架Elastic-Job

Elastic-Job介绍
Elastic-Job是当当网开源的一个分布式调度解决方案,基于Quartz二次开发的,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。我们要学习的是 Elastic-Job-Lite,它定位为轻量级无中心化解决方案,使用Jar包的形式提供分布式任务的协调服务,而Elastic-Job-Cloud子项目需要结合Mesos以及Docker在云环境下使用
Elastic-Job的github地址:https://github.com/elasticjob
主要功能介绍
分布式调度协调:在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行
丰富的调度策略:基于成熟的定时任务作业框架Quartz cron表达式执行定时任务
弹性扩容缩容:当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行。
失效转移:某实例在任务执行失败后,会被转移到其他实例执行
错过执行作业重触发:若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。
支持并行调度:支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。
作业分片一致性:当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。

Elastic-Job依赖于Zookeeper进行分布式协调,所以需要安装Zookeeper软件(3.4.6版本以上)

安装完Zookeeper后就可以引入Elastic-Job-Lite的jar包来进行实际的调度开发了
Elastic-Job使用
需求:每隔两秒钟执行一次定时任务(resume表中未归档的数据归档到resume_bak表中,每次归档1条记录)
准备下数据:
建两张表resume和resume_bak,表结构完全一致

CREATE TABLE `resume` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  `education` varchar(255) DEFAULT NULL,
  `state` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


CREATE TABLE `resume_bak` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  `education` varchar(255) DEFAULT NULL,
  `state` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

并且给resume表中初始化一些数据

drop procedure  test;
CREATE PROCEDURE test()
begin
    DECLARE n int DEFAULT 1;
    DECLARE sex_1 varchar(24) DEFAULT '男';
    DECLARE name_1 varchar(24) DEFAULT '';
    WHILE n < 1001
        DO
            if n % 2 = 0
            then
                set sex_1 = '女';
            else
                set sex_1 = '男';
            end if;
            if n % 4 = 0
            then
                set name_1 = '张三';
            elseif n % 4 = 1
            then
                set name_1 = '李四';
            elseif n % 4 = 2
            then
                set name_1 = '王五';
            elseif n % 4 = 3
            then
                set name_1 = '刘二';
            end if;

            insert into resume(name, sex, phone, address, education, state)
            values (name_1, sex_1, '17729380' + n, '北京', 'doctor', '未归档');
            set n = n + 1;
        END WHILE;
end;

call test();

update resume set education = if(id % 3 = 0,'bachelor',if(id % 3 = 1,'master','doctor'));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

数据准备好了开始写代码吧

引入我们需要的包

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

先拿一个之前写好的JdbcUtil

package utils;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/*
 * @since 2022/5/3 17:15
 * @description
 */
public class JdbcUtil {
    //url
    private static String url = "jdbc:mysql://localhost:3306/lagou?characterEncoding=utf8&useSSL=false";
    //user
    private static String user = "root";
    //password
    private static String password = "root";
    //驱动程序类
    private static String driver = "com.mysql.jdbc.Driver";

    static {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public static Connection getConnection() {
        try {
            return DriverManager.getConnection(url, user,
                    password);

        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

    public static void close(ResultSet rs, PreparedStatement ps, Connection con) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } finally {
                        if (con != null) {
                            try {
                                con.close();
                            } catch (
                                    SQLException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
    }


    public static void executeUpdate(String sql, Object... obj) {
        Connection con = getConnection();
        PreparedStatement ps = null;
        try {
            ps = con.prepareStatement(sql);
            for (int i = 0; i < obj.length; i++) {
                ps.setObject(i + 1, obj[i]);
            }
            ps.executeUpdate();

        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            close(null, ps, con);
        }
    }

    public static List<Map<String, Object>> executeQuery(String sql, Object... obj) {
        Connection con = getConnection();
        ResultSet rs = null;
        PreparedStatement ps = null;
        try {
            ps = con.prepareStatement(sql);
            for (int i = 0; i < obj.length; i++) {
                ps.setObject(i + 1, obj[i]);
            }
            rs = ps.executeQuery();
            List<Map<String, Object>> list = new ArrayList<>();
            int count = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                Map<String, Object> map = new HashMap<>();
                for (int i = 0; i < count; i++) {
                    Object ob = rs.getObject(i + 1);
                    String key = rs.getMetaData().getColumnName(i + 1);
                    map.put(key, ob);
                }
                list.add(map);
            }
            return list;
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            close(rs, ps, con);
        }
        return null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123

定时任务类

package elasticjob;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import utils.JdbcUtil;

import java.util.List;
import java.util.Map;

/*
 * @since 2022/5/3 17:11
 * @description ElasticJobLite定时任务业务逻辑处理类
 */
public class ArchivieJob implements SimpleJob {

    /**
     * execute方法中放定时任务的主逻辑
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        // 1、从resume表中查询一条记录(未归档)
        String selectSql = "select * from resume where state = '未归档' limit 1";
        List<Map<String, Object>> list = JdbcUtil.executeQuery(selectSql);
        if (list == null || list.size() == 0) {
            System.out.println("数据已经处理完毕!!!!");
            return;
        }
        // 2、'未归档'更改为'已归档'
        Map<String, Object> stringObjectMap = list.get(0);
        long id = (long) stringObjectMap.get("id");
        String name = (String) stringObjectMap.get("name");

        System.out.println("====>id:" + id + "name: " + name);
        String updateSql = "update resume set state = '已归档' where id = ?";
        JdbcUtil.executeUpdate(updateSql, id);

        // 3、归档这条记录,把这条数据插入到resume_bak表
        String insertSql = "insert into resume_bak select * from resume where id = ?";
        JdbcUtil.executeUpdate(insertSql, id);

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

最后是主类

package elasticjob;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/*
 * @since 2022/5/3 17:59
 * @description
 */
public class ElasticJobMain {

    public static void main(String[] args) {
        // 配置分布式协调服务(注册中心)Zookeeper
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181","data-archive-job");
        CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();

        // 配置任务(时间事件、定时任务业务逻辑、调度器)
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/2 * * * * ?", 1).build();
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,ArchivieJob.class.getName());

        // 调度器执行任务
        JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build());
        jobScheduler.init();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

然后我们测试一下:
1)可先启动一个进程,然后再启动一个进程(两个进程模拟分布式环境下,通一个定时任务部署了两份在工作)
2)两个进程逐个启动,观察现象
在这里插入图片描述
在这里插入图片描述
可以看到的是第二个进程起来后第一个进程就没有动作了

3)关闭其中执行的进程,观察现象
在这里插入图片描述
可以看到关掉第二个进程后就自动切换回第一个进程了,这就是分布式调度的意义,保证只有一个实例在执行调度

需要知道的是发现新的实例切换、发现当前实例异常切换这些动作都是注册中心自动完成的,也就是zookeeper

Elastic-Job-Lite轻量级去中心化的特点
轻量级
1)只需要依赖一个zookeeper即可
2)不需要独立部署,引入jar包即可使用

去中心化
1)程序和jar包一样,唯一不一样的可能是分片:执行节点对等
2)定时调度自触发(没有中心调度节点分配)
3)服务自发现(通过注册中心的服务发现)
4)主节点非固定
在这里插入图片描述

任务分片:一个大的任务分成多个多个机器实例去处理(可以横向扩展处理能力)
一个大的非常耗时的作业Job,比如:一次要处理一亿的数据,那这一亿的数据存储在数据库中,如果用一个作业节点处理一亿数据要很久,在互联网领域是不太能接受的,互联网领域更希望机器的增加去横向扩展处理能力。所以,ElasticJob可以把作业分为多个的task(每一个task就是一个任务分片),每 一个task交给具体的一个机器实例去处理(一个机器实例是可以处理多个task的),但是具体每个task执行什么逻辑由我们自己来指定

在这里插入图片描述

Strategy策略定义这些分片项怎么去分配到各个机器上去,默认是平均去分,可以定制,比如某一个机器负载比较高或者预配置比较高,那么就可以写策略。分片和作业本身是通过一个注册中心协调的,因为在分布式环境下,状态数据肯定集中到一点,才可以在分布式中沟通
代码需要做一些修改:
添加分片参数:
在这里插入图片描述
接收分片参数:
在这里插入图片描述

弹性扩容
在这里插入图片描述
新增加一个运行实例app3,它会自动注册到注册中心,注册中心发现新的服务上线,注册中心会通知 ElasticJob 进行重新分片,那么总的分片项有多少,就可以搞多少个实例机器
比如完全可以分1000片,只要机器能承受住,完全可以分1000片,如果一台机器承受不住,那就多搞一些机器,1000个分片,那么就可以搞1000台机器一起执行作业
注意:
1)分片项也是一个JOB配置,修改配置,重新分片,在下一次定时运行之前会重新调用分片算法,那么这个分片算法的结果就是:哪台机器运行哪一个分片,这个结果存储到zk中的,主节点会把分片给分好放到注册中心去,然后执行节点从注册中心获取信息(执行节点在定时任务开启的时候获取相应的分片)。
2)如果所有的节点挂掉值剩下一个节点,所有分片都会指向剩下的一个节点,这也是ElasticJob的高可用。

我们看一下刚才代码改动后的结果:
总共三个实例:
在这里插入图片描述
启动第一个:三个分片在同一个实例中运行
在这里插入图片描述
启动第二个:重新分片,可以看到此时第一个实例只执行一个分片,第二个实例执行两个分片
在这里插入图片描述
在这里插入图片描述
在启动第三个:可以看到三个都只执行一个分片
在这里插入图片描述
关掉一个或者两个结果就会反过来将停止的实例正在执行的分片切换给其他的在执行的实例
好啦,就到这里啦

五、Session共享(一致性)问题

Session共享及Session保持或者叫做Session一致性

在这里插入图片描述

5.1 Session问题原因分析

出现这个问题的原因,从根本上来说是因为Http协议是无状态的协议。客户端和服务端在某次会话中产生的数据不会被保留下来,所以第二次请求服务端无法认识到你曾经来过, Http为什么要设计为无状态协议?早期都是静态⻚面无所谓有无状态,后来有动态的内容更丰富,就需要有状态,出现了两种用于保持Http状态的技术,那就是Cookie和Session。而出现上述不停让登录的问题,分析如下图:

首先:路由策略是默认的(轮询):
在这里插入图片描述

5.2 解决Session一致性的方案

1)Nginx的 IP_Hash 策略(可以使用)
同一个客户端IP的请求都会被路由到同一个目标服务器,也叫做会话粘滞
优点:
配置简单,不入侵应用,不需要额外修改代码
缺点:
服务器重启Session丢失
存在单点负载高的⻛险
单点故障问题
2)Session复制(不推荐)
也就是多个tomcat之间通过修改配置文件,达到Session之间的复制
在这里插入图片描述
优点:
不入侵应用
便于服务器水平扩展
能适应各种负载均衡策略
服务器重启或者宕机不会造成Session丢失
缺点:
性能低
内存消耗
不能存储太多数据,否则数据越多越影响性能
延迟性

3)Session共享,Session集中存储(推荐)
Session的本质就是缓存,那Session数据为什么不交给专业的缓存中间件呢?比如Redis

在这里插入图片描述
优点:
能适应各种负载均衡策略
服务器重启或者宕机不会造成Session丢失
扩展能力强
适合大集群数量使用
缺点:
对应用有入侵,引入了和Redis的交互代码
实际应用:
1)引入相关的jar包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-data-redis</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2)配置redis

spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
  • 1
  • 2
  • 3

3)添加注解
在这里插入图片描述

原理如下:
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/290372
推荐阅读
相关标签
  

闽ICP备14008679号