进程:在操作系统中一个进程就是一个正在运行的程序。
- 子进程会复制父进程的内存空间和上下文环境
- 修改某个子进程的内存空间,不会修改父进程或其他子进程中的内存空间(进程是独立的)
- 子进程会复制父进程的IO句柄(打开文件这种,所有子进程都会读写文件,所有会有文件锁机制,防止进程中的读写的互斥。fd描述符),
进程是独立的,那么进程相互是如何通信的?
1、共享内存 (其中的一种)
- 共享内存不属于任何进程
- 在共享内存中分配的内存空间可以被任何进程访问(只要拥有共享内存的索引id)。
- 即使进程关闭,共享内存仍然可以继续保留。
shmid 共享内存的索引id
owner 用户
perms 权限
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
0x00000000 163840 wangjiangf 600 393216 2 dest
0x00000000 196609 wangjiangf 600 393216 2 dest
2、管道
- 创建管道是一组(2个)特殊的描述符(读写)
- 管道需要在fork函数调用前创建
- 如果某一端主动关闭管道,另一端的读取操作会直接返回0
3、消息队列
消息队列和正常开发原理是一样的、具有消息的可靠性
1、通过指定key创建一个消息队列
2、在消息队列中传递的数据大小限制
3、消息队列会一直保留知道被主动关闭
swoole的结构图
1、Master:处理核心事件驱动(主进程、拥有若干个Reactor线程)
2、Reactor:处理TCP连接,收发数据的线程。Swoole的主线程在Accept新的连接后,会将这个连接分配给一个固定的Reactor线程,并由这个线程负责监听此socket。在socket可读时读取数据,并进行协议解析,将请求投递到Worker进程。在socket可写时将数据发送给TCP客户端。
- 负责维护客户端TCP连接、处理网络IO、处理协议、收发数据
- 完全是异步非阻塞的模式
- 全部为C代码,除Start/Shudown事件回调外,不执行任何PHP代码
- 将TCP客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包
- Reactor以多线程的方式运行
3、Manager进程(管理进程、做进程的管理和分配):
- swoole中worker/task进程都是由Manager进程Fork并管理的。
- 子进程结束运行时,manager进程负责回收此子进程,避免成为僵尸进程。并创建新的子进程
- 服务器关闭时,manager进程将发送信号给所有子进程,通知子进程关闭服务
- 服务器reload时,manager进程会逐个关闭/重启子进程
4、Worker进程:处理客户端请求
- 接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
- 生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端
- 可以是异步非阻塞模式,也可以是同步阻塞模式
- Worker以多进程的方式运行
5、Task进程:异步工作进程
- 接受由Worker进程通过swoole_server->task/taskwait方法投递的任务
- 处理任务,并将结果数据返回(swoole_server->finish)给Worker进程
- 完全是同步阻塞模式
- TaskWorker以多进程的方式运行
worker进程中的receive、close、connect
//注意ip如果是127.0.0.1的话,不是本机访问不了
$serv = new Swoole_Server("0.0.0.0", 9501);
$serv->on('connect', function ($serv, $fd){
echo "数据已经连接.\n";
});
$serv->on('receive', function ($serv, $fd, $reactor_id, $data) {
echo "请求过来的数据:".$data;
$serv->send($fd, "hello swoole");
$serv->close($fd);
});
//连接断开触发的毁掉函数。
$serv->on('close', function ($serv, $fd) {
echo "Client: Close.\n";
});
$serv->start();
通过火狐浏览器连接9501端口(部分浏览器可能出现错误)
终端输出内容
也可以通过telnet 连接并且发送数据。
通过socket代码连接(php-fpm和命令行通用)
$socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP);
socket_connect($socket,"192.168.205.10",9501);
socket_write($socket,"hell server\n");
$out=socket_read($socket,1024);
socket_close($socket);
swoole中server_client连接(php-fpm和命令行通用)
$client = new swoole\client(SWOOLE_SOCK_TCP);
$client->connect('192.168.205.10', 9501);
$client->send("hello world\n");
echo $client->recv();
$client->close();
Reactor、Worker、TaskWorker的关系
可以理解为Reactor就是nginx,Worker就是php-fpm。Reactor线程异步并行地处理网络请求,然后再转发给Worker进程中去处理。Reactor和Worker间通过UnixSocket进行通信。
在php-fpm的应用中,经常会将一个任务异步投递到Redis等队列中,并在后台启动一些php进程异步地处理这些任务。Swoole提供的Worker是一套更完整的方案,将任务的投递、队列、php任务处理进程管理合为一体。通过底层提供的API可以非常简单地实现异步任务的处理。另外TaskWorker还可以在任务执行完成后,再返回一个结果反馈到Worker。
Swoole的Reactor、Worker、TaskWorker之间可以紧密的结合起来,提供更高级的使用方式。
一个更通俗的比喻,假设Server就是一个工厂,那Reactor就是销售,接受客户订单。而Worker就是工人,当销售接到订单后,Worker去工作生产出客户要的东西。而TaskWorker可以理解为行政人员,可以帮助Worker干些杂事,让Worker专心工作。
底层会为Worker进程、TaskWorker进程分配一个唯一的ID
不同的Worker和TaskWorker进程之间可以通过sendMessage接口进行通信
Task进程
流程图
流程代码示例
server端 2.x版本
<?php
class Server
{
private $serv;
public function __construct()
{
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array('worker_num' => 3, 'task_worker_num' => 3, 'daemonize' => false, 'max_request' => 10000, 'dispatch_mode' => 2,
// 'debug_mode'=> 0,
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
}
public function onStart($serv)
{
echo "Start\n";
}
public function onConnect($serv, $fd, $from_id)
{
// $serv->send( $fd, "Hello {$fd}!" );
echo "client {$fd} Connect\n";
}
public function onReceive(swoole_server $serv, $fd, $from_id, $data)
{
echo "Get Message From Client {$fd}:{$data}\n";
$data = ['task' => 'task_1', 'params' => $data, 'fd' => $fd,];
$serv->task(json_encode($data));
}
public function onTask($serv, $task_id, $from_id, $data)
{
echo "This Task {$task_id} from worker {$from_id}\n";
echo "Data:{$data}\n";
$data = json_decode($data, true);
var_dump($data);
$serv->send($data['fd'], date('Y-m-d H:i:s', time()));
return "finished (return后自动调用onFinish)";
}
public function onFinish($serv, $task_id, $data)
{
echo "Task {$task_id} finish\n";
echo "Result:{$data} \n";
}
public function onClose($serv, $fd, $from_id)
{
echo "Client {$fd} close connection\n";
}
}
// 启动服务器
$server = new Server();
客户端代码
class Client
{
private $client;
public function __construct()
{
$this->client = new swoole_client(SWOOLE_SOCK_TCP);
}
public function connect()
{
if (!$this->client->connect("127.0.0.1", 9501, 1)) {
echo "Error:链接失败\n";
}
fwrite(STDOUT, "请输入消息:");
$msg = trim(fgets(STDIN));
$this->client->send($msg);
$message = $this->client->recv();
echo "Get Message From Server:{$message}\n";
}
}
$client = new Client();
$client->connect();
注意点
Task
和Worker
进程的通信通过unix socket进行- Task数据传递大小小于8K直接管道传递,大于8K写入临时文件传递
- Task传递对象 1、可以通过序列化传递一个对象的拷贝(非引用)
- Task传递对象 2、Task对象的改变不会反应到worker中(连个进程中是独立的)
- Task传递对象 3、数据库连接,网络连接对象不可传递
- Task的Onfinish的回调会传递给调用给该task方法的worker进程(原路返回可寻)
代码示例
<?php
class Server
{
private $serv;
public $test;
public function __construct()
{
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array(
'worker_num' => 3,
'task_worker_num' => 3,
'daemonize' => false,
'max_request' => 10000,
'dispatch_mode' => 2,
// 'debug_mode'=> 0,
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
}
public function onStart($serv)
{
echo "Start\n";
}
public function onConnect($serv, $fd, $from_id)
{
// $serv->send( $fd, "Hello {$fd}!" );
echo "client {$fd} Connect\n";
}
public function onReceive(swoole_server $serv, $fd, $from_id, $data)
{
echo "Get Message From Client {$fd}:{$data}\n";
$this->test = new Test();
var_dump($this->test);
$serv->task(serialize($this->test));
$serv->send($fd, "我的");
}
public function onTask($serv, $task_id, $from_id, $data)
{
echo "This Task {$task_id} from worker {$from_id}\n";
echo "Data:{$data}\n";
$data = unserialize($data);
$data->index = 2;
var_dump($data);
var_dump($data->test);
return "finished (return后自动调用onFinish)";
}
public function onFinish($serv, $task_id, $data)
{
echo "Task {$task_id} finish\n";
echo "Result:{$data} \n";
var_dump($this->test);
}
public function onClose($serv, $fd, $from_id)
{
echo "Client {$fd} close connection\n";
}
}
class Test
{
public $index = 0;
}
// 启动服务器
$server = new Server();
结果
server端4.4.4版本 协程写法、客户端用浏览器访问、kill命令发送重启、可以执行协程任务。
class Server
{
private $serv;
public function __construct()
{
$this->serv = new swoole\server("0.0.0.0", 9501);
$this->serv->set(array('worker_num' => 3, 'task_worker_num' => 3, 'daemonize' => false, 'max_request' => 10000, 'dispatch_mode' => 2,
'task_enable_coroutine' => true,
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
}
public function onStart($serv)
{
cli_set_process_title("server");
file_put_contents('server.pid',$this->serv->master_pid);
echo "Start\n";
}
public function onConnect($serv, $fd, $from_id)
{
// $serv->send( $fd, "Hello {$fd}!" );
echo "client {$fd} Connect\n";
}
public function onReceive(swoole\server $serv, $fd, $from_id, $data)
{
echo "Get Message From Client {$fd}:{$data}\n";
$data = ['task' => 'task_1', 'params' => $data, 'fd' => $fd,];
$serv->send($fd, date('Y-m-d H:i:s', time()));
$serv->task($data);
}
/**
* @param $serv
* @param \Swoole\Server\Task $task
*/
public function onTask($serv, Swoole\Server\Task $task)
{
for($i=0;$i<=4;$i++){
go(function()use($i){
echo $i.PHP_EOL;
});
}
//任务的编号
var_dump($task->id);
var_dump($task->worker_id);
var_dump($task->data);
var_dump($task->flags);
// $serv->send($task->data['fd'], date('Y-m-d H:i:s', time()));
$task->finish([123, 'hello']);
}
public function onFinish($serv, $task_id, $data)
{
echo "Task {$task_id} finish\n";
echo "Result:{$data[0]} \n";
}
public function onClose($serv, $fd, $from_id)
{
echo "Client {$fd} close connection\n";
}
}
// 启动服务器
$server = new Server();
kill -USR2 $(cat server.pid)
客户端用php-fpm的方式,浏览器访问
class Client
{
private $client;
public function __construct()
{
$this->client = new swoole_client(SWOOLE_SOCK_TCP);
}
public function connect()
{
if (!$this->client->connect("127.0.0.1", 9501, 1)) {
echo "Error:链接失败\n";
}
$msg =['sql'=>'select * from wk_user;'];
$this->client->send(json_encode($msg));
$message = $this->client->recv();
echo "Get Message From Server:{$message}\n";
}
}
$client = new Client();
$client->connect();
异步mysql连接池
<?php
class Server
{
private $serv;
public $pdo;
public function __construct() {
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array(
'worker_num' => 3,
'task_worker_num'=>3,
'daemonize' => false,
'max_request' => 10000,
'dispatch_mode' => 2,
// 'debug_mode'=> 0,
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
$this->serv->on('Task',array($this,'onTask'));
$this->serv->on('Finish',array($this,'onFinish'));
$this->serv->start();
}
public function onStart( $serv ) {
echo "Start\n";
}
public function onConnect( $serv, $fd, $from_id ) {
// $serv->send( $fd, "Hello {$fd}!" );
echo "client {$fd} Connect\n";
}
//task回调会在worker进程创建之初调用 不会区分是worker进程还是taskworker进程
public function onWorkerStart($serv,$worker_id)
{
//异步的mysql连接池、在task中维持一个pdo的实例
//和 'worker_num' => 3,对应
if($serv->taskworker){
$this->pdo=new PDO(
"mysql:host=127.0.0.1;port=3306;dbname=test",
'root',
'123456',
array(
PDO::MYSQL_ATTR_INIT_COMMAND=>"SET NAMES 'UTF8';",
PDO::ATTR_ERRMODE=>PDO::ERRMODE_EXCEPTION,
PDO::ATTR_PERSISTENT=>true
)
);
echo "Task Worker\n";
}else{
echo "Worker Process\n";
}
}
//收到客户端的请求
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
$task=[
'sql'=>"select * from userinfo;",
'params'=>[],
'fd'=>$fd
];
$serv->task(json_encode($task));
}
public function onTask($serv,$task_id,$from_id,$data)
{
try{
$data=json_decode($data,true);
$statement=$this->pdo->query($data['sql']);
while($row = $statement->fetch(PDO::FETCH_ASSOC)){
print_r($row);
}
}catch (PDOException $e){
var_dump($e->getMessage());
return false;
}
}
public function onFinish($serv,$task_id,$data){
echo "Task {$task_id} finish\n";
echo "Result:{$data} \n";
}
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
}
// 启动服务器
$server = new Server();