概述
推荐:《PHP视频教程》
前言
模拟supervisor进程管理DEMO(简易实现)
截图:
实现
1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程
不足:无法持续监听错误页面。由于socket得到的响应是通过include
函数加载的,所以在加载的页面内不能出现tail -f
命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。
知识点
代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1)
,而是用stream_select($read, $write, $except, 1)
让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open
,是一个很强大的函数。在这之前我曾用pcntl_exec
执行过外部程序,但是需要先pcntl_fork
。而用其他的如exec
,shell_exec
无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()
处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。
代码
主进程代码:Process.php
<?php
require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{
/**
* 待启动的消费者数组
*/
protected $consumers = array();
protected $childPids = array();
const PPID_FILE = __DIR__ . '/process';
protected $serializerConsumer;
public function __construct()
{
$this->consumers = $this->getConsumers();
}
// 这里是个DEMO,实际可以用读取配置文件的方式。
public function getConsumers()
{
$consumer = new Consumer([
'program' => 'test',
'command' => '/usr/bin/php test.php',
'directory' => __DIR__,
'logfile' => __DIR__ . '/test.log',
'uniqid' => uniqid(),
'auto_restart' => false,
]);
return [
$consumer->uniqid => $consumer,
];
}
public function run()
{
if (empty($this->consumers)) {
// consumer empty
return;
}
if ($this->_notifyMaster()) {
// master alive
return;
}
$pid = pcntl_fork();
if ($pid < 0) {
exit;
} elseif ($pid > 0) {
exit;
}
if (!posix_setsid()) {
exit;
}
$stream = new StreamConnection('tcp://0.0.0.0:7865');
@cli_set_process_title('AMQP Master Process');
// 将主进程ID写入文件
file_put_contents(self::PPID_FILE, getmypid());
// master进程继续
while (true) {
$this->init();
pcntl_signal_dispatch();
$this->waitpid();
// 如果子进程被全部回收,则主进程退出
// if (empty($this->childPids)) {
// $stream->close($stream->getSocket());
// break;
// }
$stream->accept(function ($uniqid, $action) {
$this->handle($uniqid, $action);
return $this->display();
});
}
}
protected function init()
{
foreach ($this->consumers as &$c) {
switch ($c->state) {
case Consumer::RUNNING:
case Consumer::STOP:
break;
case Consumer::NOMINAL:
case Consumer::STARTING:
$this->fork($c);
break;
case Consumer::STOPING:
if ($c->pid && posix_kill($c->pid, SIGTERM)) {
$this->reset($c, Consumer::STOP);
}
break;
case Consumer::RESTART:
if (empty($c->pid)) {
$this->fork($c);
break;
}
if (posix_kill($c->pid, SIGTERM)) {
$this->reset($c, Consumer::STOP);
$this->fork($c);
}
break;
default:
break;
}
}
}
protected function reset(Consumer $c, $state)
{
$c->pid = '';
$c->uptime = '';
$c->state = $state;
$c->process = null;
}
protected function waitpid()
{
foreach ($this->childPids as $uniqid => $pid) {
$result = pcntl_waitpid($pid, $status, WNOHANG);
if ($result == $pid || $result == -1) {
unset($this->childPids[$uniqid]);
$c = &$this->consumers[$uniqid];
$state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
$this->reset($c, $state);
}
}
}
/**
* 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
*/
private function _notifyMaster()
{
$ppid = file_get_contents(self::PPID_FILE );
$isAlive = $this->checkProcessAlive($ppid);
if (!$isAlive) return false;
return true;
}
public function checkProcessAlive($pid)
{
if (empty($pid)) return false;
$pidinfo = `ps co pid {$pid} | xargs`;
$pidinfo = trim($pidinfo);
$pattern = "/.*?PID.*?(d+).*?/";
preg_match($pattern, $pidinfo, $matches);
return empty($matches) ? false : ($matches[1] == $pid ? true : false);
}
/**
* fork一个新的子进程
*/
protected function fork(Consumer $c)
{
$descriptorspec = [2 => ['file', $c->logfile, 'a'],];
$process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
if ($process) {
$ret = proc_get_status($process);
if ($ret['running']) {
$c->state = Consumer::RUNNING;
$c->pid = $ret['pid'];
$c->process = $process;
$c->uptime = date('m-d H:i');
$this->childPids[$c->uniqid] = $ret['pid'];
} else {
$c->state = Consumer::EXITED;
proc_close($process);
}
} else {
$c->state = Consumer::ERROR;
}
return $c;
}
public function display()
{
$location = 'http://127.0.0.1:7865';
$basePath = Http::$basePath;
$scriptName = isset($_SERVER['SCRIPT_NAME']) &&
!empty($_SERVER['SCRIPT_NAME']) &&
$_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';
if ($scriptName == '/index.html') {
return Http::status_301($location);
}
$sourcePath = $basePath . $scriptName;
if (!is_file($sourcePath)) {
return Http::status_404();
}
ob_start();
include $sourcePath;
$response = ob_get_contents();
ob_clean();
return Http::status_200($response);
}
public function handle($uniqid, $action)
{
if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
return;
}
switch ($action) {
case 'refresh':
break;
case 'restartall':
$this->killall(true);
break;
case 'stopall':
$this->killall();
break;
case 'stop':
$c = &$this->consumers[$uniqid];
if ($c->state != Consumer::RUNNING) break;
$c->state = Consumer::STOPING;
break;
case 'start':
$c = &$this->consumers[$uniqid];
if ($c->state == Consumer::RUNNING) break;
$c->state = Consumer::STARTING;
break;
case 'restart':
$c = &$this->consumers[$uniqid];
$c->state = Consumer::RESTART;
break;
case 'copy':
$c = $this->consumers[$uniqid];
$newC = clone $c;
$newC->uniqid = uniqid('C');
$newC->state = Consumer::NOMINAL;
$newC->pid = '';
$this->consumers[$newC->uniqid] = $newC;
break;
default:
break;
}
}
protected function killall($restart = false)
{
foreach ($this->consumers as &$c) {
$c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
}
}}$cli = new Process();$cli->run();
登录后复制
Consumer消费者对象
<?php
require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{
/** 开启多少个消费者 */
public $numprocs = 1;
/** 当前配置的唯一标志 */
public $program;
/** 执行的命令 */
public $command;
/** 当前工作的目录 */
public $directory;
/** 通过 $qos $queueName $duplicate 生成的 $queue */
public $queue;
/** 程序执行日志记录 */
public $logfile = '';
/** 消费进程的唯一ID */
public $uniqid;
/** 进程IDpid */
public $pid;
/** 进程状态 */
public $state = self::NOMINAL;
/** 自启动 */
public $auto_restart = false;
public $process;
/** 启动时间 */
public $uptime;
const RUNNING = 'running';
const STOP = 'stoped';
const NOMINAL = 'nominal';
const RESTART = 'restart';
const STOPING = 'stoping';
const STARTING = 'stating';
const ERROR = 'error';
const BLOCKED = 'blocked';
const EXITED = 'exited';
const FATEL = 'fatel';}
登录后复制
stream相关代码:StreamConnection.php
<?php
class StreamConnection{
protected $socket;
protected $timeout = 2; //s
protected $client;
public function __construct($host)
{
$this->socket = $this->connect($host);
}
public function connect($host)
{
$socket = stream_socket_server($host, $errno, $errstr);
if (!$socket) {
exit('stream error');
}
stream_set_timeout($socket, $this->timeout);
stream_set_chunk_size($socket, 1024);
stream_set_blocking($socket, false);
$this->client = [$socket];
return $socket;
}
public function accept(Closure $callback)
{
$read = $this->client;
if (stream_select($read, $write, $except, 1) < 1) return;
if (in_array($this->socket, $read)) {
$cs = stream_socket_accept($this->socket);
$this->client[] = $cs;
}
foreach ($read as $s) {
if ($s == $this->socket) continue;
$header = fread($s, 1024);
if (empty($header)) {
$index = array_search($s, $this->client);
if ($index)
unset($this->client[$index]);
$this->close($s);
continue;
}
Http::parse_http($header);
$uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
$action = isset($_GET['action']) ? $_GET['action'] : '';
$response = $callback($uniqid, $action);
$this->write($s, $response);
$index = array_search($s, $this->client);
if ($index)
unset($this->client[$index]);
$this->close($s);
}
}
public function write($socket, $response)
{
$ret = fwrite($socket, $response, strlen($response));
}
public function close($socket)
{
$flag = fclose($socket);
}
public function getSocket()
{
return $this->socket;
}}
登录后复制
Http响应代码:Http.php
<?php
class Http{
public static $basePath = __DIR__ . '/views';
public static $max_age = 120; //秒
/*
* 函数: parse_http
* 描述: 解析http协议
*/
public static function parse_http($http)
{
// 初始化
$_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES = array();
$GLOBALS['HTTP_RAW_POST_DATA'] = '';
// 需要设置的变量名
$_SERVER = array(
'QUERY_STRING' => '',
'REQUEST_METHOD' => '',
'REQUEST_URI' => '',
'SERVER_PROTOCOL' => '',
'SERVER_SOFTWARE' => '',
'SERVER_NAME' => '',
'HTTP_HOST' => '',
'HTTP_USER_AGENT' => '',
'HTTP_ACCEPT' => '',
'HTTP_ACCEPT_LANGUAGE' => '',
'HTTP_ACCEPT_ENCODING' => '',
'HTTP_COOKIE' => '',
'HTTP_CONNECTION' => '',
'REMOTE_ADDR' => '',
'REMOTE_PORT' => '0',
'SCRIPT_NAME' => '',
'HTTP_REFERER' => '',
'CONTENT_TYPE' => '',
'HTTP_IF_NONE_MATCH' => '',
);
// 将header分割成数组
list($http_header, $http_body) = explode("rnrn", $http, 2);
$header_data = explode("rn", $http_header);
list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);
unset($header_data[0]);
foreach ($header_data as $content) {
// rnrn
if (empty($content)) {
continue;
}
list($key, $value) = explode(':', $content, 2);
$key = strtolower($key);
$value = trim($value);
switch ($key) {
case 'host':
$_SERVER['HTTP_HOST'] = $value;
$tmp = explode(':', $value);
$_SERVER['SERVER_NAME'] = $tmp[0];
if (isset($tmp[1])) {
$_SERVER['SERVER_PORT'] = $tmp[1];
}
break;
case 'cookie':
$_SERVER['HTTP_COOKIE'] = $value;
parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
break;
case 'user-agent':
$_SERVER['HTTP_USER_AGENT'] = $value;
break;
case 'accept':
$_SERVER['HTTP_ACCEPT'] = $value;
break;
case 'accept-language':
$_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
break;
case 'accept-encoding':
$_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
break;
case 'connection':
$_SERVER['HTTP_CONNECTION'] = $value;
break;
case 'referer':
$_SERVER['HTTP_REFERER'] = $value;
break;
case 'if-modified-since':
$_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
break;
case 'if-none-match':
$_SERVER['HTTP_IF_NONE_MATCH'] = $value;
break;
case 'content-type':
if (!preg_match('/boundary="?(S+)"?/', $value, $match)) {
$_SERVER['CONTENT_TYPE'] = $value;
} else {
$_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
$http_post_boundary = '--' . $match[1];
}
break;
}
}
// script_name
$_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);
// QUERY_STRING
$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
if ($_SERVER['QUERY_STRING']) {
// $GET
parse_str($_SERVER['QUERY_STRING'], $_GET);
} else {
$_SERVER['QUERY_STRING'] = '';
}
// REQUEST
$_REQUEST = array_merge($_GET, $_POST);
return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
}
public static function status_404()
{
return <<<EOFHTTP/1.1 404 OK
content-type: text/htmlEOF;
}
public static function status_301($location)
{
return <<<EOFHTTP/1.1 301 Moved Permanently
Content-Length: 0
Content-Type: text/plain
Location: $locationCache-Control: no-cacheEOF;
}
public static function status_304()
{
return <<<EOFHTTP/1.1 304 Not Modified
Content-Length: 0EOF;
}
public static function status_200($response)
{
$contentType = $_SERVER['CONTENT_TYPE'];
$length = strlen($response);
$header = '';
if ($contentType)
$header = 'Cache-Control: max-age=180';
return <<<EOFHTTP/1.1 200 OK
Content-Type: $contentTypeContent-Length: $length$header$responseEOF;
}}
登录后复制
待执行的脚本:test.php
<?php
while(true) {
file_put_contents(__DIR__ . '/test.log', date('Y-m-d H:i:s'));
sleep(1);}
登录后复制
在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/
最后
以上就是想人陪月饼为你收集整理的PHP模拟supervisor的进程管理的全部内容,希望文章能够帮你解决PHP模拟supervisor的进程管理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复