赞
踩
文中的“ 多线程服务器” 是指运行在 Linux 操作系统上的独占式网络应用程序。
不涉及 Windows 系统,不涉及人机交互界面(无论命令行或图形) ;不考虑文件读写(往磁盘写 log 除外) ,不考虑数据库操作,不考虑 Web 应用;只考虑 TCP,不考虑 UDP,也不考虑除了局域网络之外的其他数据收发方式 。
有了以上这么多限制,那么我将要谈的“ 网络应用程序”的基本功能可以归纳为“ 收到数据,算一算,再发出去”。
目录
多路复用的三种方式(都是上面的I/O的多路复用,但是进行了改进)
扩展:https://blog.csdn.net/u014252478/article/details/89176346
“ nonblocking IO+IO multiplexing ” 这种模型,即 Reactor 模式。
非阻塞I/O
非阻塞IO很简单,通过fcntl(POSIX)或ioctl(Unix)设为非阻塞模式,这时,当你调用read时,如果有数据收到,就返回数据,如果没有数据收到,就立刻返回一个错误,如EWOULDBLOCK。这样是不会阻塞线程了,但是你还是要不断的轮询来读取或写入。相当于你去查看有没有数据,告诉你没有,过一会再来吧!应用过一会再来问,有没有数据?没有数据,会有一个返回。但是依旧很不好。应用必须得过一会来一下,问问内核有木有数据啊。
(1)当用户进程发出read操作时,如果kernel中的数据还没有准备好;
(2)那么它并不会block用户进程,而是立刻返回一个error,从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果;
(3)用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call;
(4)那么它马上就将数据拷贝到了用户内存,然后返回。
所以,nonblocking IO的特点是用户进程在内核准备数据的阶段需要不断的主动询问数据好了没有。
I/O多路复用
多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符(FileDescription,简称FD),如果有一个文件描述符(FileDescription)就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。虾米意思?就是派一个代表,同时监听多个文件描述符是否有数据到来。等着等着,如有有数据,就告诉某某你的数据来啦!赶紧来处理吧。有没有很感动,一个人待着,帮了很多人。
(1)当用户进程调用了 select,那么整个进程会被 block;
(2)而同时,kernel 会“监视”所有 select 负责的 socket ;
(3)当任何一个 socket 中的数据准备好了,select 就会返回;
(4)这个时候用户进程再调用 read 操作,将数据从 kernel 拷贝到用户进程。
所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用多线程 + 阻塞 IO的web server性能更好,可能延迟还更大。
select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
多路复用的三种方式(都是上面的I/O的多路复用,但是进行了改进)
1、select
【1】每次调用select()都需要把fd(文件描述符)从用户态拷贝到内核态,开销比较大
【2】每次都需要在内核遍历传入的fd(文件描述符)
【3】select支持文件数量比较小,默认是1024
2、poll
poll的实现和select非常相似,只是描述fd集合的方式不同,poll使用pollfd结构而不是select的fd_set结构,支持的文件数量比较多,不仅仅是1024
3、epoll
select/poll只提供了一个函数,select/poll函数,但是epoll一下子就提供了3个函数,真是人多力量大,难怪这么强,如下3个函数:
epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。
epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。
对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
总结:
(1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。
(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省不少的开销。
“ non-blocking IO+IO multiplexing ” 这种模型下,程序的基本结构是一个事件循环(eventloop): (代码仅为示意,没有完整考虑各种情况)
Reactor 模型的优点很明显,编程简单,效率也不错。不仅网络读写可以用,连接的建立( connect/accept)甚至 DNS 解析都可以用非阻塞方式进行,以提高并发度和吞吐量(throughput)。
one loop per thread + thread pool 模式
在此中模型下,程序里的每个 IO 线程有一个 event loop(或者叫 Reactor),用于处理读写和定时事件,代码框架同上。
这种方式的好处是:
简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行。
线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
线程池其中一项很重要的技术点就是任务的队列,队列虽然属于一种基础的数据结构,但是发挥了举足轻重的作用。
对于没有 IO 只有计算任务的线程,使用 event loop 有点浪费,这时使用基于 blocking queue 实现的任务队列(TaskQueue)。
使用 Sockets,优势在于:可以跨主机,具有伸缩性。如果一台及其处理能力不够,就把进程分散到同一局域网的多台机器上,程序改改 host:port 配置就能继续用。
TCP Sockets 和 pipe 都是一个文件描述符,都可以 read/write/fcntl/select/poll 等。不同的是 TCP 是双向的,pipe 是单向的,进程间双向通信得开两个文件描述符,而且进程要有父子关系才能用 pipe 。
TCP port 由一个进程独占,而且操作系统会自动回收(listening port 和已建立连接的 TCP socket 都是文件描述符,在进程结束时操作系统会关闭所有文件描述符)。即使程序意外退出,也不会给系统留下垃圾。
两个进程通过 TCP 通信,如果一个崩溃了,操作系统就会关闭连接,这样另一个进程几乎立刻就能感知到。
此处提到的进程是指 fork() 的产物,线程指的是 pthread_create() / clone() 的产物。
首先,一个由多台机器组成的分布式系统必然是多进程的,因为进程不能跨越 OS 边界。下面将讨论比较两种模式:运行一个多线程的进程;运行多个单线程的进程。
只有单线程程序可以 fork:fork 之后一般有两种行为,立即执行 exec(),,就是在调用进程内部执行一个可执行文件,变为别的程序;继续运行当前程序,通过共享的文件描述符与父进程通信,协同完成任务。
单线程程序可以限制程序的 CPU 占用率:单线程的进程一个进程只占用一个 core,可以避免过分抢夺系统资源。
优点:简单,用 IO 多路复用的 event loop 或 阻塞式 IO。
缺点:非抢占式,可能会造成一个优先级高的事件等待优先级低的事件,优先级反转。
需要提高响应速度,让 IO 和“计算”相互重叠,降低等待延迟。虽然多线程不能提高绝对性能,但能提高平均响应性能。
一个程序要用多线程实现的必要条件:
多线程之线程的分类
- IO 线程,放入 EventLoop 中。这类线程的主循环是 IO 多路复用,阻塞地等待在 epoll_wait() 等系统调用上。有些简单的计算,如消息的编码或解码也可以放在其中。
- 计算线程,放入 ThreadPool 中。这类线程的主循环是 blocking queue ,阻塞地等待在 condition variable 上。这类线程一般位于 thread pool 中。
- 第三方库所用的线程,如 logging ,database connection等。
多线程的并发度
- 单纯的采用 thread per connection 模型,一个线程只处理一个连接,则并发度受限于能同时启动的最大线程数,32位系统中,一个进程的地址空间是4G,用户态能访问3G左右,一个线程的默认栈大约10MB,300个线程左右就到达了上限。
- 基于事件的 IO multiplexing event loop 即 Reactor 模式,一个线程的一个 event loop 就足以提供上万的并发量。
- multi_loop 的多线程程序 即 one loop per thread,能轻松支持数万的并发量,且与 CPU 数目成正比。
多线程的响应时间与吞吐量
- 相比于多核心,每核单线程处理一个耗时的计算服务,使用多线程并行算法8个核一起算,首次请求的响应时间会降低很多,吞吐量不会改变(实际上算法的并行度肯定达不到100%,也就是说8核并行计算的加速比达不到8)甚至还会有所下降,不过以此为代价,换得响应时间的提升,在有些应用场合也是值得的。
在进行多线程网络编程的时候,遇到的几个很自然的问题是:如何处理 IO?能否用多个线程同时读写一个 socket 文件描述符?
作者所建议的多线程程序应该遵循的原则是:每个文件描述符只由一个线程操作,从而轻松解决消息收发的顺序性问题,也避免了关闭 socket 时的各种 race condition。同时一个线程可以操作多个文件描述符,但是不能操作别的线程已经拥有的文件描述符。
epoll() 也遵循相同的原则。Linux 下,应将对同一个 epoll fd 的操作(添加、删除、修改、等待)都放到同一个线程中执行。
线程相关的api并不复杂,然而无论是linux还是windows系统,都是c风格的接口,我们只需简单的封装成对象,方便易用即可。任务队列是设计成用来进行线程间通信,使用任务队列进行线程间通信设计到一些模式,原理并不难理解,我们需要做到是弄清楚,在什么场景下选用什么样的模式即可。
任务队列的定义
任务队列对线程间通信进行了抽象,限定了线程间通过“任务”传递信息,而相关的数据及操作则被任务封装保存。任务队列这个名词可能在其他场景定义过其他意义,这里讨论的任务队列定义为:能够把封装了数据和操作的任务在多线程间传递的线程安全的先入先出的队列。其与线程关系示意图如下:
两个虚线框分别表示线程A和线程B恩能够访问的数据边界,由此可见 任务队列是线程间通信的媒介。
在任务队列中,生产和消费的对象是“任务”,这里把任务定义为组合了数据和操作的对象,或者简单理解成包含了 void(void*)类型的函数指针和 void*数据指针的结构。我们将任务定位成类 task_t,下面分析一下 task_t 的实现。
- class task_impl_i
- {
- public:
- virtual ~task_impl_i(){}
- virtual void run() = 0;
- virtual task_impl_i* fork() = 0;
- };
-
- class task_impl_t: public task_impl_i
- {
- public:
- task_impl_t(task_func_t func_, void* arg_):
- m_func(func_),
- m_arg(arg_)
- {}
-
- virtual void run()
- {
- m_func(m_arg);
- }
-
- virtual task_impl_i* fork()
- {
- return new task_impl_t(m_func, m_arg);
- }
-
- protected:
- task_func_t m_func;
- void* m_arg;
- };
-
- struct task_t
- {
- static void dumy(void*){}
- task_t(task_func_t f_, void* d_):
- task_impl(new task_impl_t(f_, d_))
- {
- }
- task_t(task_impl_i* task_imp_):
- task_impl(task_imp_)
- {
- }
- task_t(const task_t& src_):
- task_impl(src_.task_impl->fork())
- {
- }
- task_t()
- {
- task_impl = new task_impl_t(&task_t::dumy, NULL);
- }
- ~task_t()
- {
- delete task_impl;
- }
- task_t& operator=(const task_t& src_)
- {
- delete task_impl;
- task_impl = src_.task_impl->fork();
- return *this;
- }
-
- void run()
- {
- task_impl->run();
- }
- task_impl_i* task_impl;
- };
Task最重要的接口是run,简单的执行保存的操作,具体的操作保存在task_impl_i的基类中,由于对象本身就是数据加操作的集合,所以构造task_impl_i的子类对象时,为其赋予不同的数据和操作即可。这里使用了组合的方式实现了接口和实现的分离。这么做的优点是应用层只需知道task的概念即可,对应task_impl_i不需要了解。由于不同的操作和数据可能需要构造不同task_impl_i子类,我们需要提供一些泛型函数,能够将用户的所有操作和数据都能轻易的转换成task对象。task_binder_t 提供一系列的gen函数,能够转换用户的普通函数和数据为task_t对象。
函数封装了用户的操作逻辑,需要在某线程执行特定操作时,需要将操作对应的函数转换成task_t,投递到目的线程对应的任务队列。任务队列使用起来虽然像是在互相投递消息,但是根本上仍然是共享数据式的数据交换方式。主要步骤如下:
实现的关键代码如下:
- void produce(const task_t& task_)
- {
- lock_guard_t lock(m_mutex);
- bool need_sig = m_tasklist.empty();
-
- m_tasklist.push_back(task_);
- if (need_sig)
- {
- m_cond.signal();
- }
- }
消费任务的线程会变成完全的任务驱动,该线程只有一个职责,执行任务队列的所有任务,若当前任务队列为空时,线程会阻塞在条件变量上,重新有新任务到来时,线程会被再次唤醒。实现代码如下:
- int consume(task_t& task_)
- {
- lock_guard_t lock(m_mutex);
- while (m_tasklist.empty())
- {
- if (false == m_flag)
- {
- return -1;
- }
- m_cond.wait();
- }
-
- task_ = m_tasklist.front();
- m_tasklist.pop_front();
-
- return 0;
- }
- int run()
- {
- task_t t;
- while (0 == consume(t))
- {
- t.run();
- }
- return 0;
- }
BlockingQueue是一个继承自Queue的接口,在Queue的队列基础上增加了阻塞操作。简单来说,就是在在BlockingQueue为空时从队头取数据将会被阻塞,因为此时还没有数据可取,一旦队列中有数据了,取数据的线程就会释放得到了数据;如果BlockingQueue有容量限制且满了,那么插入数据的线程将会阻塞,知道队列中有空闲位置可以插入数据了,才会释放。经过上面一段描述,可以发现这就是一个生产者-消费者模型。
- class BQueue
- {
- public:
- BQueue(int worker_count_, int capacity_ = 8);
- ~BQueue();
-
- bool push(const Message &item);
-
- bool get(Message &item, const uint16_t &time_ms = 1000);
-
- void getNow(Message &item);
-
- int size() { return (tail - head + capacity) % capacity; }
-
- void shutdown();
-
- private:
- int capacity;
- int head = 0;
- int tail = 0;
- std::vector<Message> queue;
- std::mutex mtx;
- std::condition_variable notFull, notEmpty;
- std::vector<Worker<BQueue>*> threads; //这个Worker类是一个线程池,稍后说明
- };
头文件BQueue.cpp
- BQueue::BQueue(int worker_count_, int capacity_)
- : capacity(capacity_), queue(std::vector<Message>(capacity))
- {
- // assert(workerCount_ > 0);
- ///此处创建了一个线程池
- try {
- for (int i = 0; i < worker_count_; i++)
- threads.push_back(
- new Worker<BQueue>(*this));
- } catch (std::exception &e) {
- LOG(ERROR) << "Thread init failed!";
- LOG(ERROR) << e.what();
- } catch (...) {
- LOG(ERROR) << "Thread init failed!";
- }
- }
-
- BQueue::~BQueue()
- {
- shutdown();
- // join();
- for (auto it : threads) {
- if (!it) {
- delete it;
- it = 0;
- }
- }
- threads.clear();
- }
-
- bool BQueue::push(const Message &item)
- {
- {
- std::unique_lock<std::mutex> lck(mtx);
- notFull.wait(lck, [&] { return (tail + 1) % capacity != head; });
- // if (!notFull.wait_for(lck, std::chrono::milliseconds(1000), [&] { return (tail + 1) % capacity != head; }))
- // return false;
-
- // while ((tail + 1) % capacity == head) //is full
- // notFull.wait(lck);
- queue[tail] = item;
- tail = (tail + 1) % capacity;
- }
-
- //wake up get thread
- notEmpty.notify_one();
-
- return true;
- }
-
- bool BQueue::get(Message &msg, const uint16_t &time_ms)
- {
- {
- std::unique_lock<std::mutex> lck(mtx);
- if (!notEmpty.wait_for(lck, std::chrono::milliseconds(time_ms), [&] { return head != tail; }))
- return false;
- msg = queue[head];
- head = (head + 1) % capacity;
- }
- notFull.notify_one();
- return true;
- }
-
- void BQueue::getNow(Message& msg)
- {
- {
- std::unique_lock<std::mutex> lck(mtx);
- notEmpty.wait(lck, [&] { return head != tail; });
- // while (head == tail) // is empty
- // notEmpty.wait(lck);
- msg = queue[head];
- head = (head + 1) % capacity;
- DEBUGLOG(std::cout << "get..." << std::endl);
- }
-
- //wake up push thread
- notFull.notify_one();
- }
-
- void BQueue::shutdown()
- {
- std::lock_guard<std::mutex> lock(mtx);
-
- for (auto it : threads) {
- if (!it) {
- it->shutdown();
- }
- }
-
- notFull.notify_all();
- notEmpty.notify_all();
- }
线程池Worker类代码:
下面描述一下代码中实现线程池的思想。线程池类ThreadPool含有一定的线程(PoolThread),放在容器threads中,和一个任务队列taskQueue,该队列就是上面所获的阻塞队列(BlockingQueue)。先初始化阻塞队列,设置队列的容量大小。再初始化线程,每个线程都会被启动。被启动的线程会一定循环,循环体内不断尝试从队列中取出任务,然后调用任务的处理函数来执行任务。如果没有任务就被阻塞,直到有任务为止。当任务来临时,通过调用线程池的execute函数将任务放入到队列中,如果队列满了便被阻塞。
- template <class T>
- class Worker
- {
- public:
- Worker(T & queue_)
- : queue(queue_),status(true)
- {
- thread = new std::thread(&Worker::run, this);
- }
-
- ~Worker()
- {
- if (thread) {
- delete thread;
- thread = nullptr;
- }
- }
-
- void run()
- {
- Message message;
- while (status) {
- if (queue.get(message)) {
- handle(message);///此处处理消息
- }
- }
- }
-
- void shutdown()
- {
- status = false;
- join();
- }
-
-
- private:
- void join()
- {
- if (thread->joinable())
- thread->join();
- }
-
- T& queue;
- std::thread* thread;
- bool status;
- };
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。