赞
踩
本章介绍令牌桶Token Bucket算法在流量限速场景的原理,以及C++实现和相关测试验证。
常见的限流算法有计数限流,固定窗口限流,滑动窗口限流,漏桶算法限流和令牌桶算法限流。令牌桶算法是限流算法的一种,其原理是系统会以一个恒定的速度往桶里放入固定数量的令牌,而如果请求需要被处理,则需要先从桶里获取对应令牌,当桶里没有令牌可取时,则拒绝服务。
令牌桶算法可应用于多种场景,本章是针对网络流控制限制场景的使用,对外发的网络数据进行控制,使数据以长期的平均速率外发,并运行一个瞬时的最高流量。
定义:令牌桶提供了一种机制,限制流的平均速率,并允许流中达到所需的突发级别。
需求:项目的需求是文件下载的服务器端限流,下载请求过来后(请求其他模块有限制,这里忽略这个问题),在独立线程读取文件并发送数据到该请求的socket上,要求发送到网络的总数据要限流,但是数据不能丢弃,可以延迟等待发送。
实现:
结论:
限速速率不一定是匀速的,但长期速率是一定的。
在令牌桶满的情况下,如果有突发流量过来,会瞬时消耗掉令牌桶的令牌,此时的理论上限速率为:令牌桶的容量+速率。比如令牌桶的容量为60M,限速速度为50MB/s,那此时的速率最高可到110MB/s的速度。
单次发送的数据不能大于令牌容量,否则获取不到令牌。
令牌的获取线程安全,可多线程获取。
类图如下:
具体代码如下:
CountSemaphore是信号量的封装。
信号量的容量即令牌桶的容量,提供获取令牌和投递令牌2个操作。该对象封装可作为信号量的公共库使用。
代码分析如下:
具体代码如下:
countsemaphore.h
#ifndef COUNTSEMAPHORE_H
#define COUNTSEMAPHORE_H
#include <mutex>
#include <condition_variable>
#include <climits>
class CountSemaphore
{
public:
CountSemaphore(unsigned long long initCount, unsigned long long maxCount);
bool acquire(unsigned long long count = 1);
void release(unsigned long long count = 1);
private:
std::mutex m_mtx;
std::condition_variable m_cv;
// 当前可用数量
unsigned long long m_updateCount = 0;
// 最大数量
unsigned long long m_maxCount = ULLONG_MAX;
};
#endif // COUNTSEMAPHORE_H
countsemaphore.cpp
#include "countsemaphore.h"
CountSemaphore::CountSemaphore(unsigned long long initCount, unsigned long long maxCount)
: m_updateCount(initCount > maxCount ? maxCount : initCount)
, m_maxCount(maxCount)
{
}
bool CountSemaphore::acquire(unsigned long long count)
{
std::unique_lock<std::mutex> lck(m_mtx);
// 获取的数量大于最大值,不可能成功
if (count > m_maxCount)
{
return false;
}
m_cv.wait(lck, [&]() -> bool { return m_updateCount >= count; });
m_updateCount -= count;
return true;
}
void CountSemaphore::release(unsigned long long count)
{
std::unique_lock<std::mutex> lck(m_mtx);
auto tobeCount = m_updateCount + count;
if (tobeCount > m_maxCount)
{
m_updateCount = m_maxCount;
}
else
{
m_updateCount = tobeCount;
}
m_cv.notify_all();
}
TokenSpeedLimiter是令牌桶的封装。
包含令牌桶的限速速度,令牌的投递时间间隔和令牌桶的容量。提供开始和结束投递操作和获取令牌的操作。
其中投递的时间间隔以毫秒为单位,越小速率越均匀。
tokenspeedlimiter.h
#ifndef TOKENSPEEDLIMITER_H
#define TOKENSPEEDLIMITER_H
#include "countsemaphore.h"
#include <thread>
class TokenSpeedLimiter
{
public:
TokenSpeedLimiter(unsigned long long speed, unsigned long long capacity, unsigned long long deliveryIntervalMs);
void begin();
void end();
bool acquireToken(unsigned long long tokenCount);
private:
void workingThread();
private:
// 限速速度(字节/s)
unsigned long long m_limitSpeed;
// 令牌投递时间间隔(毫秒)
unsigned long long m_deliveryIntervalMs;
// 信号量
CountSemaphore m_semaphore;
// 是否运行
bool m_runing = false;
// 线程
std::shared_ptr<std::thread> m_thread = nullptr;
};
#endif // TOKENSPEEDLIMITER_H
tokenspeedlimiter.cpp
#include "tokenspeedlimiter.h"
#include <functional>
TokenSpeedLimiter::TokenSpeedLimiter(unsigned long long speed, unsigned long long capacity, unsigned long long deliveryIntervalMs)
: m_limitSpeed(speed)
, m_deliveryIntervalMs(deliveryIntervalMs)
, m_semaphore(0, capacity)
{
}
void TokenSpeedLimiter::begin()
{
if (m_runing)
{
return;
}
m_runing = true;
m_thread.reset(new std::thread(std::bind(&TokenSpeedLimiter::workingThread, this)));
}
void TokenSpeedLimiter::end()
{
m_runing = false;
if (m_thread != nullptr)
{
m_thread->join();
m_thread = nullptr;
}
}
bool TokenSpeedLimiter::acquireToken(unsigned long long tokenCount)
{
return m_semaphore.acquire(tokenCount);
}
void TokenSpeedLimiter::workingThread()
{
auto lastTime = std::chrono::steady_clock::now();
while(m_runing)
{
// 延时定时投递
std::this_thread::sleep_for(std::chrono::milliseconds(m_deliveryIntervalMs));
// 计算投递时间差
auto curTime = std::chrono::steady_clock::now();
auto elapsedMs = std::chrono::duration<double, std::milli>(curTime - lastTime).count();
lastTime = curTime;
// 根据时间差计算投递令牌的数量(除以1000换算成毫秒投递数量,然后再乘以毫秒时间差)
auto tokens = m_limitSpeed * elapsedMs / 1000;
// 投递令牌
m_semaphore.release((unsigned long long)tokens);
}
}
main.cpp
包含令牌桶对象的调用及测试结果打印。
#include <QCoreApplication>
#include "tokenspeedlimiter.h"
#include <iostream>
#include <map>
#include <sstream>
#include <iomanip>
// 网络发送字节数,用于统计
unsigned long long sendCount = 0;
std::mutex mutexCount;
std::map<unsigned int, unsigned long long> mapTheadIdCount;
// 网络数据发送测试线程函数
void sendDatatoNet(TokenSpeedLimiter* speedLimiter)
{
// 每次发送的数据包大小
const int sizeOnePacket = 2 * 1024;
while(true)
{
// 获取令牌
if (!speedLimiter->acquireToken(sizeOnePacket))
{
continue;
}
// 统计总的发送包数量
std::unique_lock<std::mutex> lck(mutexCount);
sendCount += sizeOnePacket;
// 统计每个线程发送的数包
auto threadId = std::this_thread::get_id();
auto theId = *(unsigned int *)&threadId;
auto it = mapTheadIdCount.find(theId);
if (it != mapTheadIdCount.end())
{
it->second += sizeOnePacket;
}
else
{
mapTheadIdCount.insert(std::make_pair(theId, sizeOnePacket));
}
}
}
void statisticNetwork()
{
auto lastTime = std::chrono::steady_clock::now();
while(true)
{
// 1秒统计一次
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// 计算投递时间差
auto curTime = std::chrono::steady_clock::now();
auto elapsedMs = std::chrono::duration<double, std::milli>(curTime - lastTime).count();
lastTime = curTime;
// 打印总速率
std::unique_lock<std::mutex> lck(mutexCount);
if (elapsedMs > 0)
{
// * 1000 / elapsedMs为毫秒转换为秒
auto curSpeed = (double)sendCount * 1000 / 1024 / 1024 / elapsedMs;
std::cout << "speed: " << curSpeed << " MB/s" << std::endl;
}
// 打印每个线程发送的百分比
std::cout << "thread send count: ";
for (auto it: mapTheadIdCount)
{
std::cout << it.first << "(" << std::setfill(' ') << std::setw(2) << 100 * it.second / sendCount << "%),";
}
std::cout << std::endl;
mapTheadIdCount.clear();
sendCount = 0;
}
}
int main(int argc, char *argv[])
{
// 构造限速器:限速50M/s,容量为6M,间隔10ms投递令牌;当前的流量峰值为56M(50的速度 + 6M的容量)左右
TokenSpeedLimiter speedLimiter(50 * 1024 * 1024, 50 * 1024 * 1024 / 10 * 1.2, 10);
speedLimiter.begin();
// 延时5秒,填满令牌桶容量
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
// 启动网络发送线程
for (int i = 0; i < 10; ++i)
{
new std::thread(sendDatatoNet, &speedLimiter);
}
// 启动统计
statisticNetwork();
return 0;
}
测试输出如下以及分析如下:
speed: 55.9062 MB/s
thread send count: 356( 9%),1644( 8%),2312(11%),11112(12%),13472( 7%),14696( 6%),20588( 7%),21096(13%),22080(10%),22500(11%),
speed: 49.3314 MB/s
thread send count: 356( 8%),1644(11%),2312(10%),11112(12%),13472( 7%),14696( 9%),20588( 9%),21096(13%),22080( 9%),22500( 7%),
speed: 50.6701 MB/s
thread send count: 356( 7%),1644(11%),2312(16%),11112( 7%),13472(11%),14696( 9%),20588( 7%),21096( 6%),22080(10%),22500(11%),
speed: 49.9935 MB/s
thread send count: 356( 7%),1644(10%),2312( 6%),11112(12%),13472(10%),14696( 6%),20588(16%),21096( 6%),22080(11%),22500(11%),
speed: 49.3036 MB/s
thread send count: 356(13%),1644(11%),2312( 8%),11112( 6%),13472( 9%),14696(10%),20588( 9%),21096( 9%),22080(11%),22500(10%),
speed: 50.6954 MB/s
thread send count: 356(11%),1644(10%),2312(14%),11112(10%),13472( 9%),14696( 4%),20588(10%),21096( 8%),22080(10%),22500( 9%),
speed: 49.2754 MB/s
thread send count: 356(12%),1644( 8%),2312( 5%),11112(11%),13472(13%),14696( 9%),20588( 9%),21096(11%),22080( 8%),22500( 9%),
speed: 50.0361 MB/s
thread send count: 356(13%),1644( 8%),2312( 7%),11112(10%),13472( 9%),14696( 7%),20588(12%),21096(10%),22080(10%),22500( 8%),
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。