自定义事件监听(Server Events)

在 ThinkPHP6 中,配合封装的客户端,服务端的职责是:启动一个 TCP 监听端口,接收请求、按照约定协议(如 EOF)拆包、解析 JSON-RPC 2.0 协议、动态调用对应的服务类方法,最后将结果打包返回。

如果使用 think-swoole 扩展,最优雅的方式是利用其提供的自定义事件监听(Server Events)。以下是完整的服务端实现方案。


1. 修改 config/swoole.php 开启 TCP 监听

首先,需要让 think-swoole 在启动 Web 服务的顺便,多监听一个 TCP 端口专门用于 RPC 通信。

修改 config/swoole.php,在 server.listen 数组中增加一个 TCP 监听:

php

<?php
// config/swoole.php

return [
    'server' => [
        'host'      => '0.0.0.0',
        'port'      => 80, // Web 端口
        'mode'      => SWOOLE_PROCESS,
        'sock_type' => SWOOLE_SOCK_TCP,
        'options'   => [
            'worker_num' => swoole_cpu_num(),
            // 可以根据需要在这里设置全局参数
        ],
        // 🌟 核心:多端口监听配置
        'listen' => [
            [
                'host'      => '0.0.0.0',
                'port'      => 9502, // RPC 专门端口
                'sock_type' => SWOOLE_SOCK_TCP,
                'options'   => [
                    // 必须与客户端的拆包协议完全一致
                    'open_eof_check' => true,
                    'package_eof'    => "\r\n",
                ],
            ],
        ],
    ],
    // 其他配置保持默认...
];

请谨慎使用此类代码。


2. 创建 RPC 业务服务类(供远程调用)

在服务端创建一个实际处理业务的类。例如在 app\rpc\service 目录下创建 UserService.php

php

<?php
namespace app\rpc\service;

class UserService
{
    /**
     * 供客户端远程调用的方法
     */
    public function getUserInfo(int $userId): array
    {
        // 模拟数据库查询
        return [
            'id'       => $userId,
            'nickname' => '张三',
            'email'    => 'zhangsan@example.com',
            'source'   => 'RPC Server'
        ];
    }
}

请谨慎使用此类代码。


3. 创建 RPC 协议解析器

我们需要一个核心处理器,负责将接收到的原始字符串解析为标准的 JSON-RPC 响应。

app\common\rpc 目录下创建 RpcDispatcher.php

php

<?php
namespace app\common\rpc;

use Throwable;

class RpcDispatcher
{
    /**
     * 核心调度方法
     * @param string $rawData 客户端发来的原始数据
     * @return string 返回打包好的 JSON 字符串
     */
    public function dispatch(string $rawData): string
    {
        $responseId = null;
        try {
            // 1. 解析 JSON 数据
            $request = json_decode(trim($rawData), true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                return $this->error(-32700, 'Parse error', $responseId);
            }

            // 2. 校验协议基础字段
            if (!isset($request['jsonrpc']) || $request['jsonrpc'] !== '2.0' || !isset($request['method'])) {
                return $this->error(-32600, 'Invalid Request', $responseId);
            }

            $responseId = $request['id'] ?? null;
            $methodStr  = $request['method']; // 格式如: "app\rpc\service\UserService@getUserInfo"
            $params     = $request['params'] ?? [];

            // 3. 解析类名和方法名
            if (strpos($methodStr, '@') === false) {
                return $this->error(-32601, 'Method not found (Format error)', $responseId);
            }

            list($className, $methodName) = explode('@', $methodStr);

            // 4. 检查类和方法是否存在
            if (!class_exists($className)) {
                return $this->error(-32601, "Class {$className} not found", $responseId);
            }

            // 利用 TP6 的容器单例实例化类(支持依赖注入)
            $instance = app($className);

            if (!method_exists($instance, $methodName)) {
                return $this->error(-32601, "Method {$methodName} not found", $responseId);
            }

            // 5. 动态调用业务方法
            // 使用 call_user_func_array 传入数组参数
            $result = call_user_func_array([$instance, $methodName], $params);

            // 6. 返回标准成功响应
            return $this->success($result, $responseId);

        } catch (Throwable $e) {
            // 捕获业务代码运行中的任何异常
            return $this->error($e->getCode() ?: -32603, $e->getMessage(), $responseId);
        }
    }

    private function success($result, $id): string
    {
        return json_encode([
            'jsonrpc' => '2.0',
            'result'  => $result,
            'id'      => $id
        ]) . "\r\n"; // 必须追加结束符
    }

    private function error(int $code, string $message, $id): string
    {
        return json_encode([
            'jsonrpc' => '2.0',
            'error'   => [
                'code'    => $code,
                'message' => $message
            ],
            'id'      => $id
        ]) . "\r\n"; // 必须追加结束符
    }
}

请谨慎使用此类代码。


4. 绑定 Swoole 事件监听器

think-swoole 会将 Swoole 底层的 Receive(接收到 TCP 数据)事件转换为 TP6 的事件系统。我们只需要定义一个监听器。

app\listener 目录下创建 RpcReceiveListener.php

php

<?php
namespace app\listener;

use app\common\rpc\RpcDispatcher;
use Swoole\Server;

class RpcReceiveListener
{
    private RpcDispatcher $dispatcher;

    public function __construct(RpcDispatcher $dispatcher)
    {
        $this->dispatcher = $dispatcher;
    }

    /**
     * 事件触发入口
     * @param array $event 包含 [server, fd, reactorId, data]
     */
    public function handle($event)
    {
        /** @var Server $server */
        $server    = $event[0];
        $fd        = $event[1];
        $reactorId = $event[2];
        $rawData   = $event[3]; // 已经由 Swoole 依据 \r\n 拆包好的完整单条请求

        // 获取当前连接的信息,确认是不是从 9502 端口进来的 RPC 请求
        $connectionInfo = $server->getClientInfo($fd);
        
        // 🌟 核心判断:如果不是来自配置的 9502 RPC 端口,则不处理(防止和 Web 的长连接冲突)
        if ($connectionInfo && $connectionInfo['server_port'] === 9502) {
            
            // 送入调度器解析并执行
            $responsePayload = $this->dispatcher->dispatch($rawData);
            
            // 将结果直接发回客户端
            $server->send($fd, $responsePayload);
        }
    }
}

请谨慎使用此类代码。

最后,在 app\event.php 中将该监听器注册到 swoole.receive 事件中:

php

<?php
// app/event.php

return [
    'bind'    => [],
    'listen'  => [
        // 监听 think-swoole 的 TCP 接收事件
        'swoole.receive' => [
            \app\listener\RpcReceiveListener::class
        ],
    ],
    'subscribe' => [],
];

请谨慎使用此类代码。


5. 链路闭环测试验证

  1. 启动服务端:在项目根目录下执行 php think swoole。此时控制台会显示同时监听了两个端口(如 809502)。

  2. 客户端调用:使用你之前封装的客户端(同步或异步皆可)发起请求:

    php

    $client = new \app\common\rpc\JsonRpcClient();
    $res = $client->withService('app\rpc\service\UserService')->getUserInfo(888);
    dump($res);
    

    请谨慎使用此类代码。

  3. 执行结果:你会完美拿到服务端 UserService 返回的带有 'source' => 'RPC Server' 的数组。

💡 生产环境避坑指南

  • 命名空间安全防范:目前的 RpcDispatcher 允许客户端传入任意类名。在生产环境中非常危险! 建议在 RpcDispatcher 中加入白名单校验,比如 if (strpos($className, 'app\rpc\service\\') !== 0),只允许调用特定目录下的 RPC 专用类,防止客户端恶意调用系统内部敏感类(如 think\Db)。

  • 跨服务类的自动映射:如果客户端和服务端是两个独立项目,客户端可能并不想写长长的 app\rpc\service\UserService,你可以通过配置文件做一个简称映射,如 'user' => 'app\rpc\service\UserService'