Event Loop

图

EventLoop 是一个Reactor线程,其中运行了一个epoll实例

可通过接口添加socket描述符到epoll监听中,并指定事件相应的回调函数

Event Loop不可用于FPM环境下(FPM请求完成会自动关闭、是web访问的)

PHP代码示例tcp聊天室

sever代码

<?php
class Server
{
    private $serv;


    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 3,
            'task_worker_num'=>3,
            'daemonize' => false,
            'max_request' => 10000,
            'dispatch_mode' => 2,
//            'debug_mode'=> 0,

        ));

        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));

        $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));

        $this->serv->on('Task',array($this,'onTask'));
        $this->serv->on('Finish',array($this,'onFinish'));

        $this->serv->start();
    }
    //task回调会在worker进程创建之初调用 不会区分是worker进程还是taskworker进程
    public function onWorkerStart($serv,$worker_id)
    {
    }
    public function onStart( $serv ) {
        echo "Start\n";
    }

    public function onConnect( $serv, $fd, $from_id ) {
//        $serv->send( $fd, "Hello {$fd}!" );
        echo "client {$fd} Connect\n";
    }

    //收到客户端的请求调用时候被调用
    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "GET Message From Client {$fd}:{$data}";
        foreach ($serv->connections as $client){
            if($fd !=$client){
                $serv->send($client,$data);
            }
        }
    }

    public function onTask($serv,$task_id,$from_id,$data)
    {
        return '完成了';
    }

    public function onFinish($serv,$task_id,$data){
        echo "Task {$task_id} finish\n";
        echo "Result:{$data}  \n";

    }

    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}


// 启动服务器
$server = new Server();

clent端代码

<?php
$socket=stream_socket_client("tcp://127.0.0.1:9501",$erron,$errstr,30);

function onRead(){
 global  $socket;
 $buffer=stream_socket_recvfrom($socket,1024);
 if(!$buffer){
     echo "Server closed\n";
     swoole_event_del($socket);
 }
 echo "\nRECV:{$buffer}\n";
 fwrite(STDOUT,"Enter Msg:");
}

function onWrite()
{
    global $socket;
    echo "on Write\n";
}
function onInput()
{
    global $socket;
    $msg=trim(fgets(STDIN));
    if($msg=='exit'){
//swoole监听退出
        swoole_event_exit();
        exit();
    }
    swoole_event_write($socket,$msg);
    fwrite(STDOUT,'Enter Msg:');
}

//获取tcp链接描述符、注册读写事件
swoole_event_add($socket,'onRead','onWrite');
//监听键盘标准输入事件
swoole_event_add(STDIN,'onInput');
fwrite(STDOUT,"Enter Msg:");

图

Event Loop常见问题

Q:为什么开启了event loop的程序会一直运行不停止?

A:开启Event loop后,程序内会启动一个线程并一直阻塞在epoll的监听上,因此不会退出

Q:如何关闭Event Loop?

A:调用swoole_event_exit函数即可关闭事件循环(注意:swoole_server程序中此函数无效)

注:在实际项目中用的不多

swoole_process相关知识

图

1、本质基于C语言封装的进程管理模块,方便PHP的多进程编程

2、内置管道、消息队列接口,可方便实现进程间通信

3、提供自定义信号管理

demo1异步

<?php

class BaseProcess{
    private $process;
    public function __construct()
    {
        $this->process= new swoole_process(
            array($this,'run'),false,true);
        $this->process->start();
        //异步的监听管道并进行处理
        swoole_event_add($this->process->pipe,function($pipe){
            $data=$this->process->read();
            echo "RECV: ".$data.PHP_EOL;
        });
    }

    public function run($worker)
    {
        swoole_timer_tick(1000,function($timer_id){
            static $index=0;
            $index=$index+1;
            $this->process->write("Hello");
            var_dump($index);
            if($index==10){
                swoole_timer_clear($timer_id);
            }
        });
    }
}

new BaseProcess();

swoole_process::signal(SIGCHLD,function ($sig){
    //必须为false ,非阻塞模式、回收子线程
    while ($ret=swoole_process::wait(false)){
        echo "PID={$ret['pid']}\n";
    }
});

输出pid后父进程并没有关闭、因为还有个swoole_event_add函数监听

demo2基于消息队列同步

<?php

class BaseProcess{
    private $process;
    public function __construct()
    {
        $this->process= new swoole_process(
            array($this,'run'),false,true);
        //创建消息队列
        if(!$this->process->useQueue(123)){
            //通过此方式打印错误
            var_dump(swoole_strerror(swoole_error()));
            exit;
        }



        $this->process->start();
        while (true){
            $data=$this->process->pop();
            echo "RECV:".$data.PHP_EOL;
        }
    }

    public function run($worker)
    {
        swoole_timer_tick(1000,function($timer_id){
            static $index=0;
            $index=$index+1;
            $this->process->push("Hello");
            var_dump($index);
            if($index==10){
                swoole_timer_clear($timer_id);
            }
        });
    }
}

new BaseProcess();

swoole_process::signal(SIGCHLD,function ($sig){
    //必须为false ,非阻塞模式、回收子线程
    while ($ret=swoole_process::wait(false)){
        echo "PID={$ret['pid']}\n";
    }
});

因为while 所有ctrl+c关闭

demo3动态进程池

<?php

class BaseProcess{
    private $process;

    private $process_use=[];
    private $process_list=[];
    private $min_worker_num=3;
    private $max_worker_num=6;
    private $current_num;


    public function __construct()
    {
        $this->process= new swoole_process(array($this,'run'),false,2);
        $this->process->start();
        swoole_process::wait();

    }

    public function run($worker)
    {
        $this->current_num=$this->min_worker_num;
        for($i=0;$i<$this->current_num;$i++){
            $process=new swoole_process(array($this,'task_run'),false,2);
            $pid=$process->start();
            $this->process_list[$pid]=$process;
            $this->process_use[$pid]=0;
        }
        foreach ($this->process_list as $process){
            swoole_event_add($process->pipe,function($pipe)use($process){
                $data=$process->read();
                //接受子进程传来的数据 (任务完成后接收)
                var_dump($data."@");
                $this->process_use[$data]=0;
            });
        }

        swoole_timer_tick(1000,function($timer_id){
            static $index=0;
            $index =$index+1;
            $flag=true;
            foreach ($this->process_use as $pid=>$used){
                if($used==0){
                    $flag=false;
                    $this->process_use[$pid]=1;
                    $this->process_list[$pid]->write($index."Hello");
                    break;
                }
            }

            if($flag && $this->current_num<$this->max_worker_num){
                $process=new swoole_process(array($this,'task_run'),false,2);
                $pid=$process->start();
                $this->process_list[$pid]=$process;
                $this->process_use[$pid]=1;
                $this->process_list[$pid]->write($index."hello");
                $this->current_num++;
            }
            var_dump($index);
            if($index==10){
                foreach ($this->process_list as $process){
                    $process->write("exit");
                }
                swoole_timer_clear($timer_id);
                $this->process->exit();
            }


        });
    }

    public function task_run($worker)
    {
        swoole_event_add($worker->pipe,function ($pipe)use($worker){
           $data=$worker->read();
           var_dump($worker->pid.":".$data);
           if($data=='exit'){
               $worker->exit();
               exit;
           }
           sleep(5);
           //向父进程写入数据 (任务完成)
           $worker->write($worker->pid);
        });
    }
}

new BaseProcess();
Last modification:January 26, 2020
如果觉得我的文章对你有用,请随意赞赏