BeWithYou

胡搞的技术博客

  1. 首页
  2. PHP
  3. PHP中使用多进程处理数据

PHP中使用多进程处理数据


做一些数据处理时,我们常用PHP写cli任务在后台跑。由于单进程单线程,导致速度较慢。有时候我们会把任务拆分成N多个子任务,使用多个进程同时处理。

比如按id范围处理数据,可以把beginend的id当做入参传入脚本,同时开启若干个进程处理。这种方式简单直接,但是不容易观测脚本执行情况。

这个时候,我们也可以使用pcntl_*系列的函数,为主进程开启若干个子进程来更 优雅 的处理任务。

class Task
{
    const PROCESS_NUM = 3;

    public function run(){
        $pids = array();
        for($i=0; $i<self::PROCESS_NUM; $i++){
            //fork子进程出来
            $pid = pcntl_fork();
            if($pid == 0){
                $this->_workProcess();
                return;
            }else{

                $pids[] = $pid;
            }
        }

        for($i=0; $i<self::PROCESS_NUM; $i++){
            //等待子进程退出
        pcntl_waitpid($pids[$i], $status);
       }
    }

    private function _workProcess()
    {
        $selfPid = posix_getpid();
        echo "task $selfPid start\n";
        //TODO 业务逻辑
    }
}

通过这种方式,可以fork出若干个子进程。我们可以继续使用比如命名管道等进程间通信的方式,从父进程向子进程发送任务需求,收取任务反馈等。

PHP中可以用posix_mkfifo函数创建命名管道,之后写入和读取就跟普通文件一样啦。

$fifo = "/tmp/pipe_".posix_getpid();
if(!file_exists($fifo)){
    posix_mkfifo($fifo, 0666);
}

//写入
$w = fopen($fifo, "w");
fwrite($w, $data);
fclose($w);

//读取
$r = fopen($fifo, "r");
$data = fread($r, 1024);//读取1024Byte
fclose($r);

//以上只是演示API,实际中并不能按照这个顺序写。
//需要注意的是,默认fopen后的是阻塞的读取方式,可以使用下面的方式改成非阻塞,让fread等不用等待有数据就可以返回
stream_set_blocking($w, false);

另外需要注意的是,fread需要制定读取的数据size,如果读不到那么多,就会阻塞住。不过我们也可以利用这一点定制传输协议,将消息内容封装成固定大小的二进制数据放入管道中。如下:

$bData = pack("A1024", $data);
$data = unpack("A1024", $bData)[1]; 
回到顶部