当前位置:   article > 正文

分布式集群场景化解决方案

分布式集群场景化解决方案

分布式与集群

分布式和集群是不一样的,分布式一定是集群,但集群不一定是分布式。

分布式:

分布式把一个系统拆分成多个子系统,每个子系统负责各自的那部分功能,独立部署,各司其职。

集群:

集群就是多个实例共同工作,最简单/最常见的集群就是把一个应用复制多份部署。
在这里插入图片描述

一致性Hash算法

Hash算法应用场景

Hash算法在分布式中应用广泛,比如分布式集群架构Redis、Hadoop、ElasticSearch、MySQL分库分表、Nginx负载均衡等。

主要应用场景归纳起来就是两个:

  • 请求负载均衡(比如Nginx的ip_hash策略)

    Nginx的IP_Hash策略在客户端的IP不变的情况下,Nginx会将其发出请求始终转发到同一台目标服务器上,实现会话粘滞,避免处理session共享问题。

  • 分布式存储

    以分布式内存数据库Redis为例,集群中有Redis1、Redis2、Redis3三台服务器,在数据存储时,可根据key值取模服务器数量,使用取模余数确定存储到哪台Redis服务器节点。

普通Hash算法存在的问题

以IP_Hash为例,假设用户IP固定不变,服务器某个节点发生故障宕机,服务器数量发生改变,之前的所有求模计算都要重新进行。
在这里插入图片描述
在真实的生产情况下,后台服务器和客户端数量是很多的,缩容和扩容对系统的影响是很大的,大量用户的请求会重新被路由到其他的目标服务器,导致用户在原来服务器上的会话丢失不可用。

一致性hash算法

一致性hash算法思路如下:
在这里插入图片描述
首先一条直线,直线的开头个结尾分别是0和2的32次方减1。 这相当于⼀个地址,对于这样⼀条 线,弯过来构成⼀个圆环形成闭环,这样的⼀个圆环称为hash环。我们把服务器的ip或者主机名求 hash值然后对应到hash环上,那么客户访问的时候,根据自身IP地址求hash值,对应到环上某个位 置,然后按照顺时针⽅向找最近的服务器节点 作为目标服务器。
在这里插入图片描述
采用一致性Hash算法,此时如果某个节点宕机下线之后,影响的只有小部分用户;比如当节点3的服务器宕机下线,影响的只是客户端IP地址hash值落在节点2和节点3之间的用户,其他用户将不受影响。

同理,增加服务器数量的时候也是影响一小部分用户。如下图,当增加服务器5之后,影响的只有节点2到节点5之间的用户。
在这里插入图片描述
总结:

综上所述,每一台服务器只负责一小段,一致性Hash算法对于节点的增加和减少都只需重新定位hash环上的一小段数据,具有良好的容错性和扩展性。

但是,当一致性hash算法在服务节点太少时,容易因为节点分布不均匀而造成数据倾斜问题,例如系统中只有两台服务器,其环分布如下图,阶段2只负责一小段数据,大量请求落在服务节点1上,这就是数据(请求)倾斜问题。
在这里插入图片描述
解决办法:

为了解决数据(请求)倾斜问题,一致性Hash算法引入了虚拟节点机制,即对每个服务器节点计算多个Hash值,每个计算节点都放在Hash环上,称为虚拟节点。

具体做法,可以在服务器IP或主机名称后面增加编号来实现。比如 可以为每台服务器计算三个虚拟节 点,于是可以分别计算 “节点1的ip#1”、“节点1的ip#2”、“节点1的ip#3”、“节点2的ip#1”、“节点2的 ip#2”、“节点2的ip#3”的哈希值,于是形成六个虚拟节点,当客户端被路由到虚拟节点的时候其实是被 路由到该虚拟节点所对应的真实节点 。

手写实现一致性Hash算法

普通Hash算法

/**
* 普通Hash算法实现
*/
public class GeneralHash {
     public static void main(String[] args) {
         // 定义客户端IP
         String[] clients = new String[]{"10.78.12.3","113.25.63.1","126.12.3.8"};
         // 定义服务器数量
         int serverCount = 5;// (编号对应0,1,2)
         // hash(ip)%node_counts=index
         //根据index锁定应该路由到的tomcat服务器
         for(String client: clients) {
             int hash = Math.abs(client.hashCode());
             int index = hash%serverCount;
             System.out.println("客户端:" + client + " 被路由到服务器编号为:"+ index);
         }
     }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

一致性Hash算法(不含虚拟节点)

import java.util.SortedMap;
import java.util.TreeMap;
public class ConsistentHashNoVirtual {
    public static void main(String[] args) {
        //step1 初始化:把服务器节点IP的哈希值对应到哈希环上
        // 定义服务器ip
        String[] tomcatServers = new String[]
                {"123.111.0.0","123.101.3.1","111.20.35.2","123.98.26.3"};
        //按照Key值排序的Map集合
        SortedMap<Integer,String> hashServerMap = new TreeMap<>();
        for(String tomcatServer: tomcatServers) {
            // 求出每⼀个ip的hash值,对应到hash环上,存储hash值与ip的对应关系
            int serverHash = Math.abs(tomcatServer.hashCode());
            // 存储hash值与ip的对应关系
            hashServerMap.put(serverHash,tomcatServer);
        }
        //step2 针对客户端IP求出hash值
        // 定义客户端IP
        String[] clients = new String[]
                {"10.78.12.3","113.25.63.1","126.12.3.8"};
        for(String client : clients) {
            int clientHash = Math.abs(client.hashCode());
            //step3 针对客户端,找到能够处理当前客户端请求的服务器(哈希环上顺时针最
            近)
            // 根据客户端ip的哈希值去找出哪⼀个服务器节点能够处理()
                //tailMap方法找到key值比clientHash大的 所有集合
            SortedMap<Integer, String> integerStringSortedMap =
                    hashServerMap.tailMap(clientHash);
            if(integerStringSortedMap.isEmpty()) {
                // 取哈希环上的顺时针第⼀台服务器
                Integer firstKey = hashServerMap.firstKey();
                System.out.println("==========>>>>客户端:" + client + " 被路由到服务器:" + hashServerMap.get(firstKey));
            }else{
                Integer firstKey = integerStringSortedMap.firstKey();
                System.out.println("==========>>>>客户端:" + client + " 被路由到服务器:" + hashServerMap.get(firstKey));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

一致性Hash算法(含虚拟节点)

import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashWithVirtual {
    public static void main(String[] args) {
        //step1 初始化:把服务器节点IP的哈希值对应到哈希环上
        // 定义服务器ip
        String[] tomcatServers = new String[]
                {"123.111.0.0", "123.101.3.1", "111.20.35.2", "123.98.26.3"};
        SortedMap<Integer, String> hashServerMap = new TreeMap<>();
        // 定义针对每个真实服务器虚拟出来⼏个节点
        int virtaulCount = 3;
        for (String tomcatServer : tomcatServers) {
            // 求出每⼀个ip的hash值,对应到hash环上,存储hash值与ip的对应关系
            int serverHash = Math.abs(tomcatServer.hashCode());
            // 存储hash值与ip的对应关系
            hashServerMap.put(serverHash, tomcatServer);
            // 处理虚拟节点
            for (int i = 0; i < virtaulCount; i++) {
                int virtualHash = Math.abs((tomcatServer + "#" +
                        i).hashCode());
                hashServerMap.put(virtualHash, "----由虚拟节点" + i + "映射过来的请求:"+ tomcatServer);
            }
        }
        //step2 针对客户端IP求出hash值
        // 定义客户端IP
        String[] clients = new String[]
                {"10.78.12.3", "113.25.63.1", "126.12.3.8"};
        for (String client : clients) {
            int clientHash = Math.abs(client.hashCode());
            //step3 针对客户端,找到能够处理当前客户端请求的服务器(哈希环上顺时针最近)
            // 根据客户端ip的哈希值去找出哪⼀个服务器节点能够处理()
            SortedMap<Integer, String> integerStringSortedMap =
                    hashServerMap.tailMap(clientHash);
            if (integerStringSortedMap.isEmpty()) {
                // 取哈希环上的顺时针第⼀台服务器
                Integer firstKey = hashServerMap.firstKey();
                System.out.println("==========>>>>客户端:" + client + " 被路由到服务器:" + hashServerMap.get(firstKey));
            } else {
                Integer firstKey = integerStringSortedMap.firstKey();
                System.out.println("==========>>>>客户端:" + client + " 被路由到服务器:" + hashServerMap.get(firstKey));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

Nginx配置一致性Hash负载均衡策略

Nginx自带IP_Hash策略,但是原生不支持一致性Hash负载均衡策略。

ngx_http_upstream_consistent_hash 模块是一个负载均衡器,使用内部的一致性Hash算法来选择合适的端点。

该模块可根据配置不同的参数来采取不同的方式将请求均匀映射到后台服务器:

consistent_hash $remote_addr: 根据客户端IP地址映射;

consistent_hash $request_uri: 根据客户端请求路径映射;

consistent_hash $args: 根据客户端请求携带参数进行映射。

ngx_http_upstream_consistent_hash是一个第三方模块,我们需要下载并安装才能使用。

ngx_http_upstream_consistent_hash下载安装

1) github下载nginx⼀致性hash负载均衡模块 https://github.com/replay/ngx_http_consistent_hash
在这里插入图片描述
2)将下载的压缩包上传到Nginx服务器上,并解压。

3) 我们已经编译安装过nginx,此时进⼊当时nginx的源码⽬录,执⾏如下命令

./configure —add-module=/root/ngx_http_consistent_hash-master

make

make install

4) Nginx就可以使⽤啦,在nginx.conf⽂件中配置

#负载均衡配置
upstream lagouserver{
	 consistent_hash $remote_addr;
	 server 127.0.0.1:8080;
	 server 127.0.0.1:8081;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

集群时钟同步问题

时钟不同步导致的问题

举⼀个例⼦,电商⽹站业务中,新增⼀条订单,那么势必会在订单表中增加了⼀条记录,该条记录中应 该会有“下单时间”这样的字段,往往我们会在程序中获取当前系统时间插⼊到数据库或者直接从数据库 服务器获取时间。那我们的订单⼦系统是集群化部署,或者我们的数据库也是分库分表的集群化部署, 然⽽他们的系统时钟缺不⼀致,
有⼀台服务器的时间是昨天,那么这个时候下单时间就成了昨天, 那我们的数据将会混乱!如下 :
⽐如

集群时钟同步配置

集群时钟同步思路

  • 分布式集群中各个服务器都嫩连接互联网

    各个节点服务器向互联网时间服务器同步时间。
    在这里插入图片描述
    操作方式:

#使⽤ ntpdate ⽹络时间同步命令
#从⼀个时间服务器同步时间
ntpdate -u ntp.api.bz 
  • 1
  • 2
  • 3

可以采用系统的定时任务,每隔一段时间去执行一次ntpdate命令。

  • 分布式服务器节点中有一个服务器节点可以连接互联网或者都不能访问互联网

    各服务器节点以其中一台服务器作为时间服务器同步时间
    在这里插入图片描述
    操作方式:

1)选取集群中的⼀个服务器节点A(172.17.0.17)作为时间服务器(整个集群时间从这台服务 器同步,如果这台服务器能够访问互联⽹,可以让这台服务器和⽹络时间保持同步,如果不 能就⼿动设置⼀个时间) 。

  • 首先设置好A服务器的时间

  • 把A配置为时间服务器(修改/etc/ntp.conf⽂件)

1、如果有 restrict default ignore,注释掉它
2、添加如下⼏⾏内容
 restrict 172.17.0.0 mask 255.255.255.0 nomodify notrap # 放开局
域⽹同步功能,172.17.0.0是你的局域⽹⽹段
 server 127.127.1.0 # local clock
 fudge 127.127.1.0 stratum 10
3、重启⽣效并配置ntpd服务开机⾃启动
 service ntpd restart
 chkconfig ntpd on
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 集群中其他节点就可以从A服务器同步时间了
ntpdate 172.17.0.17
  • 1

分布式ID解决方案

在分布式集群环境下,如何保证ID的全局唯一性和自增。分库分表中如何保证表主键的连续。
在这里插入图片描述

解决办法

  • UUID(可以使用)

    UUID中文翻译过来是通用唯一识别码,可以使用Java.util包中额工具类尝试UUID。

    public class MyTest {
         public static void main(String[] args) {
            System.out.println(java.util.UUID.randomUUID().toString());
         }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 独立数据库自增ID

    比如A表分为A1表和A2表,在设计表的时候我们不能让A1表和A2表的数据表主键ID同时自增,我们可以单独创建一个数据库,在数据库中创建一张表,让其主键ID自增,在其他地方需要全局自增主键ID的时候,从这张表中新增一条数据并返回其主键ID,此ID作为全局自增主键。

    比如 我们创建了⼀个数据库实例global_id_generator,在其中创建了⼀个数据表,表结构如 下:

    -- ----------------------------
    -- Table structure for DISTRIBUTE_ID
    -- ----------------------------
    DROP TABLE IF EXISTS `DISTRIBUTE_ID`;
    CREATE TABLE `DISTRIBUTE_ID` (
     `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '主键',
     `createtime` datetime DEFAULT NULL,
     PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    当分布式集群环境中哪个应⽤需要获取⼀个全局唯⼀的分布式ID的时候,就可以使⽤代码连接这个 数据库实例,执⾏如下sql语句即可。

    insert into DISTRIBUTE_ID(createtime) values(NOW());
    select LAST_INSERT_ID()
    • 1
    • 2

    注意:

    1)这里的createtime毫无意义, 是为了随便插⼊⼀条数据以⾄于能够⾃增id。

    2) 使⽤独⽴的Mysql实例⽣成分布式id,虽然可⾏,但是性能和可靠性都不够好,因为你需要代 码连接到数据库才能获取到id,性能⽆法保障,另外mysql数据库实例挂掉了,那么就⽆法获取分 布式id了。

    3) 有⼀些开发者⼜针对上述的情况将⽤于⽣成分布式id的mysql数据库设计成了⼀个集群架构, 那么其实这种⽅式现在基本不⽤,因为过于麻烦了。

  • SnowFlake雪花算法(可以用,推荐)

    SnowFlake雪花算法是Twitter公司退出的一个用于分布式ID策略。

    雪花算法是⼀个算法,基于这个算法可以⽣成ID,⽣成的ID是⼀个long型,那么在Java中⼀个long 型是8个字节,算下来是64bit,如下是使⽤雪花算法⽣成的⼀个ID的⼆进制形式示意:
    在这里插入图片描述
    另外,一些互联网公司也基于SnowFlake雪花算法进行第二次开发生成了一些分布式ID生成器,比如滴滴的tinyid(基于数据库实现)、 百度的uidgenerator(基于SnowFlake)和美团的leaf(基于数据库和SnowFlake) 等。

  • 借助于Redis的Incr命令获取全局唯一ID(推荐)

    Redis的Incr命令是将Key中存储的数字增一,如果key值不存在,那么key的值会被初始化为0,然后再执行Incr自增操作。
    在这里插入图片描述
    操作:

  • Java代码中使⽤Jedis客户端调⽤Reids的incr命令获得⼀个全局的id

  • 引⼊jedis客户端jar

<dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>2.9.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • Java代码(此处我们就是连接单节点,也不使⽤连接池)
Jedis jedis = new Jedis("127.0.0.1",6379);
try {
 long id = jedis.incr("id");
 System.out.println("从redis中获取的分布式id为:" + id);
} finally {
 if (null != jedis) {
 jedis.close();
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

分布式调度问题

调度就是定时任务,分布式调度就是在分布式集群环境下定时任务。

什么是分布式调度

分布式调度有两层含义:

1)运行在分布式环境下的调度任务(同一个定时任务部署多份,应该只有一个定时任务生效)

2)分布式调度–>定时任务额分布式–>定时任务的拆分(即把一个大的作业拆分成多个小的作业任务,同时执行)。
在这里插入图片描述

定时任务与消息队列的区别

  • 共同点

    • 异步处理

    • 应用解耦

      不管定时任务作业还是MQ都可以作为两个应⽤之间的⻮轮实现应⽤解耦,这个⻮轮可以中转 数据,当然单体服务不需要考虑这些,服务拆分的时候往往都会考虑

    • 流量削峰

      双⼗⼀的时候,任务作业和MQ都可以⽤来扛流量,后端系统根据服务能⼒定时处理订单或者 从MQ抓取订单抓取到⼀个订单到来事件的话触发处理,对于前端⽤户来说看到的结果是已经 下单成功了,下单是不受任何影响的

  • 本质不同

    • 定时任务是时间驱动,MQ是事件驱动

      时间驱动是不可代替的,⽐如⾦融系统每⽇的利息结算,不是说利息来⼀条(利息到来事件)就算 ⼀下,⽽往往是通过定时任务批量计算;

所以定时任务更趋向于批处理,MQ趋向于逐条处理。

定时任务的实现方式

早期会使用JDK中的Timer机制和多线程机制(Runnable+线程休眠)来实现定时或者间隔一段时间执行某一段程序;后来才出现的定时任务框架,比较出名的就是Quartz框架。Quartz框架采用Corn时间表达式(包括:秒、分、时、天、月、年、周)配置某一个任务的执行时机。

Quartz定时任务框架回顾

  • 引入jar包
<!--任务调度框架quartz-->
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
     <groupId>org.quartz-scheduler</groupId>
     <artifactId>quartz</artifactId>
     <version>2.3.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 定时任务作业主调度程序
package quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzMain {
    // 创建作业任务调度器(类似于公交调度站)
    public static Scheduler createScheduler() throws
            SchedulerException {
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();
        return scheduler;
    }

    // 创建⼀个作业任务(类似于⼀辆公交⻋)
    public static JobDetail createJob() {
        JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);
        jobBuilder.withIdentity("jobName", "myJob");
        JobDetail jobDetail = jobBuilder.build();
        return jobDetail;
    }

    /**
     * 创建作业任务时间触发器(类似于公交⻋出⻋时间表)
     * cron表达式由七个位置组成,空格分隔
     * 1、Seconds(秒) 0~59
     * 2、Minutes(分) 0~59
     * 3、Hours(⼩时) 0~23
     * 4、Day of Month(天)1~31,注意有的⽉份不⾜31天
     * 5、Month(⽉) 0~11,或者
     * JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC
     * 6、Day of Week(周) 1~7,1=SUN或者 SUN,MON,TUE,WEB,THU,FRI,SAT
     * 7、Year(年)1970~2099 可选项
     * 示例:
     * 0 0 11 * * ? 每天的11点触发执⾏⼀次
     * 0 30 10 1 * ? 每⽉1号上午10点半触发执⾏⼀次
     */
    public static Trigger createTrigger() {
        // 创建时间触发器,按⽇历调度
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "myTrigger")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule("0/2 *
                        * * * ? "))
                .build();
        // 创建触发器,按简单间隔调度
         /*SimpleTrigger trigger1 = TriggerBuilder.newTrigger()
         .withIdentity("triggerName","myTrigger")
         .startNow()
         .withSchedule(SimpleScheduleBuilder
         .simpleSchedule()
        .withIntervalInSeconds(3)
        .repeatForever())
         .build();*/
        return trigger;
    }

    // 定时任务作业主调度程序
    public static void main(String[] args) throws SchedulerException {
        // 创建⼀个作业任务调度器(类似于公交调度站)
        Scheduler scheduler = QuartzMain.createScheduler();
        // 创建⼀个作业任务(类似于⼀辆公交⻋)
        JobDetail job = QuartzMain.createJob();
        // 创建⼀个作业任务时间触发器(类似于公交⻋出⻋时间表)
        Trigger trigger = QuartzMain.createTrigger();
        // 使⽤调度器按照时间触发器执⾏这个作业任务
        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 定义一个Job,需要实现JOb接口
package quartz;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class DemoJob implements Job {
    public void execute(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("我是⼀个定时任务逻辑");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

分布式调度框架Elastic-Job

Elastic-Jon介绍

Elastic-Job是当当网开源的一个分布式调度解决方案,是基于Quartz二次开发的,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。我们主要介绍Elastic-Job-Lite,它是轻量级无中心化解决方案, 使⽤Jar包的形式提供分布式任务的协调服务,⽽Elastic-Job-Cloud⼦项⽬需要结合Mesos 以及Docker在云环境下使⽤。

主要功能

  • 分布式调度

    在分布式环境中,任务能按照指定的策略去执行,并且能够避免同一任务多实例重复执行。

  • 丰富的调度策略

    由于急于优秀的任务调度矿机Quartz,所以Elastic-Job具有丰富的调度策略。

  • 弹性扩容缩容

    当集群中增加某⼀个实例,它应当也能够被选举并执⾏任务;当集群减少⼀个实例 时,它所执⾏的任务能被转移到别的实例来执⾏。

  • 失效转移

    某实例在任务执⾏失败后,会被转移到其他实例执⾏ 。

  • 错过执行作业重触发

    若因某种原因导致作业错过执⾏,⾃动记录错过执⾏的作业,并在上次作业 完成后⾃动触发。

  • 支持并行调度

    ⽀持任务分⽚,任务分⽚是指将⼀个任务分为多个⼩任务项在多个实例同时执⾏。

  • 作业分片一致性

    当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。

Elastic-Job-Lite的应用

Elastic-Job依赖于Zookeeper进⾏分布式协调,所以需要安装Zookeeper软件(3.4.6版本以上)。

  • 安装zookeeper
  • 引入jar包
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core
-->
<dependency>
     <groupId>com.dangdang</groupId>
     <artifactId>elastic-job-lite-core</artifactId>
     <version>2.1.5</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 创建定时任务实例

    • 需求: 每隔两秒钟执⾏⼀次定时任务(resume表中未归档的数据归档到resume_bak表中, 每次归档1条记录)

      1)resume_bak和resume表结构完全⼀样

      2)resume表中数据归档之后不删除,只将state置为"已归档"

    • 数据表结构

    -- ----------------------------
    -- Table structure for resume
    -- ----------------------------
    DROP TABLE IF EXISTS `resume`;
    CREATE TABLE `resume` (
     `id` bigint(20) NOT NULL AUTO_INCREMENT,
     `name` varchar(255) DEFAULT NULL,
     `sex` varchar(255) DEFAULT NULL,
     `phone` varchar(255) DEFAULT NULL,
     `address` varchar(255) DEFAULT NULL,
     `education` varchar(255) DEFAULT NULL,
     `state` varchar(255) DEFAULT NULL,
     PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8;
    SET FOREIGN_KEY_CHECKS = 1;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 程序开发

      • 定时任务类

        package elasticjob;
        
        import com.dangdang.ddframe.job.api.ShardingContext;
        import com.dangdang.ddframe.job.api.simple.SimpleJob;
        import util.JdbcUtil;
        
        import java.sql.Connection;
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;
        import java.util.List;
        import java.util.Map;
        
        public class BackupJob implements SimpleJob {
            // 定时任务每执⾏⼀次都会执⾏如下的逻辑
            @Override
            public void execute(ShardingContext shardingContext) {
         /*
         从resume数据表查找1条未归档的数据,将其归档到resume_bak
        表,并更新状态为已归档(不删除原数据)
         */
                // 查询出⼀条数据
                String selectSql = "select * from resume where
                state = '未归档' limit 1 ";
                List<Map<String, Object>> list =
                        JdbcUtil.executeQuery(selectSql);
                if (list == null || list.size() == 0) {
                    return;
                }
                Map<String, Object> stringObjectMap = list.get(0);
                long id = (long) stringObjectMap.get("id");
                String name = (String) stringObjectMap.get("name");
                String education = (String)
                        stringObjectMap.get("education");
                // 打印出这条记录
                System.out.println("======>>>id:" + id + " name:" +
                        name + " education:" + education);
                // 更改状态
                String updateSql = "update resume set state='已归档'
                where id =?";
                JdbcUtil.executeUpdate(updateSql, id);
                // 归档这条记录
                String insertSql = "insert into resume_bak select *
                from resume where id =?";
                JdbcUtil.executeUpdate(insertSql, id);
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 主类
        package elasticjob;
        
        import com.dangdang.ddframe.job.config.JobCoreConfiguration;
        import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
        import com.dangdang.ddframe.job.lite.api.JobScheduler;
        import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
        import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
        import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
        import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
        
        public class ElasticJobMain {
            public static void main(String[] args) {
                // 配置注册中⼼zookeeper,zookeeper协调调度,不能让任务重复执⾏,通过命名空间分类管理任务,对应到zookeeper的⽬录
                ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "data-archive-job");
                CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
                coordinatorRegistryCenter.init();
                // 配置任务 1表示不分片
                JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/2 * * * * ? ",1).build();
                SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, BackupJob.class.getName());
                // 启动任务
                new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • JDBCUtil工具类
        package elasticjob;
        
        import java.sql.*;
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        
        public class JdbcUtil {
            //url
            private static String url = "jdbc:mysql://localhost:3306/job?characterEncoding=utf8&useSSL=false";
            //user
            private static String user = "root";
            //password
            private static String password = "123456";
            //驱动程序类
            private static String driver = "com.mysql.jdbc.Driver";
        
            static {
                try {
                    Class.forName(driver);
                } catch (ClassNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        
            public static Connection getConnection() {
        
                try {
                    return DriverManager.getConnection(url, user, password);
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;
        
            }
        
            public static void close(ResultSet rs, PreparedStatement ps, Connection con) {
                if (rs != null) {
                    try {
                        rs.close();
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } finally {
                        if (ps != null) {
                            try {
                                ps.close();
                            } catch (SQLException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            } finally {
                                if (con != null) {
                                    try {
                                        con.close();
                                    } catch (SQLException e) {
                                        // TODO Auto-generated catch block
                                        e.printStackTrace();
                                    }
                                }
                            }
                        }
                    }
                }
            }
        
        
            /***
             * DML操作(增删改)
             * 1.获取连接数据库对象
             * 2.预处理
             * 3.执行更新操作
             * @param sql
             * @param obj
             */
            //调用者只需传入一个sql语句,和一个Object数组。该数组存储的是SQL语句中的占位符
            public static void executeUpdate(String sql,Object...obj) {
                Connection con = getConnection();//调用getConnection()方法连接数据库
                PreparedStatement ps = null;
                try {
                    ps = con.prepareStatement(sql);//预处理
                    for (int i = 0; i < obj.length; i++) {//预处理声明占位符
                        ps.setObject(i + 1, obj[i]);
                    }
                    ps.executeUpdate();//执行更新操作
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
                    close(null, ps, con);//调用close()方法关闭资源
                }
            }
        
        
        
            /***
             * DQL查询
             * Result获取数据集
             *
             * @param sql
             * @param obj
             * @return
             */
            public static List<Map<String,Object>> executeQuery(String sql, Object...obj) {
                Connection con = getConnection();
                ResultSet rs = null;
                PreparedStatement ps = null;
                try {
                    ps = con.prepareStatement(sql);
                    for (int i = 0; i < obj.length; i++) {
                        ps.setObject(i + 1, obj[i]);
                    }
                    rs = ps.executeQuery();
                    //new 一个空的list集合用来存放查询结果
                    List<Map<String, Object>> list = new ArrayList<>();
                    //获取结果集的列数
                    int count = rs.getMetaData().getColumnCount();
                    //对结果集遍历每一条数据是一个Map集合,列是k,值是v
                    while (rs.next()) {
                        //一个空的map集合,用来存放每一行数据
                        Map<String, Object> map = new HashMap<String, Object>();
                        for (int i = 0; i < count; i++) {
                            Object ob = rs.getObject(i + 1);//获取值
                            String key = rs.getMetaData().getColumnName(i + 1);//获取k即列名
                            map.put(key, ob);
                        }
                        list.add(map);
                    }
                    return list;
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
        
                    close(rs, ps, con);
                }
        
                return null;
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53
        • 54
        • 55
        • 56
        • 57
        • 58
        • 59
        • 60
        • 61
        • 62
        • 63
        • 64
        • 65
        • 66
        • 67
        • 68
        • 69
        • 70
        • 71
        • 72
        • 73
        • 74
        • 75
        • 76
        • 77
        • 78
        • 79
        • 80
        • 81
        • 82
        • 83
        • 84
        • 85
        • 86
        • 87
        • 88
        • 89
        • 90
        • 91
        • 92
        • 93
        • 94
        • 95
        • 96
        • 97
        • 98
        • 99
        • 100
        • 101
        • 102
        • 103
        • 104
        • 105
        • 106
        • 107
        • 108
        • 109
        • 110
        • 111
        • 112
        • 113
        • 114
        • 115
        • 116
        • 117
        • 118
        • 119
        • 120
        • 121
        • 122
        • 123
        • 124
        • 125
        • 126
        • 127
        • 128
        • 129
        • 130
        • 131
        • 132
        • 133
        • 134
        • 135
        • 136
        • 137
        • 138
        • 139
        • 140
        • 141
        • 142

        先开一个进程执行程序,再开一个进程执行程序,发现有一个定时任务停止不执行,始终只有一个定时任务执行。

        Leader节点选举机制 :

        每个Elastic-Job的任务执⾏实例App作为Zookeeper的客户端来操作ZooKeeper的znode

        (1)多个实例同时创建/leader节点

        (2)/leader节点只能创建⼀个,后创建的会失败,创建成功的实例会被选为leader节点, 执⾏任务

Elastic-Job-Lite轻量级去中心化的特点

在这里插入图片描述

Elastic-Job-Lite任务分片

当一个大的非常耗时的作业Job, ⽐如:⼀次要处理⼀亿的数据,那这⼀亿的数据存储在数据库中,如果 ⽤⼀个作业节点处理⼀亿数据要很久,在互联⽹领域是不太能接受的,互联⽹领域更希望机器的增加去 横向扩展处理能⼒。所以,ElasticJob可以把作业分为多个的task(每⼀个task就是⼀个任务分⽚),每 ⼀个task交给具体的⼀个机器实例去处理(⼀个机器实例是可以处理多个task的),但是具体每个task 执⾏什么逻辑由我们⾃⼰来指定。
在这里插入图片描述
Strategy策略定义这些分⽚项怎么去分配到各个机器上去,默认是平均去分,可以定制,⽐如某⼀个机 器负载 ⽐较⾼或者预配置⽐较⾼,那么就可以写策略。分⽚和作业本身是通过⼀个注册中⼼协调的,因 为在分布式环境下,状态数据肯定集中到⼀点,才可以在分布式中沟通。


public class ElasticJobMain {

    public static void main(String[] args) {
        // 配置分布式协调服务(注册中心)Zookeeper
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181","data-archive-job");
        CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();

        // 配置任务(时间事件、定时任务业务逻辑、调度器)
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
            //分3片
                .newBuilder("archive-job", "*/2 * * * * ?", 3)
            //按学位不同分片处理 
                .shardingItemParameters("0=bachelor,1=master,2=doctor").build();
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,ArchivieJob.class.getName());

        JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build());
        jobScheduler.init();


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
package elasticjob;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

import java.util.List;
import java.util.Map;

/**
 * ElasticJobLite定时任务业务逻辑处理类
 */
public class ArchivieJob implements SimpleJob {

    /**
     * 需求:resume表中未归档的数据归档到resume_bak表中,每次归档1条记录
     * execute方法中写我们的业务逻辑(execute方法每次定时任务执行都会执行一次)
     * @param shardingContext
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardingItem = shardingContext.getShardingItem();
        System.out.println("=====>>>>当前分片:" + shardingItem);

        // 获取分片参数// 0=bachelor,1=master,2=doctor
        String shardingParameter = shardingContext.getShardingParameter(); 

        // 1 从resume表中查询出1条记录(未归档)
        String selectSql = "select * from resume where state='未归档' and education='"+ shardingParameter +"' limit 1";
        List<Map<String, Object>> list = JdbcUtil.executeQuery(selectSql);
        if(list == null || list.size() ==0 ) {
            System.out.println("数据已经处理完毕!!!!!!");
            return;
        }
        // 2 "未归档"更改为"已归档"
        Map<String, Object> stringObjectMap = list.get(0);
        long id = (long) stringObjectMap.get("id");
        String name = (String) stringObjectMap.get("name");
        String education = (String) stringObjectMap.get("education");

        System.out.println("=======>>>>id:" + id + "  name:" + name + " education:" + education);

        String updateSql = "update resume set state='已归档' where id=?";
        JdbcUtil.executeUpdate(updateSql,id);

        // 3 归档这条记录,把这条记录插入到resume_bak表
        String insertSql = "insert into resume_bak select * from resume where id=?";
        JdbcUtil.executeUpdate(insertSql,id);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

分布式集群中的Session共享问题

在这里插入图片描述
第一次请求Nginx将请求转发到Tomcat1上登录成功,session保存在Tomcat1服务器上,当第二次请求来临,Nginx会默认将请求转发到Tomcat2上,此时Tomcat2上找不到session,就会重定向到登录页面。
在这里插入图片描述

Session不共享解决方案

采用Nginx的IP_Hash策略(可以使用)

只要客户端的IP地址不变,Nginx采用IP_Hash策略,请求会始终被分配到同一台服务器上。

优点:

​ 配置简单,不⼊侵应⽤,不需要额外修改代码 。

缺点:

​ 服务器重启Session丢失 ;

​ 存在单点负载⾼的⻛险 ;

​ 单点故障问题 。

Session共享,集中存储(推荐)

Session的本质就是缓存, 那么Session可以交给Redis集中存储。
在这里插入图片描述
优点:

​ 能适应各种负载均衡策略 ;

​ 服务器重启或者宕机不会造成Session丢失 ;

​ 扩展能⼒强 ;

​ 适合⼤集群数量使⽤ ;

缺点:

​ 对应⽤有⼊侵,引⼊了和Redis的交互代码

Spring Session使得基于Redis的Session共享应⽤起来⾮常之简单

  • 引入jar
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.session</groupId>
     <artifactId>spring-session-data-redis</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 配置Redis
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
  • 1
  • 2
  • 3
  • 启动类添加注解
@SpringBootApplication
@EnableCaching
@EnableRedisHttpSession//开启Redis Session
public class LoginApplication  extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(LoginApplication.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(LoginApplication.class, args);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

ession丢失 ;

​ 扩展能⼒强 ;

​ 适合⼤集群数量使⽤ ;

缺点:

​ 对应⽤有⼊侵,引⼊了和Redis的交互代码

Spring Session使得基于Redis的Session共享应⽤起来⾮常之简单

  • 引入jar
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.session</groupId>
     <artifactId>spring-session-data-redis</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 配置Redis
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
  • 1
  • 2
  • 3
  • 启动类添加注解
@SpringBootApplication
@EnableCaching
@EnableRedisHttpSession//开启Redis Session
public class LoginApplication  extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(LoginApplication.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(LoginApplication.class, args);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/290374
推荐阅读
相关标签
  

闽ICP备14008679号