Because of the GIL (Global Interpreter Lock) in the CPython interpreter, a multi-threaded Python service is not as efficient as you might expect: it cannot take advantage of multiple CPU cores. I won't go into the details here (see 《python中的GIL详解》 for more).
So the idea is to use the Python multiprocessing library to turn the original multi-threaded service into a multi-process one, making up for the inefficiency caused by the GIL.
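To make the GIL point concrete, here is a minimal sketch (not part of the original post; the function and label names are my own) that times a CPU-bound loop run by two threads versus two processes:

import time
import threading
import multiprocessing

def burn(n=10 ** 7):
    # Pure-Python CPU-bound loop; CPython's GIL serializes this across threads.
    total = 0
    i = 0
    while i < n:
        total += i
        i += 1
    return total

def run_and_time(label, workers):
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("%s: %.2fs" % (label, time.time() - start))

if __name__ == "__main__":
    # Two threads: wall time is close to running burn() twice sequentially.
    run_and_time("threads", [threading.Thread(target=burn) for _ in range(2)])
    # Two processes: each has its own interpreter and GIL, so they run in parallel.
    run_and_time("processes", [multiprocessing.Process(target=burn) for _ in range(2)])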
Speaking of multi-process services, nginx is probably the first mature open-source example that comes to mind. So I borrowed some details from nginx's process/event model and implemented a multi-process server framework on top of the Python multiprocessing library.
Going from multi-threaded to multi-process also means that global variables, global locks and so on have to change from thread-level state into process-level shared state. The rest of this post walks through that together with the source code.
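As a small illustration of that migration (the counter and function names below are my own, not from the framework), a thread-global counter guarded by threading.Lock becomes a process-shared counter built on multiprocessing.Value, which is exactly the primitive the framework uses for numAccepts:

import multiprocessing
import threading

# Thread version: an ordinary global plus a threading.Lock is enough, because
# all threads share one address space.
thread_counter = 0
thread_lock = threading.Lock()

def bump_threaded():
    global thread_counter
    with thread_lock:
        thread_counter += 1

# Process version: each worker gets its own copy of ordinary globals, so shared
# state must live in shared memory (multiprocessing.Value) behind its own lock.
proc_counter = multiprocessing.Value('i', 0)

def bump_process(counter):
    with counter.get_lock():
        counter.value += 1

if __name__ == "__main__":
    workers = [multiprocessing.Process(target=bump_process, args=(proc_counter,))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("shared counter: %d" % proc_counter.value)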
What the framework ends up providing:
- a master process that spawns one worker per CPU core by default (configurable via setNumWorkers) and immediately restarts any worker that dies;
- graceful restart: sending SIGUSR2 to the master makes the existing workers stop accepting and exit once their connections drain, while a fresh set of workers is started in their place;
- an nginx-style accept mutex implemented with fcntl.flock, plus a simple per-worker connection limit;
- epoll-based IO handling inside each worker, with a dedicated accept thread;
- a process-shared counter (multiprocessing.Value) of the total number of accepted connections.
Here is the full code:
# -*- coding: UTF-8 -*-
import errno
import fcntl
import multiprocessing
import os
import signal
import select
import socket
import threading
import time

# Path of the file lock used as the cross-process accept mutex
_FILE_LOCK = "/tmp/MultiProcessServer.lock"


class MultiProcessServer:
    def __init__(self, host, port):
        # Use the number of CPU cores as the number of worker processes
        self.numWorkers = multiprocessing.cpu_count()
        # Shared across processes: total number of connections accepted by the service
        self.numAccepts = multiprocessing.Value('i', 0)
        # Current worker processes
        self.workers = []
        # Old worker processes kept around during a graceful restart
        self.oldWorkers = []
        # Running flag
        self.running = True
        # Graceful-restart flag
        self.update = False
        self.host = host
        self.port = port
        self.listenSocket = None
        self.listenFd = -1
        self.listenWait = 128

    def initialize(self):
        self.listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listenSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listenSocket.bind((self.host, self.port))
        self.listenSocket.listen(self.listenWait)
        self.listenFd = self.listenSocket.fileno()
        # Register the master's shutdown signal handler
        signal.signal(signal.SIGTERM, self.masterQuit)
        # Register the master's graceful-restart signal handler
        signal.signal(signal.SIGUSR2, self.masterUpdate)

    # Change the number of worker processes to start
    def setNumWorkers(self, num):
        self.numWorkers = num

    # Main body of the master process
    def run(self):
        for i in range(self.numWorkers):
            self.runWorker()
        while self.running:
            self.checkWorker()
            self.checkOldWorker()
            time.sleep(2)
        for worker in (self.workers + self.oldWorkers):
            os.kill(worker.pid, signal.SIGTERM)
            worker.join()
        print "[MASTER] quitting..."

    # The master checks the current workers; if one has died, start a new one immediately
    def checkWorker(self):
        deadWorkers = []
        for worker in self.workers:
            if not worker.is_alive():
                print "[MASTER]worker %s is dead" % worker.pid
                deadWorkers.append(worker)
                worker.join()
        for deadWorker in deadWorkers:
            self.workers.remove(deadWorker)
        for i in xrange(self.numWorkers - len(self.workers)):
            self.runWorker()

    # The master checks the old workers during a graceful restart; dead ones are not
    # replaced, but they must be joined to avoid zombie processes
    def checkOldWorker(self):
        deadOldWorkers = []
        for oldWorker in self.oldWorkers:
            if not oldWorker.is_alive():
                print "[MASTER]worker %s is dead" % oldWorker.pid
                deadOldWorkers.append(oldWorker)
                oldWorker.join()
        for deadOldWorker in deadOldWorkers:
            self.oldWorkers.remove(deadOldWorker)

    # The master starts a worker child process
    def runWorker(self):
        try:
            worker = multiprocessing.Process(target=self.workerProcess)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
            print "[MASTER]run worker pid: %s" % worker.pid
        except Exception, e:
            print "[MASTER]run worker failed: %s" % e

    # Main body of a worker process
    def workerProcess(self):
        # Thread lock: one thread handles accept, another handles network IO on
        # established connections
        self.connMutex = threading.Lock()
        # key: file descriptor; value: (connection object, peer address)
        self.connDict = dict()
        # Per-worker connection limit
        self.connLimit = self.listenWait / self.numWorkers
        self.eventPoll = select.epoll()
        # The worker registers its own shutdown / graceful-restart handlers; without
        # re-registering it would keep the handlers inherited from the master
        signal.signal(signal.SIGTERM, self.workerQuit)
        signal.signal(signal.SIGUSR2, self.workerUpdate)
        # Start the accept thread
        AcceptThread = threading.Thread(target=self.DoAccept, args=())
        AcceptThread.daemon = True
        AcceptThread.start()
        while self.running:
            print "[WORKER %d] alive, update flag: %d" % (os.getpid(), int(self.update))
            #time.sleep(2)
            self.DoPoll()
            # Check the graceful-restart flag
            if self.update:
                self.connMutex.acquire()
                nowConns = len(self.connDict)
                self.connMutex.release()
                if nowConns:
                    # The worker still has connections, so it does not exit yet
                    print "[WORKER %d] still has %d connections, waiting disconnecting..." % (os.getpid(), nowConns)
                else:
                    # No connections left: exit so the update can proceed
                    print "[WORKER %d] has no connections, quitting for updating..." % os.getpid()
                    break
        print "[WORKER %d] quitting..." % os.getpid()

    # Main body of the worker's accept thread
    def DoAccept(self):
        while self.running and not self.update:
            self.connMutex.acquire()
            nowConns = len(self.connDict)
            self.connMutex.release()
            # Check the connection count; although this borrows from nginx, the check
            # is fairly simple and can be replaced as needed
            if nowConns >= self.connLimit:
                print "[WORKER %d] connections %d >= %d, give up accepting" % (os.getpid(), nowConns, self.connLimit)
                time.sleep(2)
                continue
            conn = None
            addr = None
            #with self.numAccepts.get_lock():
            # Below the connection limit: compete for the file lock to get a chance to accept
            with open(_FILE_LOCK, 'a') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                conn, addr = self.listenSocket.accept()
                self.numAccepts.value += 1
                self.connMutex.acquire()
                self.connDict[conn.fileno()] = (conn, addr)
                self.connMutex.release()
                # Register a read event for the new connection
                self.eventPoll.register(conn.fileno(), select.EPOLLIN)
                print "[WORKER %d] get new connection %s" % (os.getpid(), addr)

    # Main body of the worker's network IO handling
    def DoPoll(self):
        try:
            events = self.eventPoll.poll(5)
        except IOError, e:
            # Exception handling is required here: a signal received during poll
            # raises IOError(EINTR)
            if e.args[0] == errno.EINTR:
                return
            else:
                raise
        self.connMutex.acquire()
        for fd, event in events:
            if event & select.EPOLLIN:
                # Read event; handle exceptions in case the client exits abnormally
                try:
                    data = self.connDict[fd][0].recv(100)
                    # How to read and what to do with the data is up to you
                    print "[WORKER %d] receive %s" % (os.getpid(), data)
                    self.eventPoll.modify(fd, select.EPOLLOUT)
                except socket.error, e:
                    print "[WORKER %d] socket error: %s" % (os.getpid(), str(e))
            elif event & select.EPOLLOUT:
                # Write event; handle exceptions in case the client exits abnormally
                try:
                    # What to send back is up to you
                    self.connDict[fd][0].send("receive ok")
                    self.eventPoll.modify(fd, select.EPOLLIN)
                except socket.error, e:
                    print "[WORKER %d] socket error: %s" % (os.getpid(), str(e))
            elif event & select.EPOLLHUP:
                # Disconnect event
                print "[WORKER %d] connection disconnect: %s" % (os.getpid(), self.connDict[fd][1])
                self.connDict[fd][0].close()
                del self.connDict[fd]
                self.eventPoll.unregister(fd)
        self.connMutex.release()

    # Master shutdown signal handler
    def masterQuit(self, signum, frame):
        print "[MASTER] receive SIGTERM"
        self.running = False

    # Master graceful-restart signal handler
    def masterUpdate(self, signum, frame):
        print "[MASTER] receive SIGUSR2"
        for worker in self.workers:
            self.oldWorkers.append(worker)
            os.kill(worker.pid, signal.SIGUSR2)
        self.workers = []

    # Worker shutdown signal handler
    def workerQuit(self, signum, frame):
        print "[WORKER %d] receive SIGTERM" % os.getpid()
        self.running = False

    # Worker graceful-restart signal handler
    def workerUpdate(self, signum, frame):
        print "[WORKER %d] receive SIGUSR2" % os.getpid()
        self.update = True


if __name__ == "__main__":
    s = MultiProcessServer("127.0.0.1", 12345)
    s.initialize()
    s.run()
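To try the server out, start it and connect with a simple client. The snippet below is a minimal sketch of my own (the message content is arbitrary and not part of the original code):

import socket

if __name__ == "__main__":
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(("127.0.0.1", 12345))   # host/port from the __main__ block above
    client.send(b"hello")                  # a worker's DoPoll() answers with "receive ok"
    print(client.recv(100))
    client.close()

A graceful restart can then be triggered by sending SIGUSR2 to the master process, and a clean shutdown by sending SIGTERM, matching the handlers registered in initialize().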
The first detail worth calling out is the nginx-style accept mutex: before calling accept(), a worker takes an exclusive fcntl.flock on a shared lock file, so only one worker at a time competes for a new connection:

#with self.numAccepts.get_lock():
# Below the connection limit: compete for the file lock to get a chance to accept
with open(_FILE_LOCK, 'a') as f:
    fcntl.flock(f.fileno(), fcntl.LOCK_EX)
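As a standalone illustration of why flock works here, the sketch below (the lock path and worker count are my own choices, not from the framework) has several processes take the same exclusive lock in turn; the "holds the lock" lines come out roughly one second apart:

import fcntl
import multiprocessing
import os
import time

LOCK_PATH = "/tmp/flock_demo.lock"   # illustrative path, not the framework's lock file

def worker():
    with open(LOCK_PATH, 'a') as f:
        # LOCK_EX blocks until no other process holds the lock on this file
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        print("pid %d holds the lock" % os.getpid())
        time.sleep(1)
        # The lock is released when the file is closed at the end of the with block

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=worker) for _ in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()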
The second detail is signal handling around epoll: if a signal such as SIGTERM or SIGUSR2 arrives while the worker is blocked in poll(), Python 2 raises IOError with errno EINTR, so DoPoll() has to catch it and simply return instead of letting the worker crash:

try:
    events = self.eventPoll.poll(5)
except IOError, e:
    # A signal received while blocked in poll() raises IOError(EINTR)
    if e.args[0] == errno.EINTR:
        return
    else:
        raise
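Here is a self-contained sketch of the same situation (the SIGALRM choice and timings are mine; on Python 3.5+ PEP 475 retries the interrupted call automatically, so this is mainly a Python 2 concern):

import errno
import select
import signal
import socket

def on_alarm(signum, frame):
    print("signal arrived while blocked in poll()")

if __name__ == "__main__":
    # A listening socket just so epoll has something registered
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)

    poller = select.epoll()
    poller.register(listener.fileno(), select.EPOLLIN)

    signal.signal(signal.SIGALRM, on_alarm)
    signal.alarm(1)              # deliver SIGALRM one second into the 5 s poll

    try:
        poller.poll(5)
    except (IOError, OSError) as e:    # IOError on Python 2, OSError on Python 3
        if e.args[0] == errno.EINTR:
            print("poll interrupted by a signal, retry instead of crashing")
        else:
            raise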