赞
踩
// 为一个新的stream绑定一个新的accept事件监听.
public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (self::$globalEvent && $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
// tcp可读事件监听(当有client连接上当前socket server时触发此事件,进行accpet操作).
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}
// 绑定信号处理(主进程的核心就在于信号的处理,回调各种绑定函数).
protected static function installSignal()
{
// stop 停止信号.
pcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false);
// graceful stop 平滑停止信号.
pcntl_signal(SIGTERM, array('\Workerman\Worker', 'signalHandler'), false);
// reload 重新加载信号.
pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false);
// graceful reload 平滑加载信号.
pcntl_signal(SIGQUIT, array('\Workerman\Worker', 'signalHandler'), false);
// status worker状态信号.
pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false);
// connection status 客户端链接状态信号.
pcntl_signal(SIGIO, array('\Workerman\Worker', 'signalHandler'), false);
// ignore 忽略的信号.
pcntl_signal(SIGPIPE, SIG_IGN, false);
}
// 信号处理绑定的回调函数.
public static function signalHandler($signal)
{
switch ($signal) {
// Stop 停止.
case SIGINT:
self::$_gracefulStop = false;
// 停止所有进程.
self::stopAll();
break;
// Graceful stop 平滑停止.
case SIGTERM:
self::$_gracefulStop = true;
// 停止所有进程.
self::stopAll();
break;
// Reload worker子进程重新加载.
case SIGQUIT:
case SIGUSR1:
if($signal === SIGQUIT){
// 平滑的停止
self::$_gracefulStop = true;
}else{
// 直接停止.
self::$_gracefulStop = false;
}
self::$_pidsToRestart = self::getAllWorkerPids();
self::reload();
break;
// Show status 状态.
case SIGUSR2:
// 将worker运行信息写入统计文件.
self::writeStatisticsToStatusFile();
break;
// Show connection status 客户端连接.
case SIGIO:
self::writeConnectionsStatisticsToStatusFile();
break;
}
}
// 停止所有进程(由主进程触发给子进程).
public static function stopAll()
{
// 标记当前状态为停止.
self::$_status = self::STATUS_SHUTDOWN;
// For master process.
// 主进程处理逻辑.
if (self::$_masterPid === posix_getpid()) {
self::log("Workerman[" . basename(self::$_startFile) . "] stopping ...");
// 获取所有子进程的pid.
$worker_pid_array = self::getAllWorkerPids();
// Send stop signal to all child processes.
if (self::$_gracefulStop) {
$sig = SIGTERM;
} else {
$sig = SIGINT;
}
foreach ($worker_pid_array as $worker_pid) {
// 向子进程发送停止的信号.
posix_kill($worker_pid, $sig);
if(!self::$_gracefulStop){
// 强制杀死子进程.
Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL), false);
}
}
// Remove statistics file.
// 移除统计文件.
if (is_file(self::$_statisticsFile)) {
@unlink(self::$_statisticsFile);
}
} // For child processes.
else {
// 子进程处理.
// Execute exit.
foreach (self::$_workers as $worker) {
// 自己调用自己的停止方法.
$worker->stop();
}
// 子进程如果强制退出则直接进入exit,否则会等待关闭所有连接然后exit.
if (!self::$_gracefulStop || ConnectionInterface::$statistics['connection_count'] <= 0) {
// 强制停止.
self::$globalEvent->destroy();
exit(0);
}
}
}
// 子进程执行的退出函数.
public function stop()
{
// Try to emit onWorkerStop callback.
// 如果绑定了停止的回调函数,优先执行回调函数.
if ($this->onWorkerStop) {
try {
call_user_func($this->onWorkerStop, $this);
} catch (\Exception $e) {
self::log($e);
exit(250);
} catch (\Error $e) {
self::log($e);
exit(250);
}
}
// Remove listener for server socket.
// 移除子进程的socket server监听.
$this->unlisten();
// Close all connections for the worker.
if (!self::$_gracefulStop) {
// 强制断开所有客户端连接.
foreach ($this->connections as $connection) {
// TcpConnection::closr()
$connection->close();
}
}
// 清空绑定的回调函数.
$this->onMessage = $this->onClose = $this->onError = $this->onBufferDrain = $this->onBufferFull = null;
// Remove worker instance from self::$_workers.
// 清空实例化的worker.
unset(self::$_workers[$this->workerId]);
}
// 子进程取消socket server监听.
public function unlisten() {
$this->pauseAccept();
if ($this->_mainSocket) {
// 关闭监听.
@fclose($this->_mainSocket);
$this->_mainSocket = null;
}
}
// 子进程暂停接收新的客户端连接.
public function pauseAccept()
{
if (self::$globalEvent && $this->_mainSocket && false === $this->_pauseAccept) {
// 删除事件绑定.
self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
$this->_pauseAccept = true;
}
}

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。