laravel 使用workerman实现websocket连接

首先现在根目录执行安装workerman

composer require workerman/workerman

composer install/update

创建自定义 artisan 命令来启动 workerman 服务

php artisan make:command WorkermanCommand

在 app/Console/Commands/ 目录下生成 WorkermanCommand.php 文件。

代码如下:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Workerman\Lib\Timer;
use Workerman\Worker;

class WorkermanCommand extends Command
{

    private $server;
    // -d 是否以debug方式运行
    protected $signature = 'workerman {action} {-d?}';
    protected $description = 'Start a Workerman server.';
    public function __construct() {
        parent::__construct();
    }

    /** * Execute the console command. * * @return mixed */
    public function handle() {
        global $argv;
        $arg = $this->argument('action');
        $argv[1] = $argv[2];
        $argv[2] = isset($argv[3]) ? "-{$argv[3]}" : '';

        switch ($arg) {
            case 'start':
                $this->start();
                break;
            case 'stop':
                $this->stop();
                break;
            case 'restart':
                $this->restart();
                break;
            case 'reload':
                $this->reload();
                break;
        }
    }

    private function start() {
        // 创建一个Worker监听8383端口,不使用任何应用层协议
        $this->server = new Worker("websocket://0.0.0.0:8383");
        // 启动1个进程对外提供服务,倘若要实现客户端与服务端交互,则必须设置为1
        $this->server->count = 1;

        //处理客户端传来的消息
        $this->server->onMessage = function ($connection, $data)
        {
            $connection->lastMessageTime = time();
            $data = json_decode($data, true);
            if (!isset($connection->openid)) {
                //这里你可将用户的唯一标识绑定这个连接,我这里用的是openid
                ....

                $this->server->openidConnetcions[$connection->openid] = $connection;
                $connection->send("Accepts success");
            }

        };

        //启动worker进程,建立一个内部端口,方便内部系统推送数据
        $this->server->onWorkerStart = function ($worker)
        {
//             进程启动后设置一个每秒运行一次的定时器,心跳(长连接则需要)
            Timer::add(1, function()use($worker){
                $time_now = time();
                foreach($worker->connections as $connection) {
                    // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                    if (empty($connection->lastMessageTime)) {
                        $connection->lastMessageTime = $time_now;
                        continue;
                    }
                    // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接, HEARTBEAT_TIME是心跳时间,是个常量
                    if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                        $connection->close();
                    }
                }
            });

            //建立内部连接, 后端内部可向此连接发送信息
          $inner_text_worker = new Worker('text://0.0.0.0:5678');
          $inner_text_worker->onMessage = function ($connection, $buffer) {
              $data = json_decode($buffer, true);
              $openid = $data['openid'];

              $ret = $this->sendMessageByOpenid($openid, $buffer);
              $connection->send($ret ? 'ok' : 'fail');
          };
          $inner_text_worker->listen();
        };

        $this->server->onClose = function ($connection)
        {
            echo "connection closed from ip {$connection->getRemoteIp()}\n";
            if (isset($connection->openid)) {
                unset($this->server->openidConnetcions[$connection->openid]);
                echo "connection closed from ip {$connection->getRemoteIp()}\n";
            }
        };

        $this->server->onConnect = function ($connection)
        {
            echo "连接成功";
        };

        Worker::runAll();
    }

    private function stop() {
        $this->server = new Worker('websocket://0.0.0.0:8383');
        // 设置此实例收到reload信号后是否reload重启
        $this->server->reloadable = false;
        $this->server->onWorkerStop = function($worker)
        {
            echo "Worker stop...\n";
        };
        // 运行worker
        Worker::runAll();
    }
    private function restart() {
        $this->server = new Worker('websocket://0.0.0.0:8383');
        // 设置此实例收到reload信号后是否reload重启
        $this->server->reloadable = true;
        $this->server->onWorkerStart = function($worker)
        {
            echo "Worker restart...\n";
        };
        // 运行worker
        Worker::runAll();
    }
    private function reload() {
        $this->server = new Worker('websocket://0.0.0.0:8383');
        // 设置此实例收到reload信号后是否reload重启
        $this->server->reloadable = false;
        $this->server->onWorkerStart = function($worker)
        {
            echo "Worker reload...\n";
        };
        // 运行worker
        Worker::runAll();
    }

 
    //接受内部推送数据后向指定用户推送消息
    private function sendMessageByOpenid($openid, $message)
    {
        if (isset($this->server->openidConnetcions[$openid])) {
            $connection = $this->server->openidConnetcions[$openid];
            $connection->send($message);
            return true;
        }
        return false;
    }

}

后端推送消息到前端的代码

// 建立socket连接到内部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// 推送的数据,包含uid字段,表示是给这个uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
fwrite($client, json_encode($data)."\n");
// 读取推送结果
echo fread($client, 8192);

最后可执行命令来启动workerman服务

php artisan workerman start d

启动效果如下:

这样websocket就可以连接啦

%1 $ S

发表评论

电子邮件地址不会被公开。 必填项已用*标注