php-fpm,以及现在比较火的workman,swoole,其都是使用的多进程的管理模式,使用manger进程,管理其子进程,今天通过纯php做了一个类似的小东西,这样以后再做守护脚本的时候,可以使用这种方式,好处就是,多进程,更能利用多核的优势,并且可以做平滑重启。
本代码实现的功能,可以自定义子进程数,比如你定3个子进程,运行会产生1个manger进程,3个work进程,通过linux信号,来实现,进程的平滑重启。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
<?php /** * Class PcntlProccess * 多进程守护任务管理 * * kill -s SIGUSR1 mange进程: 平滑重启所有子进程 * kill | kill -s SIGTERM mange进程:平滑stop所有进程 * kill | kill -s SIGUSR1 | kill -s SIGTERM 子进程,重启该子进程 * author chao * */ class PcntlProccess{ const proccess_num = 3;//子进程数 const proccess_name = 'php-proccess';//进程名前缀 protected $_pid; //入口进程 public function run(){ if(self::proccess_num > 0){ //启动进程 for ($i=0; $i<self::proccess_num; $i++) { $this->parentProccess(); } //SIGUSR1 留给用户使用,此处做reload平滑重启使用 pcntl_signal(SIGUSR1, function($this){ $this->sig_handler(SIGUSR1); }); // 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这 个信号. pcntl_signal(SIGTERM, function($this){ $this->sig_handler(SIGTERM); }); //终止进程 终端线路挂断 pcntl_signal(SIGHUP, function($this){ $this->sig_handler(SIGHUP); }); cli_set_process_title(self::proccess_name . '-manger'); while (true) { // do something $pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG); if($pid>0){ $this->parentProccess($pid); } pcntl_signal_dispatch(); // 接收到信号时,调用注册的signalHandler() usleep(100000); } }else{ exit('proccess_num must > 0'); } } function sig_handler($signo){ switch ($signo) { case SIGUSR1: foreach($this->_pid as $key){ posix_kill($key,SIGUSR1); } echo date('Y-m-d H:i:s')." [manage] [SIGUSR1] all work reload\n"; break; case SIGTERM: foreach($this->_pid as $key){ posix_kill($key,SIGUSR1); } echo date('Y-m-d H:i:s')." [manage] [SIGTERM] stop\n"; die(); break; case SIGHUP: echo date('Y-m-d H:i:s')." [manage] [SIGHUP] linux clinet exit\n"; break; } } function sig_san_handler($signo){ switch ($signo) { case SIGUSR1: echo date('Y-m-d H:i:s')." [work] [SIGUSR1] work die\n"; die(); break; case SIGTERM: echo date('Y-m-d H:i:s')." [work] [SIGTERM] work die\n"; die(); break; case SIGHUP: echo date('Y-m-d H:i:s')." [work] [SIGHUP] linux clinet exit\n"; break; } } //父进程 public function parentProccess($id=''){ if(!empty($id)){ unset($this->_pid[$id]); } $pid = pcntl_fork(); if($pid == -1) { return false; } else { if($pid) { #父进程获得子进程的pid,存入数组 $this->_pid[$pid] = $pid; return true; } else { //开始发送,子进程执行完自己的任务后,退出。 $this->sanProccess(); exit; } } } public function sanProccess(){ cli_set_process_title(self::proccess_name . '-work'); echo date('Y-m-d H:i:s')." work proccess start\n"; pcntl_signal(SIGUSR1, function($this){ $this->sig_san_handler(SIGUSR1); }); pcntl_signal(SIGHUP, function($this){ $this->sig_san_handler(SIGHUP); }); pcntl_signal(SIGTERM, function($this){ $this->sig_san_handler(SIGTERM); }); while(1){ //todu pcntl_signal_dispatch(); sleep(1); } } } $obj = new PcntlProccess(); $obj->run(); |
php index.php命令行运行,会产生3个子进程,
1 2 3 4 5 6 |
[root@iZ28y2wmd7jZ ~]# ps -ef|grep php-proccess root 11536 11051 0 18:38 pts/0 00:00:00 grep --color=auto php-proccess root 13756 32171 0 Nov10 ? 00:00:06 php-proccess-work root 13757 32171 0 Nov10 ? 00:00:06 php-proccess-work root 13758 32171 0 Nov10 ? 00:00:06 php-proccess-work root 32171 1 0 Nov09 ? 00:02:37 php-proccess-manger |
你随意kill一个,管理进城会立即补充一个,传递SIGUSR1给子进程,会平滑重启比子进程,给主进城,就会平滑重启所有子进程,kill主进城,或者传递SIGHUP信号,会平滑的kill所有子进程,结束主进城。这样的好处就是主进城只负责监控子进程和自己的状态,运行更稳定,然后后期可以做一些子进程任务处理上限的控制,就可以试进城更稳定高效,信号试用kill -s 传递
升级版:
留作记录
|
<?php use controllers\Member\MemberAsynchTask; use controllers\Member\MemberConf; use controllers\Log\Tlog; /** * Class MemberTaskCommand * 会员积分相关守护任务 * 多进程守护任务管理 * * 监控脚本只需要监控此主任务正常运行即可 * * kill -s SIGUSR1 | kill -10 manger进程: 平滑重启所有子进程 * kill | kill -s SIGTERM mange进程:平滑stop所有进程 * kill | kill -s SIGUSR1 | kill -s SIGTERM 子进程,重启该子进程 * author zhangcunchao * date 2016-11-15 * */ class MemberTaskCommand { const proccess_num = 3;//子进程数 const proccess_name = 'php-member-task';//进程名前缀,此设置主要为监控脚本提供方便 protected $open_debug = true;//开启debug,警告直接输出,不发邮件 protected $_pid; protected $shm_key; //共享内存key protected $shm_id; protected $max_job_limit= 0; //单个子进程最多处理的任务数,达到此任务就重启此子进程;0 没有限制 protected $NOW_DIR; protected $max_try_num = 3;//队列数据最大尝试次数//超过计入日志 public function __construct(){ $this->NOW_DIR = dirname(__FILE__).'/'; } //入口进程 public function run(){ //检查扩展 if(!extension_loaded('pcntl')){ print_r("需要安装pcntl扩展支持".PHP_EOL); exit(); } if(!extension_loaded('shmop')){ print_r("需要安装shmop扩展支持".PHP_EOL); exit(); } $this->shm_key = ftok(__FILE__, 't'); /** 开辟一块共享内存 int $key , string $flags , int $mode , int $size $flags: a:访问只读内存段 c:创建一个新内存段,或者如果该内存段已存在,尝试打开它进行读写 w:可读写的内存段 n:创建一个新内存段,如果该内存段已存在,则会失败 $mode: 八进制格式 0655 $size: 开辟的数据大小 字节 */ $this->shm_id = shmop_open($this->shm_key, 'c', 0644, 1); if($this->shm_id === false){ echo "开辟共享内存失败".PHP_EOL; exit(); } /** * 写入数据 数据必须是字符串格式 , 最后一个指偏移量 * 注意:偏移量必须在指定的范围之内,否则写入不了 * */ $size = shmop_write($this->shm_id, 1, 0); if($size<=0){ echo "共享内存写入失败".PHP_EOL; exit(); } if(self::proccess_num > 0){ //服务检查 try{ $job = MemberAsynchTask::getInstance(); $job->system_check(); }catch (\Exception $e){ print_r($e->getMessage()); exit(); } //启动进程 for ($i=0; $i<self::proccess_num; $i++) { $this->parentProccess(); } //SIGUSR1 留给用户使用,此处做reload平滑重启使用 pcntl_signal(SIGUSR1, function($this){ $this->sig_handler(SIGUSR1); }); // 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这 个信号. pcntl_signal(SIGTERM, function($this){ $this->sig_handler(SIGTERM); }); //终止进程 终端线路挂断 pcntl_signal(SIGHUP, function($this){ $this->sig_handler(SIGHUP); }); cli_set_process_title(self::proccess_name . '-manger'); while (true) { // do something $pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG); if($pid>0){ $s = pcntl_wifexited($status); //todo非正常退出报警 $this->parentProccess($pid); } pcntl_signal_dispatch(); // 接收到信号时,调用注册的signalHandler() $this->check_kill_all(); usleep(100000); } }else{ exit('proccess_num must > 0'); } } //监测是否有严重子进程错误,进行强制进程结束 function check_kill_all(){ //读取的范围也必须在申请的内存范围之内,否则失败 //其他异常强制全部退出 $status = shmop_read($this->shm_id, 0, 1); if(1 != $status){ shmop_delete($this->shm_id); shmop_close($this->shm_id); $this->sig_handler(SIGTERM); } } //子进程使用 function san_kill_all_task(){ shmop_write($this->shm_id, 0, 0); } function sig_handler($signo){ switch ($signo) { case SIGUSR1: foreach($this->_pid as $key){ posix_kill($key,SIGUSR1); } echo date('Y-m-d H:i:s')." [manage] [SIGUSR1] all work reload\n"; break; case SIGTERM: foreach($this->_pid as $key){ posix_kill($key,SIGUSR1); } echo date('Y-m-d H:i:s')." [manage] [SIGTERM] stop\n"; die(); break; case SIGHUP: echo date('Y-m-d H:i:s')." [manage] [SIGHUP] linux clinet exit\n"; break; } } function sig_san_handler($signo){ switch ($signo) { case SIGUSR1: echo date('Y-m-d H:i:s')." [work] [SIGUSR1] work die\n"; die(); break; case SIGTERM: echo date('Y-m-d H:i:s')." [work] [SIGTERM] work die\n"; die(); break; case SIGHUP: echo date('Y-m-d H:i:s')." [work] [SIGHUP] linux clinet exit\n"; break; } } //父进程 public function parentProccess($id=''){ if(!empty($id)){ unset($this->_pid[$id]); } $pid = pcntl_fork(); if($pid == -1) { return false; } else { if($pid) { #父进程获得子进程的pid,存入数组 $this->_pid[$pid] = $pid; return true; } else { //开始发送,子进程执行完自己的任务后,退出。 $this->sanProccess(); exit; } } } //子进程 public function sanProccess(){ //信号绑定 cli_set_process_title(self::proccess_name . '-work'); echo date('Y-m-d H:i:s')." work proccess start\n"; pcntl_signal(SIGUSR1, function($this){ $this->sig_san_handler(SIGUSR1); }); pcntl_signal(SIGHUP, function($this){ $this->sig_san_handler(SIGHUP); }); pcntl_signal(SIGTERM, function($this){ $this->sig_san_handler(SIGTERM); }); if($this->max_job_limit){ $san_job_i = 0; } //job分发 $job = MemberAsynchTask::getInstance(); while(1){ try{ $job->queue_ping(); }catch (\Exception $e){ //重试一次 try{ if(10002 == $e->getCode()){ echo '重试一次连接'.PHP_EOL; $job->queue_ping(); } }catch (\Exception $e){ //队列服务已不可用 if(10002 == $e->getCode()){ //kill 所有子进程,报警 $this->san_kill_all_task(); $job->waring_email(var_export($e,true)); sleep(3); } } } //初始容器,避免循环影响 $job_data = array(); $job_key = ''; try{ $jrs = $job->queue_pop(MemberConf::$_MEMBER_QUEUE_LIST); if($jrs && !empty($jrs[0]) && !empty($jrs[1])){ $job_data = json_decode($jrs[1],true); $job_key = $jrs[0]; //分发任务 switch($job_key){ //积分更新任务 case MemberConf::$_MEMBER_QUEUE_LIST['evente_member_integral_queue']: require_once $this->NOW_DIR.'MemberIntegralJob.php'; $MemberIntegralJob = new MemberIntegralJob(); $MemberIntegralJob->integralChangeJob($job_data); break; //用户会员激活任务 case MemberConf::$_MEMBER_QUEUE_LIST['evente_member_user_activate']: require_once $this->NOW_DIR.'MemberIntegralJob.php'; $MemberIntegralJob = new MemberIntegralJob(); $MemberIntegralJob->userMemberActivate($job_data); break; //成长值更新任务 case MemberConf::$_MEMBER_QUEUE_LIST['evente_member_growth_queue']: require_once $this->NOW_DIR.'MemberGrowthJob.php'; $MemberGrowthJob = new MemberGrowthJob(); $MemberGrowthJob->growthChangeJob($job_data); break; //主办会员等级规则修改 case MemberConf::$_MEMBER_QUEUE_LIST['evente_member_rule_up_parent']: require_once $this->NOW_DIR.'MemberGrowthJob.php'; $MemberGrowthJob = new MemberGrowthJob(); $MemberGrowthJob->orgLevelRuleUpJob($job_data); break; //等级reset任务 case MemberConf::$_MEMBER_QUEUE_LIST['evente_member_user_level_reset']: require_once $this->NOW_DIR.'MemberGrowthJob.php'; $MemberGrowthJob = new MemberGrowthJob(); $MemberGrowthJob->userLevelResetJob($job_data); break; default: throw new \Exception("未定义的守护任务处理case:".$job_key,10002); } } }catch (\Exception $e){ $code = $e->getCode(); //统一重试,失败记录重试队列 if(!empty($job_data)&&!empty($job_key)){ if(isset($job_data['_job_error_num'])){ $job_data['_job_error_num']=$job_data['_job_error_num']+1; }else{ $job_data['_job_error_num'] = 1; } } $mes = '本数据第'.$job_data['_job_error_num'].'次错误尝试'.PHP_EOL; //重试上限 if($this->max_try_num <= $job_data['_job_error_num']){ $mes .= '数据已达处理次数上限,重试结束,记入日志'.PHP_EOL; $log = new Tlog(MemberConf::QUEUE_EXCEPTION_FILE,MemberConf::LOG_DIR); unset($job_data['_job_error_num']); $ld = array( 'k' => $job_key, 'v' => $job_data, ); $m = '[error_code:'.$code.' |'.json_encode($ld).'|]'; $log->debugwriteLog($m); //邮件 switch($code){ case 10002: //任务返回到队列 $mes .= '严重错误,需要紧急修复,子进程已全部kill'.PHP_EOL; break; case 10003: $mes .= '严重错误,需要紧急修复,单任务重入队列'.PHP_EOL; break; default: $mes .= '非紧急异常,异常数据已记录到日志:'. MemberConf::TASK_EXCEPTION_FILE . PHP_EOL; } $mes .= var_export($e,true); if($this->open_debug){ print_r($mes); }else{ $job->waring_email($mes,'异步任务会员积分报警邮件'); } }else{ //统一重入队列 if(!empty($job_data)&&!empty($job_key)){ $job->queue_push($job_key,json_encode($job_data)); } } //容错处理 switch($code){ case 10002: //严重致命错误,需要停止所有进程运行 //任务返回到队列 //kill 所有子进程,报警 $this->san_kill_all_task(); sleep(3); break; } } //控制单个进程最大任务数 if($this->max_job_limit){ ++$san_job_i; if($this->max_job_limit <= $san_job_i){ echo 'work job limit reload'.PHP_EOL; exit(); } } pcntl_signal_dispatch(); } } } $obj = new MemberTaskCommand(); $obj->run(); |
程序本天成,妙手偶得之!我们只是代码的搬运工!
转载请注明:http://www.521php.com/archives/2017/
2016年11月08日 下午 10:11 ljw | 引用 | #1
看不动啊。。。。