利用 think-swoole 扩展开发 RPC 服务并监听独立端口

在 ThinkPHP 6.1 中,利用 think-swoole 扩展开发 RPC 服务并监听独立端口,最标准的做法是通过自定义服务类在 Swoole 启动时调用 addlistener

以下是完整的实现步骤和代码示例:

1. 修改 config/swoole.php 配置

主服务依然保持原样(通常是 9501 端口用于 HTTP)。我们在配置文件的 rpc 项中(或自定义位置)添加 RPC 的配置,以便后续代码读取: [1]

php

return [
    'server' => [
        'host' => '0.0.0.0',
        'port' => 9501, // 主 HTTP 端口
        // ... 其他配置
    ],
    // 自定义 RPC 监听配置
    'rpc_server' => [
        'host' => '0.0.0.0',
        'port' => 9502, // 独立的 RPC 端口
    ],
];

请谨慎使用此类代码。

2. 创建自定义服务监听 RPC

app/service 目录下创建一个自定义服务类(例如 RpcService.php),利用 ThinkPHP 的服务注册机制,在 Swoole 启动前动态添加监听端口。

php

<?php
namespace app\service;

use think\Service;
use think\swoole\Manager;
use Swoole\Server as SwooleServer;

class RpcService extends Service
{
    public function boot()
    {
        // 确保是在 CLI 模式下且引入了 think-swoole
        if ($this->app->runningInConsole() && class_exists(Manager::class)) {
            
            // 监听 swoole 启动事件或在适当的时机获取 Server 实例
            $this->app->event->listen('swoole.init', function () {
                /** @var Manager $manager */
                $manager = $this->app->make(Manager::class);
                $server  = $manager->getServer();
                
                // 获取配置
                $config = $this->app->config->get('swoole.rpc_server');
                
                // 动态添加 RPC 监听端口 (TCP 协议)
                $rpcPort = $server->addListener($config['host'], $config['port'], SWOOLE_SOCK_TCP);
                
                // 设置 RPC 端口的独立参数(如固定包头协议,防止 TCP 粘包)
                $rpcPort->set([
                    'open_length_check'     => true,
                    'package_length_type'   => 'N',
                    'package_length_offset' => 0,
                    'package_body_offset'   => 4,
                    'package_max_length'    => 2048000,
                ]);
                
                // 绑定监听事件
                $rpcPort->on('receive', function (SwooleServer $serv, int $fd, int $reactorId, string $data) {
                    $this->handleRpcRequest($serv, $fd, $data);
                });
            });
        }
    }

    /**
     * 处理 RPC 请求核心逻辑
     */
    protected function handleRpcRequest(SwooleServer $serv, int $fd, string $data)
    {
        try {
            // 1. 解包(去掉前4字节的长度头)
            $body = substr($data, 4);
            $request = json_decode($body, true);
            
            // 2. 解析请求参数 (示例格式:['class' => 'User', 'method' => 'getInfo', 'args' => []])
            $class  = $request['class'] ?? '';
            $method = $request['method'] ?? '';
            $args   = $request['args'] ?? [];
            
            // 3. 实例化对应的 RPC 服务类并调用方法
            // 注意:建议限制可调用的命名空间,防止越权
            $className = "\\app\\rpc\\" . ucfirst($class);
            if (!class_exists($className)) {
                throw new \Exception("Service {$class} not found");
            }
            
            $serviceInstance = $this->app->make($className);
            if (!method_exists($serviceInstance, $method)) {
                throw new \Exception("Method {$method} not found");
            }
            
            // 执行业务逻辑
            $result = call_user_func_array([$serviceInstance, $method], $args);
            $response = ['status' => 200, 'data' => $result, 'msg' => 'success'];
            
        } catch (\Throwable $e) {
            $response = ['status' => 500, 'data' => null, 'msg' => $e->getMessage()];
        }
        
        // 4. 响应客户端(打包:前4字节写入数据长度)
        $sendData = json_encode($response);
        $package  = pack('N', strlen($sendData)) . $sendData;
        $serv->send($fd, $package);
    }
}

请谨慎使用此类代码。

3. 注册服务

app/service.php 配置文件中,将刚才编写的服务类注册进去:

php

return [
    // ... 其他系统服务
    \app\service\RpcService::class,
];

请谨慎使用此类代码。

4. 编写 RPC 业务类

创建 app/rpc/User.php 作为具体的 RPC 接口实现:

php

<?php
namespace app\rpc;

class User
{
    public function getInfo(int $uid)
    {
        // 正常的 TP6.1 数据库操作或业务逻辑
        return [
            'uid'  => $uid,
            'name' => '张三',
            'time' => date('Y-m-d H:i:s')
        ];
    }
}

请谨慎使用此类代码。

5. 启动与测试

  1. 使用命令启动服务:php think swoole。此时系统会同时监听 9501 (HTTP) 和 9502 (RPC/TCP)。

  2. 客户端(可以是另一个 TP 项目或其他语言)通过 TCP 连接 9502 端口,并按照 pack('N', ...) 的协议格式发送 {"class":"User","method":"getInfo","args":[1001]} 即可获取返回数据。

在自定义 TCP 协议时,核心要解决的问题是粘包和丢包。最安全、最高效的方法是采用 “固定包头 + 包体” 的格式。

在上面的示例中,我们使用了 pack('N', strlen($data)),也就是前 4 个字节存包体长度(大端字节序),后面紧跟 JSON 字符串

为了配合前面服务端的配置,以下是为您编写的配套客户端(Client)自定义协议打包与解包的完整实现,可以直接用于其他 ThinkPHP 项目或独立 PHP 脚本中。

1. 编写自定义协议客户端类

您可以将此文件放入客户端项目的 app/common/RpcClient.php 或任意工具类目录中:

php

<?php
namespace app\common;

class RpcClient
{
    protected $host;
    protected $port;
    protected $timeout;

    public function __construct(string $host = '127.0.0.1', int $port = 9502, float $timeout = 2.5)
    {
        $this->host = $host;
        $this->port = $port;
        $this->timeout = $timeout;
    }

    /**
     * 发送 RPC 请求
     * @param string $class 类名
     * @param string $method 方法名
     * @param array $args 参数
     * @return array
     */
    public function call(string $class, string $method, array $args = [])
    {
        // 1. 创建 TCP 连接
        $fp = stream_socket_client(
            "tcp://{$this->host}:{$this->port}", 
            $errno, 
            $errstr, 
            $this->timeout
        );

        if (!$fp) {
            throw new \Exception("RPC 远程连接失败: {$errstr} ({$errno})");
        }

        // 设置读写超时
        stream_set_timeout($fp, (int)$this->timeout, ($this->timeout - (int)$this->timeout) * 1000000);

        try {
            // 2. 按照【自定义协议】组装请求数据
            $body = json_encode([
                'class'  => $class,
                'method' => $method,
                'args'   => $args
            ]);
            
            // pack('N', ...) 将长度转为 4 字节无符号二进制大端序
            $payload = pack('N', strlen($body)) . $body;

            // 3. 发送数据
            fwrite($fp, $payload);

            // 4. 读取响应包头(前 4 个字节,获取返回数据的长度)
            $header = fread($fp, 4);
            if (strlen($header) < 4) {
                throw new \Exception("RPC 读取响应包头失败");
            }

            // unpack('N', ...) 解析出数据真实长度
            $lenData = unpack('Nlen', $header);
            $length  = $lenData['len'];

            // 5. 循环读取完整的包体
            $responseBody = '';
            while (strlen($responseBody) < $length) {
                $buffer = fread($fp, $length - strlen($responseBody));
                if ($buffer === false || $buffer === '') {
                    break;
                }
                $responseBody .= $buffer;
            }

            // 6. 解析结果
            $result = json_decode($responseBody, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                throw new \Exception("RPC 响应数据反序列化失败");
            }

            return $result;

        } finally {
            // 7. 关闭连接
            fclose($fp);
        }
    }
}

请谨慎使用此类代码。

2. 在控制器中调用测试

在客户端的控制器中,直接实例化并调用远程方法:

php

<?php
namespace app\controller;

use app\common\RpcClient;
use think\response\Json;

class Index
{
    public function testRpc(): Json
    {
        // 实例化客户端,指向 think-swoole 监听的 9502 端口
        $client = new RpcClient('127.0.0.1', 9502);

        try {
            // 远程调用 app\rpc\User 类的 getInfo 方法,传入参数 uid = 1001
            $response = $client->call('User', 'getInfo', ['uid' => 1001]);
            
            return json([
                'msg'  => '请求成功',
                'data' => $response
            ]);
        } catch (\Throwable $e) {
            return json([
                'msg'  => '请求失败',
                'error' => $e->getMessage()
            ], 500);
        }
    }
}

请谨慎使用此类代码。

💡 协议设计优化建议(生产环境必备)

  1. 连接池优化:上述客户端代码采用的是 Short Connection(短连接),每次请求都会经历 TCP 三次握手。如果在高并发场景下频繁调用,建议在客户端引入 Swoole 客户端连接池,保持长连接以提升性能。

  2. 多端兼容:如果您未来的客户端可能由 Go、Java 或 Node.js 编写,请务必告知对方开发人员:该 TCP 服务的协议是 LengthFieldBasedFrameDecoder(长度字段前缀解码器),长度占 4 字节,网络字节序为 Big-Endian(大端)