图

进程:在操作系统中一个进程就是一个正在运行的程序。

  • 子进程会复制父进程的内存空间和上下文环境
  • 修改某个子进程的内存空间,不会修改父进程或其他子进程中的内存空间(进程是独立的)
  • 子进程会复制父进程的IO句柄(打开文件这种,所有子进程都会读写文件,所有会有文件锁机制,防止进程中的读写的互斥。fd描述符),

进程是独立的,那么进程相互是如何通信的?

1、共享内存 (其中的一种)

  • 共享内存不属于任何进程
  • 在共享内存中分配的内存空间可以被任何进程访问(只要拥有共享内存的索引id)。
  • 即使进程关闭,共享内存仍然可以继续保留。

图

shmid 共享内存的索引id

owner 用户

perms 权限

------ Shared Memory Segments -------- 
key shmid owner perms bytes nattch status 
0x00000000 163840 wangjiangf 600 393216 2 dest 
0x00000000 196609 wangjiangf 600 393216 2 dest

2、管道

图

  1. 创建管道是一组(2个)特殊的描述符(读写)
  2. 管道需要在fork函数调用前创建
  3. 如果某一端主动关闭管道,另一端的读取操作会直接返回0

3、消息队列

消息队列和正常开发原理是一样的、具有消息的可靠性

1、通过指定key创建一个消息队列

2、在消息队列中传递的数据大小限制

3、消息队列会一直保留知道被主动关闭

swoole的结构图

图

图

1、Master:处理核心事件驱动(主进程、拥有若干个Reactor线程)

2、Reactor:处理TCP连接,收发数据的线程。Swoole的主线程在Accept新的连接后,会将这个连接分配给一个固定的Reactor线程,并由这个线程负责监听此socket。在socket可读时读取数据,并进行协议解析,将请求投递到Worker进程。在socket可写时将数据发送给TCP客户端。

  • 负责维护客户端TCP连接、处理网络IO、处理协议、收发数据
  • 完全是异步非阻塞的模式
  • 全部为C代码,除Start/Shudown事件回调外,不执行任何PHP代码
  • 将TCP客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包
  • Reactor以多线程的方式运行

3、Manager进程(管理进程、做进程的管理和分配):

  • swoole中worker/task进程都是由Manager进程Fork并管理的。
  • 子进程结束运行时,manager进程负责回收此子进程,避免成为僵尸进程。并创建新的子进程
  • 服务器关闭时,manager进程将发送信号给所有子进程,通知子进程关闭服务
  • 服务器reload时,manager进程会逐个关闭/重启子进程

4、Worker进程:处理客户端请求

  • 接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
  • 生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端
  • 可以是异步非阻塞模式,也可以是同步阻塞模式
  • Worker以多进程的方式运行

5、Task进程:异步工作进程

  • 接受由Worker进程通过swoole_server->task/taskwait方法投递的任务
  • 处理任务,并将结果数据返回(swoole_server->finish)给Worker进程
  • 完全是同步阻塞模式
  • TaskWorker以多进程的方式运行

worker进程中的receive、close、connect



//注意ip如果是127.0.0.1的话,不是本机访问不了
$serv = new Swoole_Server("0.0.0.0", 9501);
$serv->on('connect', function ($serv, $fd){
    echo "数据已经连接.\n";
});
$serv->on('receive', function ($serv, $fd, $reactor_id, $data) {
    echo "请求过来的数据:".$data;
    $serv->send($fd, "hello swoole");
    $serv->close($fd);
});
//连接断开触发的毁掉函数。
$serv->on('close', function ($serv, $fd) {
    echo "Client: Close.\n";
});
$serv->start();

通过火狐浏览器连接9501端口(部分浏览器可能出现错误)

图

终端输出内容

图

也可以通过telnet 连接并且发送数据。

通过socket代码连接(php-fpm和命令行通用)


$socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP);

socket_connect($socket,"192.168.205.10",9501);
socket_write($socket,"hell server\n");
$out=socket_read($socket,1024);
socket_close($socket);

图

swoole中server_client连接(php-fpm和命令行通用)

$client = new swoole\client(SWOOLE_SOCK_TCP);
$client->connect('192.168.205.10', 9501);
$client->send("hello world\n");
echo $client->recv();
$client->close();

Reactor、Worker、TaskWorker的关系

可以理解为Reactor就是nginx,Worker就是php-fpm。Reactor线程异步并行地处理网络请求,然后再转发给Worker进程中去处理。Reactor和Worker间通过UnixSocket进行通信。

在php-fpm的应用中,经常会将一个任务异步投递到Redis等队列中,并在后台启动一些php进程异步地处理这些任务。Swoole提供的Worker是一套更完整的方案,将任务的投递、队列、php任务处理进程管理合为一体。通过底层提供的API可以非常简单地实现异步任务的处理。另外TaskWorker还可以在任务执行完成后,再返回一个结果反馈到Worker。

Swoole的Reactor、Worker、TaskWorker之间可以紧密的结合起来,提供更高级的使用方式。

一个更通俗的比喻,假设Server就是一个工厂,那Reactor就是销售,接受客户订单。而Worker就是工人,当销售接到订单后,Worker去工作生产出客户要的东西。而TaskWorker可以理解为行政人员,可以帮助Worker干些杂事,让Worker专心工作。

底层会为Worker进程、TaskWorker进程分配一个唯一的ID
不同的Worker和TaskWorker进程之间可以通过sendMessage接口进行通信

Task进程

流程图

图

流程代码示例

server端 2.x版本

<?php

class Server
{
    private $serv;

    public function __construct()
    {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array('worker_num' => 3, 'task_worker_num' => 3, 'daemonize' => false, 'max_request' => 10000, 'dispatch_mode' => 2,
            //            'debug_mode'=> 0,
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onStart($serv)
    {
        echo "Start\n";
    }

    public function onConnect($serv, $fd, $from_id)
    {
        //        $serv->send( $fd, "Hello {$fd}!" );
        echo "client {$fd} Connect\n";

    }

    public function onReceive(swoole_server $serv, $fd, $from_id, $data)
    {
        echo "Get Message From Client {$fd}:{$data}\n";
        $data = ['task' => 'task_1', 'params' => $data, 'fd' => $fd,];
        $serv->task(json_encode($data));
    }

    public function onTask($serv, $task_id, $from_id, $data)
    {
        echo "This Task {$task_id} from worker {$from_id}\n";
        echo "Data:{$data}\n";
        $data = json_decode($data, true);
        var_dump($data);
        $serv->send($data['fd'], date('Y-m-d H:i:s', time()));
        return "finished (return后自动调用onFinish)";
    }

    public function onFinish($serv, $task_id, $data)
    {
        echo "Task {$task_id} finish\n";
        echo "Result:{$data}  \n";
    }

    public function onClose($serv, $fd, $from_id)
    {
        echo "Client {$fd} close connection\n";
    }
}

// 启动服务器
$server = new Server();

客户端代码

class Client
{
    private $client;

    public function __construct()
    {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect()
    {
        if (!$this->client->connect("127.0.0.1", 9501, 1)) {
            echo "Error:链接失败\n";
        }
        fwrite(STDOUT, "请输入消息:");
        $msg = trim(fgets(STDIN));
        $this->client->send($msg);
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";
    }
}

$client = new Client();
$client->connect();

注意点

  • TaskWorker进程的通信通过unix socket进行
  • Task数据传递大小小于8K直接管道传递,大于8K写入临时文件传递
  • Task传递对象 1、可以通过序列化传递一个对象的拷贝(非引用)
  • Task传递对象 2、Task对象的改变不会反应到worker中(连个进程中是独立的)
  • Task传递对象 3、数据库连接,网络连接对象不可传递
  • Task的Onfinish的回调会传递给调用给该task方法的worker进程(原路返回可寻)

代码示例

<?php

class Server
{
    private $serv;
    public $test;

    public function __construct()
    {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 3,
            'task_worker_num' => 3,
            'daemonize' => false,
            'max_request' => 10000,
            'dispatch_mode' => 2,
//            'debug_mode'=> 0,

        ));

        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));

        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));

        $this->serv->start();
    }

    public function onStart($serv)
    {
        echo "Start\n";
    }

    public function onConnect($serv, $fd, $from_id)
    {
//        $serv->send( $fd, "Hello {$fd}!" );
        echo "client {$fd} Connect\n";
    }

    public function onReceive(swoole_server $serv, $fd, $from_id, $data)
    {
        echo "Get Message From Client {$fd}:{$data}\n";
        $this->test = new Test();
        var_dump($this->test);

        $serv->task(serialize($this->test));
        $serv->send($fd, "我的");
    }

    public function onTask($serv, $task_id, $from_id, $data)
    {
        echo "This Task {$task_id} from worker {$from_id}\n";
        echo "Data:{$data}\n";
        $data = unserialize($data);
        $data->index = 2;
        var_dump($data);
        var_dump($data->test);
        return "finished (return后自动调用onFinish)";
    }

    public function onFinish($serv, $task_id, $data)
    {
        echo "Task {$task_id} finish\n";
        echo "Result:{$data}  \n";
        var_dump($this->test);
    }

    public function onClose($serv, $fd, $from_id)
    {
        echo "Client {$fd} close connection\n";
    }
}

class Test
{
    public $index = 0;
}

// 启动服务器
$server = new Server();

结果

图

server端4.4.4版本 协程写法、客户端用浏览器访问、kill命令发送重启、可以执行协程任务。

class Server
{
    private $serv;
    public function __construct()
    {
        $this->serv = new swoole\server("0.0.0.0", 9501);
        $this->serv->set(array('worker_num' => 3, 'task_worker_num' => 3, 'daemonize' => false, 'max_request' => 10000, 'dispatch_mode' => 2,
            'task_enable_coroutine' => true,
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }
    public function onStart($serv)
    {
        cli_set_process_title("server");
        file_put_contents('server.pid',$this->serv->master_pid);
        echo "Start\n";
    }

    public function onConnect($serv, $fd, $from_id)
    {
        //        $serv->send( $fd, "Hello {$fd}!" );
        echo "client {$fd} Connect\n";

    }

    public function onReceive(swoole\server $serv, $fd, $from_id, $data)
    {
        echo "Get Message From Client {$fd}:{$data}\n";
        $data = ['task' => 'task_1', 'params' => $data, 'fd' => $fd,];
        $serv->send($fd, date('Y-m-d H:i:s', time()));
        $serv->task($data);
    }

    /**
     * @param $serv
     * @param \Swoole\Server\Task $task
     */
    public function onTask($serv, Swoole\Server\Task $task)
    {
        for($i=0;$i&lt;=4;$i++){
        go(function()use($i){
            echo $i.PHP_EOL;
        });
        }
        //任务的编号
        var_dump($task->id);
        var_dump($task->worker_id);
        var_dump($task->data);
        var_dump($task->flags);
//        $serv->send($task->data['fd'], date('Y-m-d H:i:s', time()));
        $task->finish([123, 'hello']);
    }

    public function onFinish($serv, $task_id, $data)
    {
        echo "Task {$task_id} finish\n";
        echo "Result:{$data[0]}  \n";
    }

    public function onClose($serv, $fd, $from_id)
    {
        echo "Client {$fd} close connection\n";
    }
}

// 启动服务器
$server = new Server();

kill -USR2 $(cat server.pid)

客户端用php-fpm的方式,浏览器访问

class Client
{
    private $client;

    public function __construct()
    {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect()
    {
        if (!$this->client->connect("127.0.0.1", 9501, 1)) {
            echo "Error:链接失败\n";
        }
        $msg =['sql'=>'select * from wk_user;'];
        $this->client->send(json_encode($msg));
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";
    }
}

$client = new Client();
$client->connect();

异步mysql连接池

<?php
class Server
{
    private $serv;
    public $pdo;

    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 3,
            'task_worker_num'=>3,
            'daemonize' => false,
            'max_request' => 10000,
            'dispatch_mode' => 2,
//            'debug_mode'=> 0,

        ));

        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));

        $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));

        $this->serv->on('Task',array($this,'onTask'));
        $this->serv->on('Finish',array($this,'onFinish'));

        $this->serv->start();
    }

    public function onStart( $serv ) {
        echo "Start\n";
    }

    public function onConnect( $serv, $fd, $from_id ) {
//        $serv->send( $fd, "Hello {$fd}!" );
        echo "client {$fd} Connect\n";
    }

    //task回调会在worker进程创建之初调用 不会区分是worker进程还是taskworker进程
    public function onWorkerStart($serv,$worker_id)
    {
        //异步的mysql连接池、在task中维持一个pdo的实例
        //和 'worker_num' => 3,对应
        if($serv->taskworker){
            $this->pdo=new PDO(
                "mysql:host=127.0.0.1;port=3306;dbname=test",
                'root',
                '123456',
                array(
                    PDO::MYSQL_ATTR_INIT_COMMAND=>"SET NAMES 'UTF8';",
                    PDO::ATTR_ERRMODE=>PDO::ERRMODE_EXCEPTION,
                    PDO::ATTR_PERSISTENT=>true
                )
            );
            echo "Task Worker\n";
        }else{
            echo "Worker Process\n";
        }

    }

    //收到客户端的请求
    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        $task=[
            'sql'=>"select * from  userinfo;",
            'params'=>[],
            'fd'=>$fd
        ];
        $serv->task(json_encode($task));

    }

    public function onTask($serv,$task_id,$from_id,$data)
    {

        try{
            $data=json_decode($data,true);

            $statement=$this->pdo->query($data['sql']);
            while($row =  $statement->fetch(PDO::FETCH_ASSOC)){
                print_r($row);
            }
        }catch (PDOException $e){
            var_dump($e->getMessage());
            return false;
        }
    }

    public function onFinish($serv,$task_id,$data){
        echo "Task {$task_id} finish\n";
        echo "Result:{$data}  \n";

    }

    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}

// 启动服务器
$server = new Server();
Last modification:January 26, 2020
如果觉得我的文章对你有用,请随意赞赏