Event Loop
EventLoop 是一个Reactor线程,其中运行了一个epoll实例
可通过接口添加socket描述符到epoll监听中,并指定事件相应的回调函数
Event Loop不可用于FPM环境下(FPM请求完成会自动关闭、是web访问的)
PHP代码示例tcp聊天室
sever代码
<?php
class Server
{
private $serv;
public function __construct() {
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array(
'worker_num' => 3,
'task_worker_num'=>3,
'daemonize' => false,
'max_request' => 10000,
'dispatch_mode' => 2,
// 'debug_mode'=> 0,
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
$this->serv->on('Task',array($this,'onTask'));
$this->serv->on('Finish',array($this,'onFinish'));
$this->serv->start();
}
//task回调会在worker进程创建之初调用 不会区分是worker进程还是taskworker进程
public function onWorkerStart($serv,$worker_id)
{
}
public function onStart( $serv ) {
echo "Start\n";
}
public function onConnect( $serv, $fd, $from_id ) {
// $serv->send( $fd, "Hello {$fd}!" );
echo "client {$fd} Connect\n";
}
//收到客户端的请求调用时候被调用
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
echo "GET Message From Client {$fd}:{$data}";
foreach ($serv->connections as $client){
if($fd !=$client){
$serv->send($client,$data);
}
}
}
public function onTask($serv,$task_id,$from_id,$data)
{
return '完成了';
}
public function onFinish($serv,$task_id,$data){
echo "Task {$task_id} finish\n";
echo "Result:{$data} \n";
}
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
}
// 启动服务器
$server = new Server();
clent端代码
<?php
$socket=stream_socket_client("tcp://127.0.0.1:9501",$erron,$errstr,30);
function onRead(){
global $socket;
$buffer=stream_socket_recvfrom($socket,1024);
if(!$buffer){
echo "Server closed\n";
swoole_event_del($socket);
}
echo "\nRECV:{$buffer}\n";
fwrite(STDOUT,"Enter Msg:");
}
function onWrite()
{
global $socket;
echo "on Write\n";
}
function onInput()
{
global $socket;
$msg=trim(fgets(STDIN));
if($msg=='exit'){
//swoole监听退出
swoole_event_exit();
exit();
}
swoole_event_write($socket,$msg);
fwrite(STDOUT,'Enter Msg:');
}
//获取tcp链接描述符、注册读写事件
swoole_event_add($socket,'onRead','onWrite');
//监听键盘标准输入事件
swoole_event_add(STDIN,'onInput');
fwrite(STDOUT,"Enter Msg:");
Event Loop常见问题
Q:为什么开启了event loop的程序会一直运行不停止?
A:开启Event loop后,程序内会启动一个线程并一直阻塞在epoll的监听上,因此不会退出
Q:如何关闭Event Loop?
A:调用swoole_event_exit函数即可关闭事件循环(注意:swoole_server程序中此函数无效)
注:在实际项目中用的不多
swoole_process相关知识
1、本质基于C语言封装的进程管理模块,方便PHP的多进程编程
2、内置管道、消息队列接口,可方便实现进程间通信
3、提供自定义信号管理
demo1异步
<?php
class BaseProcess{
private $process;
public function __construct()
{
$this->process= new swoole_process(
array($this,'run'),false,true);
$this->process->start();
//异步的监听管道并进行处理
swoole_event_add($this->process->pipe,function($pipe){
$data=$this->process->read();
echo "RECV: ".$data.PHP_EOL;
});
}
public function run($worker)
{
swoole_timer_tick(1000,function($timer_id){
static $index=0;
$index=$index+1;
$this->process->write("Hello");
var_dump($index);
if($index==10){
swoole_timer_clear($timer_id);
}
});
}
}
new BaseProcess();
swoole_process::signal(SIGCHLD,function ($sig){
//必须为false ,非阻塞模式、回收子线程
while ($ret=swoole_process::wait(false)){
echo "PID={$ret['pid']}\n";
}
});
输出pid后父进程并没有关闭、因为还有个swoole_event_add函数监听
demo2基于消息队列同步
<?php
class BaseProcess{
private $process;
public function __construct()
{
$this->process= new swoole_process(
array($this,'run'),false,true);
//创建消息队列
if(!$this->process->useQueue(123)){
//通过此方式打印错误
var_dump(swoole_strerror(swoole_error()));
exit;
}
$this->process->start();
while (true){
$data=$this->process->pop();
echo "RECV:".$data.PHP_EOL;
}
}
public function run($worker)
{
swoole_timer_tick(1000,function($timer_id){
static $index=0;
$index=$index+1;
$this->process->push("Hello");
var_dump($index);
if($index==10){
swoole_timer_clear($timer_id);
}
});
}
}
new BaseProcess();
swoole_process::signal(SIGCHLD,function ($sig){
//必须为false ,非阻塞模式、回收子线程
while ($ret=swoole_process::wait(false)){
echo "PID={$ret['pid']}\n";
}
});
因为while 所有ctrl+c关闭
demo3动态进程池
<?php
class BaseProcess{
private $process;
private $process_use=[];
private $process_list=[];
private $min_worker_num=3;
private $max_worker_num=6;
private $current_num;
public function __construct()
{
$this->process= new swoole_process(array($this,'run'),false,2);
$this->process->start();
swoole_process::wait();
}
public function run($worker)
{
$this->current_num=$this->min_worker_num;
for($i=0;$i<$this->current_num;$i++){
$process=new swoole_process(array($this,'task_run'),false,2);
$pid=$process->start();
$this->process_list[$pid]=$process;
$this->process_use[$pid]=0;
}
foreach ($this->process_list as $process){
swoole_event_add($process->pipe,function($pipe)use($process){
$data=$process->read();
//接受子进程传来的数据 (任务完成后接收)
var_dump($data."@");
$this->process_use[$data]=0;
});
}
swoole_timer_tick(1000,function($timer_id){
static $index=0;
$index =$index+1;
$flag=true;
foreach ($this->process_use as $pid=>$used){
if($used==0){
$flag=false;
$this->process_use[$pid]=1;
$this->process_list[$pid]->write($index."Hello");
break;
}
}
if($flag && $this->current_num<$this->max_worker_num){
$process=new swoole_process(array($this,'task_run'),false,2);
$pid=$process->start();
$this->process_list[$pid]=$process;
$this->process_use[$pid]=1;
$this->process_list[$pid]->write($index."hello");
$this->current_num++;
}
var_dump($index);
if($index==10){
foreach ($this->process_list as $process){
$process->write("exit");
}
swoole_timer_clear($timer_id);
$this->process->exit();
}
});
}
public function task_run($worker)
{
swoole_event_add($worker->pipe,function ($pipe)use($worker){
$data=$worker->read();
var_dump($worker->pid.":".$data);
if($data=='exit'){
$worker->exit();
exit;
}
sleep(5);
//向父进程写入数据 (任务完成)
$worker->write($worker->pid);
});
}
}
new BaseProcess();