BeWithYou

胡搞的技术博客

  1. 首页
  2. PHP
  3. PHP中的协程(二)

PHP中的协程(二)


上篇文章里提到PHP中协程的引入,可以使PHP编程有新的玩法,不在遵循原本顺序执行的思路,从而应对大访问量和并发操作。

有赞的zan framework就是基于PHP协程的,提供最简单的方式开发面向C10K+的高并发HTTP服务或SOA服务。我并没有深入的学习这个框架,这里只打算把关于协程的部分抽出来学习一下。

zan框架高并发设计思路

粗看之下(不一定对哦),框架应该是用swoole_server + 协程解决高并发访问。

比如Web服务中,swoole的http_server只开启了少数的几个worker进程。我们知道,如果workeronRequest里使用的是异步方法,则worker的响应是异步处理的,反之则是阻塞的。

zan框架在worker进程中大量使用了PHP协程,所以一个worker进程可以响应很多并发的请求(但是本质上正在执行的只有一个),这应该就是能过达到C10K+的原因吧。

并且协程相对于回调的方式,在PHPer看来更容易接受吧。另外,框架设计里还使用了middleware、连接池、依赖注入等等比较现代的设计,感觉可以更深入的学习一下。

一个类一个类来看

我们把框架里关于协程的部分拆出来看,下面一个类一个类的分析。与鸟哥博客里那个文章的实现相比,有一些相同的地方,更多的是不同。比如那篇文章里,多个任务放到一个schedule里调度,对于后面的实现就比较繁琐。

这里把框架里的代码抽出来,并进行了一定删减,去掉了与其他业务强相关的东西。比如EventContextAsync等等。实际上Async用于处理MySQL查询的返回值的,框架内部将MySQL的具体操作类封装成了Async的子类,并且yield给调度器来用。

Singal类

Singal类里包含了系统调用所需的信号量。指明了协程在一轮运行之后应该处于的状态。

class Signal
{
    const TASK_SLEEP        = 1;
    const TASK_AWAKE        = 2;
    const TASK_CONTINUE     = 3;
    const TASK_KILLED       = 4;
    const TASK_RUNNING      = 5;
    const TASK_WAIT         = 6;
    const TASK_DONE         = 7;

    public static function isSignal($signal) {
        if(!$signal) {
            return false;
        }

        if (!is_int($signal)) {
            return false;
        }

        if($signal < 1 ) {
            return false;
        }

        if($signal > 7) {
            return false;
        }

        return true;
    }
}

Task类

Task包装了具体协程函数,并提供相应的get set方法。与网上流行的那篇文章(以下简称那文)不同的是,我们的scheduler是内置于Task里的,在run方法里实现具体的调度。

这里我们省略了Context,并且让taskId自增。

class Task
{
    protected $taskId = 0;
    protected $parentId = 0;
    protected $coroutine = null;
    //这里忽略了context 保存的是当前http请求的相关信息,可以通过系统调用的方式操作
    protected $context = null;

    protected $sendValue = null;
    protected $scheduler = null;
    protected $status = 0;

    public function __construct(Generator $coroutine, $taskId = 0, $parentId = 0) {
        $this->coroutine = $coroutine;
        if(isset($GLOBALS['stTaskId']) && $taskId == 0){
            global $stTaskId;
            $taskId = $stTaskId ++;
        }
        $this->taskId = $taskId;
        $this->parentId = $parentId;
        $this->scheduler = new Scheduler($this);
    }

    /**
     * 静态方法调用
     * @param $coroutine
     * @param int $taskId
     * @param int $parentId
     * @return Task
     */
    public static function execute($coroutine, $taskId = 0, $parentId = 0) {
        if ($coroutine instanceof Generator) {
            if(isset($GLOBALS['stTaskId']) && $taskId == 0){
                global $stTaskId;
                $taskId = $stTaskId ++;
            }
            $task = new Task($coroutine, $taskId, $parentId);
            $task->run();
            return $task;
        }
        return $coroutine;
    }

    public function run() {
        while (true) {
            try {
                if ($this->status == Signal::TASK_KILLED){
                    $this->fireTaskDoneEvent();
                    break;
                }
                $this->status = $this->scheduler->schedule();
                //以下几种状态表示信号量,实际上已经从while里跳出来了。如果需要继续的话,会在其他地方重启。
                switch ($this->status) {
                    case Signal::TASK_KILLED:
                    case Signal::TASK_SLEEP:
                    case Signal::TASK_WAIT:
                        return null;
                    case Signal::TASK_DONE:
                        $this->fireTaskDoneEvent();
                        return null;
                }
            } catch (Exception $e) {
                $this->scheduler->throwException($e);
            }
        }
    }

    public function send($value) {
        $this->sendValue = $value;
        return $this->coroutine->send($value);
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function getContext() {
        return $this->context;
    }

    public function getSendValue() {
        return $this->sendValue;
    }

    public function getResult() {
        return $this->sendValue;
    }

    public function getStatus() {
        return $this->status;
    }

    public function setStatus($signal) {
        $this->status = $signal;
    }

    public function getCoroutine() {
        return $this->coroutine;
    }

    public function setCoroutine(Generator $coroutine) {
        $this->coroutine = $coroutine;
    }

    public function fireTaskDoneEvent() {
        echo "Task done $this->taskId\n";
    }
}

Scheduler类

scheduler类负责:

  1. 获取Task里的协程函数跑完一轮的返回值
  2. 根据返回值的类型采取不同的处理方式,如系统调用、子协程、普通yield值、检查协程栈等等。
  3. 在子协程的调用过程中,负责父子协程的进栈出栈,yield值的传递等等。
class Scheduler
{
    private $task = null;
    private $stack = null;

    public function __construct(Task $task)
    {
        $this->task = $task;
        $this->stack = new SplStack();
    }

    public function schedule()
    {
        $coroutine = $this->task->getCoroutine();

        $value = $coroutine->current();

        $signal = $this->handleSysCall($value);
        if ($signal !== null) return $signal;

        $signal = $this->handleCoroutine($value);
        if ($signal !== null) return $signal;

        $signal = $this->handleYieldValue($value);
        if ($signal !== null) return $signal;

        $signal = $this->handleTaskStack($value);
        if ($signal !== null) return $signal;

        $signal = $this->checkTaskDone($value);
        if ($signal !== null) return $signal;

        return Signal::TASK_DONE;
    }

    public function isStackEmpty()
    {
        return $this->stack->isEmpty();
    }

    public function throwException($e, $isFirstCall = false)
    {
        if ($this->isStackEmpty()) {
            $this->task->getCoroutine()->throw($e);
            return;
        }

        try{
            if ($isFirstCall) {
                $coroutine = $this->task->getCoroutine();
            } else {
                $coroutine = $this->stack->pop();
            }

            $this->task->setCoroutine($coroutine);
            $coroutine->throw($e);

            $this->task->run();
        }catch (Exception $e){
            $this->throwException($e);
        }
    }

    /**
     * 处理系统调用
     * @param $value
     * @return mixed|null
     */
    private function handleSysCall($value)
    {
        if (!($value instanceof SysCall)
            && !is_subclass_of($value, SysCall::class)
        ) {
            return null;
        }
        echo $this->task->getTaskId()."| SYSCALL\n";
        //走系统调用 实际上因为__invoke 走的是 $value($this->task);
        $signal = call_user_func($value, $this->task);
        if (Signal::isSignal($signal)) {
            return $signal;
        }

        return null;
    }

    /**
     * 处理子协程
     * @param $value
     * @return int|null
     */
    private function handleCoroutine($value)
    {
        if (!($value instanceof Generator)) {
            return null;
        }
        echo $this->task->getTaskId()."| COROUTINE\n";
        //获取当前的协程 入栈
        $coroutine = $this->task->getCoroutine();
        $this->stack->push($coroutine);
        //将新的协程设为当前的协程
        $this->task->setCoroutine($value);

        return Signal::TASK_CONTINUE;
    }

    /**
     * 处理协程栈
     * @param $value
     * @return int|null
     */
    private function handleTaskStack($value)
    {
        //能够跑到这里说明当前协程已经跑完了 valid()==false了 需要看下栈里是否还有以前的协程
        if ($this->isStackEmpty()) {
            return null;
        }

        echo $this->task->getTaskId()."| TASKSTACK\n";
        //出栈 设置为当前运行的协程
        $coroutine = $this->stack->pop();
        $this->task->setCoroutine($coroutine);

        //这个sendvalue可能是从刚跑完的协程那里得到的 把它当做send值传给老协程 让他继续跑
        $value = $this->task->getSendValue();
        $this->task->send($value);

        return Signal::TASK_CONTINUE;
    }

    /**
     * 处理普通的yield值
     * @param $value
     * @return int|null
     */
    private function handleYieldValue($value)
    {
        $coroutine = $this->task->getCoroutine();
        if (!$coroutine->valid()) {
            return null;
        }
//        if($this->task->getTaskId() == 2){
//
//        }else{
            echo $this->task->getTaskId()."| YIELD VALUE\n";
//        }
        //如果协程后面没有yield了 这里发出send以后valid就变成false了 并且current变成NULL
        $status = $this->task->send($value);
        return Signal::TASK_CONTINUE;
    }

    private function checkTaskDone($value)
    {
        $coroutine = $this->task->getCoroutine();
        if ($coroutine->valid()) {
            return null;
        }
        echo $this->task->getTaskId()."| CHECKDONE\n";

        return Signal::TASK_DONE;
    }
}

SysCall类

与那文的思路相同,系统调用类一般作为yield后面跟着的值吐给外层的调用方来执行,并且可能返回响应的信号量,标识这个Task是继续运行还是进入等待状态中。

不同的是这里的__invoke入参不需要Scheduler

class SysCall
{
    protected $callback = null;

    public function __construct(\Closure $callback)
    {
        $this->callback = $callback;
    }

    public function __invoke(Task $task)
    {
        return call_user_func($this->callback, $task);
    }
}

组装起来!

基本的组件就是上面的几个类了,下面举一些实际的例子,说明如何利用这几个看似简陋的组件来搞大新闻。

延迟执行任务

function taskSleep($ms)
{
    return new SysCall(function (Task $task) use ($ms) {
        swoole_timer_after($ms, function() use($task){
            $task->send("this is send value in sleep function.");
            $task->run();
        });
        return Signal::TASK_SLEEP;
    });
}

function delay(){
    yield taskSleep(2000);
}

function gen(){
    echo "gen1\n";
    yield 1;
    echo "gen2\n";
    yield 2;
    echo "gen3\n";
    yield 3;
}

//Task::execute(delay(), 1); 亦可
(new Task(delay(), 1))->run();
(new Task(gen(), 2))->run();

/** output
1| SYSCALL
gen1
2| YIELD VALUE
gen2
2| YIELD VALUE
gen3
2| YIELD VALUE
2| CHECKDONE
Task done 2
//2秒以后//
1| CHECKDONE
Task done 1
**/

taskSleep是个系统调用,告诉调度器我要睡眠了(传递给他一个Signal::TASK_SLEEP)。具体说明时候唤醒呢,要等swoole_timer_after2秒以后将它唤醒。

我们这里同时跑了两个任务,从输出来看第一个任务的延时执行,并不会阻塞第二个任务。可以清楚地看到,我们的协程是可以实现多任务并行处理的(当然实际上并不是并行)。

独立堆栈的子协程

function justReturnValue(){
    yield (delay());
    yield 'yield value 2';
}

function gen2(){
    $ret1 = (yield "yield value 1");
    echo "[ret] $ret1\n";
    $ret2 = (yield justReturnValue());
    echo "[ret] $ret2\n";
}

(new Task(gen2(), 1))->run();

/** output
1| YIELD VALUE
[ret] yield value 1
1| COROUTINE
1| COROUTINE
1| SYSCALL
// 2秒以后 //
1| TASKSTACK
1| YIELD VALUE
1| TASKSTACK
[ret] yield value 2
1| CHECKDONE
Task done 1
**/

gen2里有一个子协程justReturnValue的调用,而justReturnValue里也有delay的子协程调用。通过输出可以清楚的看到,父子协程进栈出栈的顺序,以及出栈的协程会将吐出来的值交给原先的协程。

实现一个非阻塞IO的Web服务

参照那文里的实现,我们也可以写一个自己的Web服务。首先还是来说明一下要做什么,以及思路。

直接引用那文的说法:

有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候,它创建一个新任务来处理新连接。

Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个Web服务器来说,这有点不太高效。因为服务器在一个时间点上只能处理一个连接。

解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”. 为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数

传统的做法中,创建一个套接字,等待新连接,然后读取、发送、关闭。这些都是阻塞的,会花时间在这些抢占资源的步骤上。如果我们使用协程的方式,可以先将等待操作的任务yield掉,之后结合stream_select方法,选择出可以继续操作的任务将其resume

通俗的说,可以理解为大家一起挤公交车,原先必须一个一个上,但是上车以后要找公交卡,要刷卡或者投币,操作完了以后下一个乘客才能继续。如果使用协程的话,第一个乘客A上车以后,挂起到一边找公交卡,不影响第二个乘客B上车。等到A掏出公交卡以后,直接插队刷卡上车。虽然还是一个一个排队上车,但是找卡的时间里其他乘客不会干等了。

socket的状态

首先定义2个全局数组保存所有用到的socket。再定义2个系统调用将socket设置进数组里,并且返回等待信号量让Task挂起。

$waitingForRead = [];
$waitingForWrite = [];

function waitForRead($socket) {
    return new SysCall(
        function(Task $task) use ($socket) {
            global $waitingForRead;
            if (isset($waitingForRead[(int) $socket])) {
                $waitingForRead[(int) $socket][1][] = $task;
            } else {
                $waitingForRead[(int) $socket] = [$socket, [$task]];
            }
            //设置完了不让他往下走
            return Signal::TASK_WAIT;
        }
    );
}

function waitForWrite($socket) {
    return new SysCall(
        function(Task $task) use ($socket) {
            global $waitingForWrite;
            if (isset($waitingForWrite[(int) $socket])) {
                $waitingForWrite[(int) $socket][1][] = $task;
            } else {
                $waitingForWrite[(int) $socket] = [$socket, [$task]];
            }
            //设置完了不让他往下走
            return Signal::TASK_WAIT;
        }
    );
}

选择可以操作的socket

注册一个任务,不断检查我们的全局数组,直到有socket就绪了,将其对应的任务唤醒。

function ioPoll($timeout) {
    global $waitingForRead;
    global $waitingForWrite;

    $rSocks = [];
    foreach ($waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }

    $wSocks = [];
    foreach ($waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }

    $eSocks = []; // dummy

    //stream_select 方法会直接修改入参 只保留就绪的socket数组
    if (false === stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }

    foreach ($rSocks as $socket) {
        list(, $tasks) = $waitingForRead[(int) $socket];
        unset($waitingForRead[(int) $socket]);

        foreach ($tasks as $task) {
            $task->send("ready for read");
            $task->run();
        }
    }

    foreach ($wSocks as $socket) {
        list(, $tasks) = $waitingForWrite[(int) $socket];
        unset($waitingForWrite[(int) $socket]);

        foreach ($tasks as $task) {
            $task->send("ready for write");
            $task->run();
        }
    }
}

function ioPollTask() {
    global $waitingForRead;
    global $waitingForWrite;
    while (true) {
        if(count($waitingForRead) <=1 && count($waitingForWrite) <=1){
            //如果等待检查的socket只有1个 则用阻塞的方式等待
            ioPoll(null);
        }else{
            //否则设为0超时
            ioPoll(0);
        }
        yield;
    }
}

封装socket

socket封装一下,定义了必须的4个方法。

class CoSocket {
    protected $socket;

    public function __construct($socket) {
        $this->socket = $socket;
    }

    public function accept() {
        //等待本socket就绪
        yield waitForRead($this->socket);
        //就绪以后会继续走到这里 返回给外层一个客户端连接socket
        yield stream_socket_accept($this->socket, 0);
    }

    public function read($size) {
        //等待本socket就绪
        yield waitForRead($this->socket);
        //就绪以后回把读取到的内容 返回给外层
        yield fread($this->socket, $size);
    }

    public function write($string) {
        //等待本socket就绪
        yield waitForWrite($this->socket);
        //就绪以后把响应写给客户端
        fwrite($this->socket, $string);
    }

    public function close() {
        @fclose($this->socket);
    }
}

处理客户端新连接

服务端socket接受到新的连接以后,创建新的任务。下面是这个任务里实际运行的协程。

function handleClient(CoSocket $socket) {
    $data = (yield $socket->read(8192));
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    //响应报文由状态行(HTTP版本、状态码)+HTTP首部字段(响应首部字段、通用首部字段、实体首部字段)组成。
    //空行(CR+LF)分隔首部与报文主体。所以这里留个空行在打印$msg
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield $socket->write($response);
    yield $socket->close();
}

开启服务!

直接创建一个包含server协程的任务,和一个不断刷新stream_select的任务。之后的流程都交给ioPollTask来调度了。

//定义全局的taskId自增用
static $stTaskId = 1;

function server($port){
    echo "Starting server at port $port...\n";
    //这里抛出的异常会被scheduler和task抛来抛去 最后还是到这里catch一下
    try{
        $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
        if (!$socket) throw new Exception($errStr, $errNo);
        //设置为读写非阻塞
        stream_set_blocking($socket, 0);
        $socket = new CoSocket($socket);
        while (true) {
            $clientSocket = (yield $socket->accept());
            $clientCoSocket = new CoSocket($clientSocket);
            //为新的链接创建Task
            Task::execute(handleClient($clientCoSocket));
        }
    }catch (Exception $e){
        echo $e->getMessage();
    }
}

//创建服务端socket的task 1
Task::execute(server(8000));
//不断刷新socket_select的task 2
Task::execute(ioPollTask());

运行效果

开启服务后,我们先直接用curl访问,观测一下得到的结果。

➜  ~ curl -d "a=123&b=456"  http://localhost:8000
Received following request:

POST / HTTP/1.1
Host: localhost:8000
User-Agent: curl/7.51.0
Accept: */*
Content-Length: 11
Content-Type: application/x-www-form-urlencoded

a=123&b=456

可以看到服务端吐出了我们发送给他的信息,包括HTTP请求行、首部字段和正文。如果我们在浏览器里访问的话,正文内容会丰富许多,会有Cookie,UA等等,如下:

Received following request:

GET / HTTP/1.1
Host: localhost:8000
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
Cookie: Phpstorm-f86ac615=34137ba0-5113-4922-b809-b6fa20dbf937

不足的地方

zan framework里的协程调度,并没有采用任务队列的方式。可能是因为他只是针对单独的http或者tcp请求来设计的吧,一般是链式调用。由于这个原因,所以没法设置具体某个任务的执行顺序。当然实际效果跟那文里是相同的,说到底还是由ioPollTask来驱动。

小结

我们通过几个例子加深了对PHP中协程用法的理解。需要注意的是,在协程中(本文构造的这种结构)我们要避免使用死循环,除非循环里yield的结果可以将其挂起并出让控制权给其他协程。

比如上文的Web服务器中,因为有不能主动挂起的ioPollTask,所以不能实现在响应时延迟几秒的效果。因为即使使用了taskSleep这种系统调用,也会因为ioPollTask死循环导致不能获取控制权无法执行。

回到顶部