赞
踩
目录
RandomLoadBalance即随机调用实现负载均衡是Dubbo默认的一种策略,加权随机算法对provider不同实例设置不同的权重,按照权重来负载均衡,权重越大分配流量越高,一般用这个默认的就可以了。
算法思想
假设有一组服务器servers = [A, B, C]
,他们对应的权重为weights = [5, 3, 2]
,权重总和为 10。现在把这些权重值平铺在一维坐标值上,[0, 5)
区间属于服务器A,[5, 8)
区间属于服务器 B,[8, 10)
区间属于服务器C。接下来通过随机数生成器生成一个范围在[0, 10)
之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器A对应的区间上,此时返回服务器A即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如经过一万次选择后,服务器A被选中的次数大约为5000次,服务器B被选中的次数约为3000次,服务器C被选中的次数约为2000次。
8核 + 16G机器申请了2台,4核 + 8G的机器一台,两台8核16G的机器设置权重4,4核8G的机器设置权重2,默认是将流量均匀地打到各个机器上去,如果每个机器的性能不一样,容易导致性能差的机器负载过高。所以需要调整权重让性能差的机器承载权重小一些流量少一些,即加权轮询算法。
算法思想
使用本地权重表,根据调用情况动态调整。每次调用根据算法更新权重表,设置本地权重为本地所有权重加上配置权重,选出本地权重最大的服务,并设置它的本地权重减去本轮总权重。权重表回收,删除1分内未被调用的实例。预热期权重算法,预热期默认10分钟,warmWeight = uptime/(warmup/weight),如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10。
原始权重:服务设置中的weight
动态权重:每次选取操作调整后的权重
动态权重总和:每次调整完后的所有服务动态权重总和
本地动态权重表:记录本地服务选取时的动态权重信息,每次调用选取算法都会更新
- package org.apache.dubbo.rpc.cluster.loadbalance;
-
- import org.apache.dubbo.common.URL;
- import org.apache.dubbo.rpc.Invocation;
- import org.apache.dubbo.rpc.Invoker;
-
- import java.util.Collection;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicLong;
-
- /**
- * 轮询负载均衡策略
- * Round robin load balance.
- */
- public class RoundRobinLoadBalance extends AbstractLoadBalance {
- //策略名称
- public static final String NAME = "roundrobin";
- //动态权重更新时间
- private static final int RECYCLE_PERIOD = 60000;
-
- //路由权重
- protected static class WeightedRoundRobin {
- private int weight;
- //动态权重
- private AtomicLong current = new AtomicLong(0);
- //最后选取时间
- private long lastUpdate;
- public int getWeight() {
- return weight;
- }
- public void setWeight(int weight) {
- this.weight = weight;
- current.set(0);
- }
- //每次选取操作增加原始权重
- public long increaseCurrent() {
- return current.addAndGet(weight);
- }
- //每次选中减去动态总权重
- public void sel(int total) {
- current.addAndGet(-1 * total);
- }
- public long getLastUpdate() {
- return lastUpdate;
- }
- public void setLastUpdate(long lastUpdate) {
- this.lastUpdate = lastUpdate;
- }
- }
-
- private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
- //更新锁
- private AtomicBoolean updateLock = new AtomicBoolean();
-
- /**
- * get invoker addr list cached for specified invocation
- * <p>
- * <b>for unit test only</b>
- * 获取url对应的权重路由
- * 结构如下:
- * {
- * "bike.get":{
- * "url1": WeightedRoundRobin,
- * "url2": WeightedRoundRobin,
- * ...
- * },
- * "bike.update:{
- * "url1": WeightedRoundRobin,
- * "url2": WeightedRoundRobin,
- * ...
- * }
- * }
- * @param invokers
- * @param invocation
- * @return
- */
- protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- //获取url对应的权重路由
- Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- if (map != null) {
- return map.keySet();
- }
- return null;
- }
-
- /**
- * 根据动态权重表选取服务
- * @param invokers 实例列表
- * @param url 请求url 在这没啥用
- * @param invocation 请求调用信息
- * @param <T>
- * @return 选出的实例调度器
- */
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- //获取url对应的动态权重表
- ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- //如果权重表为空,则新建
- if (map == null) {
- methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
- map = methodWeightMap.get(key);
- }
- //动态权重总和,用于计算更新动态权重
- int totalWeight = 0;
- //计算时动态权重最小值
- long maxCurrent = Long.MIN_VALUE;
- //当前时间,设置为动态权重表最后选取时间
- long now = System.currentTimeMillis();
- Invoker<T> selectedInvoker = null;
- WeightedRoundRobin selectedWRR = null;
- //循环所有注册服务
- for (Invoker<T> invoker : invokers) {
- //获取服务id
- String identifyString = invoker.getUrl().toIdentityString();
- //获取服务对应的本地动态权信息
- WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
- //获取权重,预热期返回预热权重,否则为原始权重
- int weight = getWeight(invoker, invocation);
- //新建本地动态权重信息
- if (weightedRoundRobin == null) {
- weightedRoundRobin = new WeightedRoundRobin();
- weightedRoundRobin.setWeight(weight);
- map.putIfAbsent(identifyString, weightedRoundRobin);
- }
- //是否为预热权重,预热情况更新权重
- if (weight != weightedRoundRobin.getWeight()) {
- //weight changed
- weightedRoundRobin.setWeight(weight);
- }
- //每次选取调整对应的动态选择
- long cur = weightedRoundRobin.increaseCurrent();
- //更新最后选取时间,为什么不在increaseCurrent方法里面更新?
- //入long cur = weightedRoundRobin.increaseCurrent(now);
- weightedRoundRobin.setLastUpdate(now);
- //获取最大权重服务
- if (cur > maxCurrent) {
- maxCurrent = cur;
- selectedInvoker = invoker;
- selectedWRR = weightedRoundRobin;
- }
- //相加计算总的权重
- totalWeight += weight;
- }
- //移除过期的实例,默认60秒没访问移除
- //调度器数和权重集合数不一致是,更新权重集合
- if (!updateLock.get() && invokers.size() != map.size()) {
- if (updateLock.compareAndSet(false, true)) {
- try {
- // copy -> modify -> update reference
- ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
- newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
- methodWeightMap.put(key, newMap);
- } finally {
- updateLock.set(false);
- }
- }
- }
- //减少选中服务的动态权重值
- if (selectedInvoker != null) {
- selectedWRR.sel(totalWeight);
- return selectedInvoker;
- }
- // should not happen here
- // 没有选出调度器的时候返回第一个服务。
- return invokers.get(0);
- }
-
- }
- package org.apache.dubbo.rpc.cluster.loadbalance;
-
- import org.apache.dubbo.common.URL;
- import org.apache.dubbo.common.utils.CollectionUtils;
- import org.apache.dubbo.rpc.Invocation;
- import org.apache.dubbo.rpc.Invoker;
- import org.apache.dubbo.rpc.cluster.LoadBalance;
-
- import java.util.List;
-
- import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
- import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
- import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
- import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
- import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
-
- /**
- * AbstractLoadBalance
- */
- public abstract class AbstractLoadBalance implements LoadBalance {
- /**
- * Calculate the weight according to the uptime proportion of warmup time
- * the new weight will be within 1(inclusive) to weight(inclusive)
- * 计算预热期权重,最小为1
- * warmWeight = uptime/(warmup/weight),
- * 如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10
- * @param uptime the uptime in milliseconds 上线时间
- * @param warmup the warmup time in milliseconds 预热时间
- * @param weight the weight of an invoker 原值权重
- * @return weight which takes warmup into account
- */
- static int calculateWarmupWeight(int uptime, int warmup, int weight) {
- int ww = (int) ( uptime / ((float) warmup / weight));
- return ww < 1 ? 1 : (Math.min(ww, weight));
- }
-
- @Override
- public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- if (CollectionUtils.isEmpty(invokers)) {
- return null;
- }
- if (invokers.size() == 1) {
- return invokers.get(0);
- }
- return doSelect(invokers, url, invocation);
- }
-
- protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
-
-
- /**
- * Get the weight of the invoker's invocation which takes warmup time into account
- * if the uptime is within the warmup time, the weight will be reduce proportionally
- * 获取调用程序的调用权重,其中考虑了预热时间如果正常运行时间在预热时间内,则权重将按比例减少
- * @param invoker the invoker
- * @param invocation the invocation of this invoker
- * @return weight
- */
- int getWeight(Invoker<?> invoker, Invocation invocation) {
- int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
- if (weight > 0) {
- //请求时间
- long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
- if (timestamp > 0L) {
- //处理时间,当前时间-invoker上线时间
- long uptime = System.currentTimeMillis() - timestamp;
- if (uptime < 0) {
- return 1;
- }
- //预热时间10分钟
- int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
- //上线时间小于预热时间,返回预热中的权重
- if (uptime > 0 && uptime < warmup) {
- weight = calculateWarmupWeight((int)uptime, warmup, weight);
- }
- }
- }
- //正常情况返回invoker权重
- return Math.max(weight, 0);
- }
- }
官网对LeastActiveLoadBalance
的解释是最小活跃数负载均衡,活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求,那么此时请求会优先分配给该服务提供者。
最小活跃数负载均衡算法思想
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。每个服务提供者会对应着一个活跃数active,
初始情况下所有服务提供者的active
为0,每当收到一个请求,对应的服务提供者的active
会加 1,处理完请求后,active
会减 1。如果服务提供者性能较好,处理请求的效率就越高,那么active
也会下降的越快。因此可以给这样的服务提供者优先分配请求。除了最小活跃数,LeastActiveLoadBalance
在实现上还引入了权重值。所以准确的来说LeastActiveLoadBalance
是基于加权最小活跃数算法实现的。
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- // 服务提供者列表的长度
- int length = invokers.size(); // Number of invokers
-
-
- // 最活跃初始值是-1
- int leastActive = -1; // The least active value of all invokers
- //具有相同的最小活动值的调用程序的数量(leastActive)
- int leastCount = 0; // The number of invokers having the same least active value (leastActive)
- int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
-
- // 权重和
- int totalWeight = 0; // The sum of with warmup weights
- //初始值 用于比较
- int firstWeight = 0; // Initial value, used for comparision
-
- // 每个invoker是否是相同的权重?
- boolean sameWeight = true; // Every invoker has the same weight value?
- for (int i = 0; i < length; i++) {
- Invoker<T> invoker = invokers.get(i);
-
- // 获取当前这个invoker并发数
- int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
- // 计算权重值
- int afterWarmup = getWeight(invoker, invocation); // Weight
-
- // 第一个元素的后 或者 当前invoker并发数 小于 最小并发数(初始值是-1)
- if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
- // 记录leastActive 为当前的活跃数
- leastActive = active; // Record the current least active value
- //重置最小计数,基于当前最小计数重新计数
- leastCount = 1; // Reset leastCount, count again based on current leastCount
-
-
- //在0下标出放入这个索引
- leastIndexs[0] = i; // Reset
-
- // 总权重就是 当前invoker的权重
- totalWeight = afterWarmup; // Reset
- //第一个权重
- firstWeight = afterWarmup; // Record the weight the first invoker
-
- sameWeight = true; // Reset, every invoker has the same weight value?
- } else if (active == leastActive) {
-
- // 当前invoker的活跃数 与 leastActive相等
-
- // If current invoker's active value equals with leaseActive, then accumulating.
-
- // 记录索引位置,具有相同最小活跃数的计数器 +1
- leastIndexs[leastCount++] = i; // Record index number of this invoker
-
- //总权重 = 总权重+当前权重
- totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
- // If every invoker has the same weight?
- if (sameWeight && i > 0
- && afterWarmup != firstWeight) {
- sameWeight = false;
- }
- }
- }
- // assert(leastCount > 0)
- if (leastCount == 1) {//如果我们恰好有一个调用程序具有最少的活动值,那么直接返回这个调用程序。
- // If we got exactly one invoker having the least active value, return this invoker directly.
- return invokers.get(leastIndexs[0]);
- }
- // -----------------------------------------------------------------------------------------------------------
- // 如果每个invoker有不同的权重 && totalWeight > 0
- if (!sameWeight && totalWeight > 0) {
- // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
-
- // 在totalWeight 范围内随机一个值
- int offsetWeight = random.nextInt(totalWeight) + 1;
- // Return a invoker based on the random value.
- for (int i = 0; i < leastCount; i++) {
- // 获取i位置的那个最小活跃 在invokers 里面的位置信息
- int leastIndex = leastIndexs[i];
-
- //offsetWeight - leastIndex 位置invoker的权重
- offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
-
- // offsetWeight 小于0的话
- if (offsetWeight <= 0)
- // 返回这个位置的这个
- return invokers.get(leastIndex);
- }
- }
- // 具有相同权重或者是 总权重=0 的话就均匀返回
- // If all invokers have the same weight value or totalWeight=0, return evenly.
- return invokers.get(leastIndexs[random.nextInt(leastCount)]);
- }
在分布式系统中解决负载均衡问题的时侯使用一致性Hash算法将固定的一部分请求落在同一台机器上,每台服务器会固定的处理同一部分请求,来起到负载均衡的作用。provider挂掉的时候,会基于虚拟节点均匀分配剩余的流量,抖动不会太大。如果需要的不是随机负载均衡,要一类请求都到一个节点,那就走这个一致性Hash策略。
关于 dubbo 负载均衡策略更加详细的描述,可以查看官网 http://dubbo.apache.org/zh-cn/docs/source_code_guide/loadbalance.html 。
算法思想
普通的余数的hash(hash(key)%机器数)算法伸缩性很差,每当新增或者下线机器的时候,某个key与机器的映射会大量的失效,一致性hash则利用hash环对其进行了改进。比如我现在有4台服务器,他们对应的ip地址分别是ip1,ip2,ip3,ip4,通过计算这4个ip的hash值(这里假设hash(ip4)> hash(ip3)>hash(ip2)>hash(ip1)),然后按照hash值的大小顺时针分布到hash环上。
从0开始然后按照4个ip的hash值大小顺时针方向(这个趋向正无穷大)散落在环上(这里假设hash(ip4)> hash(ip3)>hash(ip2)>hash(ip1)),当用户调用请求打过来的时候,计算用户某个参数的hash(key)值,比如用户u1的key的hash值正好在hash(ip3)值 与hash(2)值之间,这时候u1的请求就要交给hash(ip3)也就是ip3的这台机器处理,当ip3的机器挂了的时候,hash环是这样分布的:
这时u1的请求就会被重新分配到ip4的机器上 。
ConsistentHashLoadBalance继承AbstractLoadBalance抽象类,重写doSelect方法
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
-
- // 获取方法名
- String methodName = RpcUtils.getMethodName(invocation);
- // 拼接key, 接口全类名.方法名
- String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
-
- //根据invokers 计算个hashcode
- int identityHashCode = System.identityHashCode(invokers);
-
- // 根据key 从缓存中获取ConsistentHashSelector
- ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
-
-
- // selector==null 或者是 selector的hashcode != 现在算出来的,说明这个invokers 变了
- if (selector == null || selector.identityHashCode != identityHashCode) {
-
- // 创建ConsistentHashSelector 放入缓存中
- selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
- // 获取新的selector
- selector = (ConsistentHashSelector<T>) selectors.get(key);
- }
-
- // 是用selector 进行选择
- return selector.select(invocation);
- }
-
首先是拼装key= 接口全类名.方法名,计算invokers的hash值,通过key从selectors缓存中获取对应的值ConsistentHashSelector,如果selector是null或者不一致(说明invokers集合中的invoker有变动,有下线或者上线情况,导致算出来的hash值与之前存的hash值不一致)就新new ConsistentHashSelector然后塞到selectors缓存中,然后调用当前selector的select(invocation)方法。ConsistentHashSelector 的构造方法:
- ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
-
- // 虚拟的invoker
- this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
- // hashcode
- this.identityHashCode = identityHashCode;
- // 获取url
- URL url = invokers.get(0).getUrl();
-
-
- //获取hash.nodes ,缺省是160 每个实例节点的个数
- this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
-
-
- // 获取hash.arguments 缺省是0 然后进行切割
- String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
-
- argumentIndex = new int[index.length];
- for (int i = 0; i < index.length; i++) {
- argumentIndex[i] = Integer.parseInt(index[i]);
- }
- for (Invoker<T> invoker : invokers) {
- // 获取地址
- String address = invoker.getUrl().getAddress();
-
- for (int i = 0; i < replicaNumber / 4; i++) {
- byte[] digest = md5(address + i);
- for (int h = 0; h < 4; h++) {
- long m = hash(digest, h);//计算位置
- virtualInvokers.put(m, invoker);
- }
- }
- }
- }
先创建virtualInvokers存储虚拟节点的treemap,获取属性hash.nodes的值,缺省是160(每个invoker的节点个数默认160个),获取hash.arguments属性值,缺省是0(要使用哪个位置的参数,可以是多个用,逗号隔开,默认是使用第一个参数)。然后为每个invoker根据其ip+port生成replicaNumber个的节点(生成虚拟节点),塞到virtualInvokers的treemap中,key就是算出来的hash值,value就是invoker,TreeMap是按照key的值从小到大排序的。ConsistentHashSelector 的select(inv)方法:
- public Invoker<T> select(Invocation invocation) {
- // 将参数转成key
- String key = toKey(invocation.getArguments());
- byte[] digest = md5(key);
- return selectForKey(hash(digest, 0));
- }
-
- private String toKey(Object[] args) {
- StringBuilder buf = new StringBuilder();
- for (int i : argumentIndex) {
- if (i >= 0 && i < args.length) {
- buf.append(args[i]);// 参数
- }
- }
- return buf.toString();
- }
-
- private Invoker<T> selectForKey(long hash) {//tailMap 是返回键值大于或等于key的那部分 ,然后再取第一个
- Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
- if (entry == null) {//如果没有取到的话就说明hash就是最大的了,下面那个就是 treemap 第一个了
- entry = virtualInvokers.firstEntry();
- }// 返回对应的那个invoker
- return entry.getValue();
- }
-
- private long hash(byte[] digest, int number) {
- return (((long) (digest[3 + number * 4] & 0xFF) << 24)
- | ((long) (digest[2 + number * 4] & 0xFF) << 16)
- | ((long) (digest[1 + number * 4] & 0xFF) << 8)
- | (digest[number * 4] & 0xFF))
- & 0xFFFFFFFFL;
- }
-
- private byte[] md5(String value) {
- MessageDigest md5;
- try {
- md5 = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- md5.reset();
- byte[] bytes;
- try {
- bytes = value.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- md5.update(bytes);
- return md5.digest();
- }
-
select方法中先是利用调用参数生成key,看下toKey方法中,就是根据我们hash.arguments 参数取出对应位的参数,拼接成key。在使用md5对key计算,使用hash算法算出key对应的hash值。然后调用selectForKey 方法根据这个key的hash值找出对应的invoker。
这个selectForKey方法,virtualInvokers.tailMap(hash, true).firstEntry()找出对应的节点,tailMap方法返回键值大于或等于key的那部分,使用firstEntry方法获取这部分的第一个。如果获取的entry是null的话,说明这个hash值就是最大的了,要想找对应的invoker ,就要找TreeMap的第一个元素,然后返回这个invoker。dubbo的一致性hash算法到这就结束了。
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。