tp6 swoole rpc jsonrpc客户端封装

在 ThinkPHP6 中使用 Swoole 客户端封装 JSON-RPC 客户端,最核心的高效做法是结合 Swoole 的协程 TCP 客户端 (Swoole\Coroutine\Client) 与 PHP 的魔术方法 (__call),从而实现像调用本地方法一样调用远程服务(RPC 代理)。 [1]

以下是完整的封装方案,包含客户端核心类、TP6 服务注册以及业务调用示例。

1. 核心 JSON-RPC 客户端封装

app\common\rpc 目录下创建 JsonRpcClient.php(如目录不存在请自行创建):

php

<?php
namespace app\common\rpc;

use Swoole\Coroutine\Client as CoClient;
use RuntimeException;

class JsonRpcClient
{
    private string $host;
    private int $port;
    private float $timeout;
    private string $serviceName;

    /**
     * @param string $host 服务端IP
     * @param int $port 服务端端口
     * @param string $serviceName 调用的服务类名/命名空间
     * @param float $timeout 超时时间
     */
    public function __construct(string $host, int $port, string $serviceName = '', float $timeout = 2.0)
    {
        $this->host = $host;
        $this->port = $port;
        $this->serviceName = $serviceName;
        $this->timeout = $timeout;
    }

    /**
     * 链式调用:设置当前调用的服务接口
     */
    public function withService(string $serviceName): self
    {
        $this->serviceName = $serviceName;
        return $this;
    }

    /**
     * 魔术方法:动态捕获方法调用并转化为 JSON-RPC 请求
     */
    public function __call(string $method, array $params)
    {
        if (empty($this->serviceName)) {
            throw new RuntimeException("RPC service name is not defined.");
        }

        // 1. 构造标准的 JSON-RPC 2.0 请求协议包
        $requestId = uniqid('rpc_', true);
        $requestData = [
            'jsonrpc' => '2.0',
            'method'  => $this->serviceName . '@' . $method, // 支持服务端定位类和方法
            'params'  => $params,
            'id'      => $requestId,
        ];

        // 2. 序列化数据并追加 EOF 结束符(根据服务端配置,这里以 \r\n 为例)
        $payload = json_encode($requestData) . "\r\n";

        // 3. 使用 Swoole 协程客户端发送并接收数据
        $client = new CoClient(SWOOLE_SOCK_TCP);
        
        if (!$client->connect($this->host, $this->port, $this->timeout)) {
            throw new RuntimeException(sprintf("Connect to RPC server failed: %s (code: %s)", $client->errMsg, $client->errCode));
        }

        if (!$client->send($payload)) {
            throw new RuntimeException("Send RPC request failed.");
        }

        // 接收响应(服务端通常也会开启 package_eof_check)
        $response = $client->recv($this->timeout);
        $client->close();

        if ($response === false || $response === '') {
            throw new RuntimeException("Receive RPC response timeout or empty.");
        }

        // 4. 解析响应数据
        $responseData = json_decode(trim($response), true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new RuntimeException("Failed to parse JSON-RPC response.");
        }

        // 5. 校验结果与处理错误
        if (isset($responseData['error'])) {
            $error = $responseData['error'];
            throw new RuntimeException($error['message'] ?? 'Unknown RPC Error', $error['code'] ?? 500);
        }

        return $responseData['result'] ?? null;
    }
}

请谨慎使用此类代码。

2. 将 RPC 客户端注入 ThinkPHP6 容器

为了方便在控制器或服务中一键调用,可以通过 TP6 的服务或配置文件将其容器化。

新建或修改配置文件 config/rpc.php

php

<?php
return [
    'default' => [
        'host'    => '127.0.0.1',
        'port'    => 9502,
        'timeout' => 3.0,
    ],
];

请谨慎使用此类代码。

app\service.php 中注册全局服务,或者直接在控制器的构造函数中实例化:

php

<?php
namespace app\controller;

use app\BaseController;
use app\common\rpc\JsonRpcClient;

class Order extends BaseController
{
    private JsonRpcClient $rpcClient;

    public function __construct(\think\App $app)
    {
        parent::__construct($app);
        
        // 从配置中读取并初始化 RPC 客户端
        $config = config('rpc.default');
        $this->rpcClient = new JsonRpcClient($config['host'], $config['port']);
    }

    public function create()
    {
        // 假设你要调用微服务中的 UserService 类的 getUserInfo 方法
        // 必须在 Swoole 协程上下文中执行
        try {
            $userId = 1001;
            
            // 链式指定服务名,并直接调用远程方法
            $userInfo = $this->rpcClient
                ->withService('app\rpc\service\UserService')
                ->getUserInfo($userId);

            return json([
                'code' => 200,
                'data' => $userInfo
            ]);
            
        } catch (\Exception $e) {
            return json([
                'code' => 500,
                'msg'  => $e->getMessage()
            ]);
        }
    }
}

请谨慎使用此类代码。

3. 封装要点说明与生产环境优化

  • 协程环境限制Swoole\Coroutine\Client 必须运行在 Swoole 协程上下文(如 think-swoole 运行的环境下)。如果是用传统 FPM 模式做客户端调用,请将 Swoole\Coroutine\Client 替换为原生的 fsockopen 或基于流的 socket 客户端。 [1, 2]

  • 粘包处理(协议边界):示例代码中使用的是简单的 \r\n 换行符(EOF 结束符)来拆包。生产环境下建议在 config/swoole.php 服务端和本客户端统一配置 固定包头协议open_length_check),以防大数据传输时产生粘包、断包。

  • 连接池优化:高并发场景下,每次请求都 new CoClient 建立长连接并在结束后 close 仍会产生频繁握手开销。建议配合 think-swoole 自带的 ConnectionPool(连接池) 封装 RPC 连接,维持 TCP 长连接复用。

若你当前的微服务架构对自动生成客户端接口代码(RPC Interface)有更高要求,可以考虑参考官方 think-swoole rpc:interface 的生成逻辑进行自动化解耦绑定。

由于原生 PHP 客户端(如基于 FPM 环境)不支持 Swoole 的协程连接池和协程客户端,我们需要将网络通信底层完全替换为 PHP 原生的流阻塞/非阻塞 Socket 客户端(使用 stream_socket_clientfsockopen)。

为了保证代码的高内聚,我们可以在客户端内部保持 __call 的魔术方法逻辑不变,仅将底层的发送和接收网络逻辑重构。

以下是基于原生 PHP Stream 流(比 fsockopen 更好控制超时和换行)的 JSON-RPC 客户端完整实现:

1. 原生 PHP Socket 客户端封装

app\common\rpc 目录下创建或修改 JsonRpcClient.php

php

<?php
namespace app\common\rpc;

use RuntimeException;

class JsonRpcClient
{
    private string $host;
    private int $port;
    private float $timeout;
    private string $serviceName;

    /**
     * @param string $host 服务端IP
     * @param int $port 服务端端口
     * @param string $serviceName 调用的服务类名/命名空间
     * @param float $timeout 超时时间(秒)
     */
    public function __construct(string $host = '', int $port = 0, string $serviceName = '', float $timeout = 3.0)
    {
        // 优先读取 TP6 配置,若未传入则读默认配置
        $config = config('rpc.default', []);
        $this->host = !empty($host) ? $host : ($config['host'] ?? '127.0.0.1');
        $this->port = !empty($port) ? $port : ($config['port'] ?? 9502);
        $this->timeout = !empty($timeout) ? $timeout : ($config['timeout'] ?? 3.0);
        $this->serviceName = $serviceName;
    }

    /**
     * 链式调用:设置当前调用的服务接口
     */
    public function withService(string $serviceName): self
    {
        $this->serviceName = $serviceName;
        return $this;
    }

    /**
     * 魔术方法:动态捕获方法调用并转化为 JSON-RPC 请求
     */
    public function __call(string $method, array $params)
    {
        if (empty($this->serviceName)) {
            throw new RuntimeException("RPC service name is not defined.");
        }

        // 1. 构造标准的 JSON-RPC 2.0 请求协议包
        $requestId = uniqid('rpc_', true);
        $requestData = [
            'jsonrpc' => '2.0',
            'method'  => $this->serviceName . '@' . $method,
            'params'  => $params,
            'id'      => $requestId,
        ];

        // 2. 序列化数据并追加 EOF 结束符(与 Swoole 服务端对齐)
        $payload = json_encode($requestData) . "\r\n";

        // 3. 使用原生 stream_socket_client 建立 TCP 连接(替代 fsockopen)
        $remoteSocket = "tcp://{$this->host}:{$this->port}";
        $errno = 0;
        $errstr = '';
        
        // 创建流连接
        $stream = @stream_socket_client(
            $remoteSocket, 
            $errno, 
            $errstr, 
            $this->timeout, 
            STREAM_CLIENT_CONNECT
        );

        if (!$stream) {
            throw new RuntimeException(sprintf("Connect to RPC server [%s] failed: %s (code: %s)", $remoteSocket, $errstr, $errno));
        }

        // 设置流的读取超时时间(秒和微秒)
        $seconds = floor($this->timeout);
        $microseconds = ($this->timeout - $seconds) * 1000000;
        stream_set_timeout($stream, (int)$seconds, (int)$microseconds);

        try {
            // 4. 发送数据
            $fwrite = fwrite($stream, $payload);
            if ($fwrite === false) {
                throw new RuntimeException("Send RPC request failed.");
            }

            // 5. 接收响应(处理 EOF 协议:读取到 "\r\n" 为止)
            $response = '';
            while (!feof($stream)) {
                // stream_get_line 比 fgets 性能更好,且能自动裁剪结束符
                $buffer = stream_get_line($stream, 8192, "\r\n");
                
                if ($buffer === false) {
                    // 检查是否是因为超时导致的读取结束
                    $info = stream_get_meta_data($stream);
                    if ($info['timed_out']) {
                        throw new RuntimeException("Receive RPC response timeout.");
                    }
                    break;
                }
                
                $response .= $buffer;
                break; // 由于 RPC 是一问一答且按 \r\n 拆包,读取到一行即可退出
            }

        } finally {
            // 确保在任何情况下都会关闭连接(短连接模式)
            fclose($stream);
        }

        if (empty($response)) {
            throw new RuntimeException("Receive RPC response empty.");
        }

        // 6. 解析响应数据
        $responseData = json_decode($response, true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new RuntimeException("Failed to parse JSON-RPC response. Raw: " . $response);
        }

        // 7. 校验结果与处理错误
        if (isset($responseData['error'])) {
            $error = $responseData['error'];
            throw new RuntimeException($error['message'] ?? 'Unknown RPC Error', $error['code'] ?? 500);
        }

        return $responseData['result'] ?? null;
    }
}

请谨慎使用此类代码。

2. 配置文件 config/rpc.php

在 TP6 的配置目录下设置服务端地址:

php

<?php
return [
    'default' => [
        'host'    => '127.0.0.1', // Swoole 服务端 IP
        'port'    => 9502,        // Swoole 服务端 端口
        'timeout' => 3.0,         // 建立连接与读取数据的超时时间
    ],
];

请谨慎使用此类代码。

3. 控制器层直接调用

原生模式下,该客户端无需处于 Swoole 协程环境中,在传统的 Apache / php-fpm 模式下即可完美运行:

php

<?php
namespace app\controller;

use app\BaseController;
use app\common\rpc\JsonRpcClient;

class User extends BaseController
{
    /**
     * 通过 TP6 容器自动实例化客户端
     */
    public function profile(JsonRpcClient $rpcClient)
    {
        try {
            // 像本地方法一样直接跨网络调用 Swoole 服务端的方法
            $userInfo = $rpcClient
                ->withService('app\rpc\service\UserService')
                ->getUserInfo(1001);

            return json([
                'status' => 'success',
                'data'   => $userInfo
            ]);

        } catch (\Exception $e) {
            return json([
                'status' => 'error',
                'msg'    => $e->getMessage()
            ], 500);
        }
    }
}

请谨慎使用此类代码。

💡 为什么选择 stream_socket_client 而不是 fsockopen

  1. 控制粒度更细stream_socket_client 允许使用 STREAM_CLIENT_ASYNC_CONNECT(异步连接)或配合 stream_select 进行多路复用。

  2. 错误处理更优雅fsockopen 会直接触发 PHP 的 Notice/Warning 级别错误,而 stream_socket_client可以通过 @ 抑制符完美转换为自定义的 RuntimeException

  3. 配合 stream_get_line 更安全:相比于 fgets 会把换行符 \r\n 一并读出,stream_get_line($stream, 8192, "\r\n") 可以在读取的同时直接将约定的 EOF 结束符裁剪掉,避免了后期多余的 trim() 动作。