在 ThinkPHP 6.1 中,利用 think-swoole 扩展开发 RPC 服务并监听独立端口,最标准的做法是通过自定义服务类在 Swoole 启动时调用 addlistener。
以下是完整的实现步骤和代码示例:
1. 修改 config/swoole.php 配置
主服务依然保持原样(通常是 9501 端口用于 HTTP)。我们在配置文件的 rpc 项中(或自定义位置)添加 RPC 的配置,以便后续代码读取: [1]
php
return [
'server' => [
'host' => '0.0.0.0',
'port' => 9501, // 主 HTTP 端口
// ... 其他配置
],
// 自定义 RPC 监听配置
'rpc_server' => [
'host' => '0.0.0.0',
'port' => 9502, // 独立的 RPC 端口
],
];
请谨慎使用此类代码。
2. 创建自定义服务监听 RPC
在 app/service 目录下创建一个自定义服务类(例如 RpcService.php),利用 ThinkPHP 的服务注册机制,在 Swoole 启动前动态添加监听端口。
php
<?php
namespace app\service;
use think\Service;
use think\swoole\Manager;
use Swoole\Server as SwooleServer;
class RpcService extends Service
{
public function boot()
{
// 确保是在 CLI 模式下且引入了 think-swoole
if ($this->app->runningInConsole() && class_exists(Manager::class)) {
// 监听 swoole 启动事件或在适当的时机获取 Server 实例
$this->app->event->listen('swoole.init', function () {
/** @var Manager $manager */
$manager = $this->app->make(Manager::class);
$server = $manager->getServer();
// 获取配置
$config = $this->app->config->get('swoole.rpc_server');
// 动态添加 RPC 监听端口 (TCP 协议)
$rpcPort = $server->addListener($config['host'], $config['port'], SWOOLE_SOCK_TCP);
// 设置 RPC 端口的独立参数(如固定包头协议,防止 TCP 粘包)
$rpcPort->set([
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0,
'package_body_offset' => 4,
'package_max_length' => 2048000,
]);
// 绑定监听事件
$rpcPort->on('receive', function (SwooleServer $serv, int $fd, int $reactorId, string $data) {
$this->handleRpcRequest($serv, $fd, $data);
});
});
}
}
/**
* 处理 RPC 请求核心逻辑
*/
protected function handleRpcRequest(SwooleServer $serv, int $fd, string $data)
{
try {
// 1. 解包(去掉前4字节的长度头)
$body = substr($data, 4);
$request = json_decode($body, true);
// 2. 解析请求参数 (示例格式:['class' => 'User', 'method' => 'getInfo', 'args' => []])
$class = $request['class'] ?? '';
$method = $request['method'] ?? '';
$args = $request['args'] ?? [];
// 3. 实例化对应的 RPC 服务类并调用方法
// 注意:建议限制可调用的命名空间,防止越权
$className = "\\app\\rpc\\" . ucfirst($class);
if (!class_exists($className)) {
throw new \Exception("Service {$class} not found");
}
$serviceInstance = $this->app->make($className);
if (!method_exists($serviceInstance, $method)) {
throw new \Exception("Method {$method} not found");
}
// 执行业务逻辑
$result = call_user_func_array([$serviceInstance, $method], $args);
$response = ['status' => 200, 'data' => $result, 'msg' => 'success'];
} catch (\Throwable $e) {
$response = ['status' => 500, 'data' => null, 'msg' => $e->getMessage()];
}
// 4. 响应客户端(打包:前4字节写入数据长度)
$sendData = json_encode($response);
$package = pack('N', strlen($sendData)) . $sendData;
$serv->send($fd, $package);
}
}
请谨慎使用此类代码。
3. 注册服务
在 app/service.php 配置文件中,将刚才编写的服务类注册进去:
php
return [
// ... 其他系统服务
\app\service\RpcService::class,
];
请谨慎使用此类代码。
4. 编写 RPC 业务类
创建 app/rpc/User.php 作为具体的 RPC 接口实现:
php
<?php
namespace app\rpc;
class User
{
public function getInfo(int $uid)
{
// 正常的 TP6.1 数据库操作或业务逻辑
return [
'uid' => $uid,
'name' => '张三',
'time' => date('Y-m-d H:i:s')
];
}
}
请谨慎使用此类代码。
5. 启动与测试
使用命令启动服务:
php think swoole。此时系统会同时监听 9501 (HTTP) 和 9502 (RPC/TCP)。客户端(可以是另一个 TP 项目或其他语言)通过 TCP 连接 9502 端口,并按照
pack('N', ...)的协议格式发送{"class":"User","method":"getInfo","args":[1001]}即可获取返回数据。
在自定义 TCP 协议时,核心要解决的问题是粘包和丢包。最安全、最高效的方法是采用 “固定包头 + 包体” 的格式。
在上面的示例中,我们使用了 pack('N', strlen($data)),也就是前 4 个字节存包体长度(大端字节序),后面紧跟 JSON 字符串。
为了配合前面服务端的配置,以下是为您编写的配套客户端(Client)自定义协议打包与解包的完整实现,可以直接用于其他 ThinkPHP 项目或独立 PHP 脚本中。
1. 编写自定义协议客户端类
您可以将此文件放入客户端项目的 app/common/RpcClient.php 或任意工具类目录中:
php
<?php
namespace app\common;
class RpcClient
{
protected $host;
protected $port;
protected $timeout;
public function __construct(string $host = '127.0.0.1', int $port = 9502, float $timeout = 2.5)
{
$this->host = $host;
$this->port = $port;
$this->timeout = $timeout;
}
/**
* 发送 RPC 请求
* @param string $class 类名
* @param string $method 方法名
* @param array $args 参数
* @return array
*/
public function call(string $class, string $method, array $args = [])
{
// 1. 创建 TCP 连接
$fp = stream_socket_client(
"tcp://{$this->host}:{$this->port}",
$errno,
$errstr,
$this->timeout
);
if (!$fp) {
throw new \Exception("RPC 远程连接失败: {$errstr} ({$errno})");
}
// 设置读写超时
stream_set_timeout($fp, (int)$this->timeout, ($this->timeout - (int)$this->timeout) * 1000000);
try {
// 2. 按照【自定义协议】组装请求数据
$body = json_encode([
'class' => $class,
'method' => $method,
'args' => $args
]);
// pack('N', ...) 将长度转为 4 字节无符号二进制大端序
$payload = pack('N', strlen($body)) . $body;
// 3. 发送数据
fwrite($fp, $payload);
// 4. 读取响应包头(前 4 个字节,获取返回数据的长度)
$header = fread($fp, 4);
if (strlen($header) < 4) {
throw new \Exception("RPC 读取响应包头失败");
}
// unpack('N', ...) 解析出数据真实长度
$lenData = unpack('Nlen', $header);
$length = $lenData['len'];
// 5. 循环读取完整的包体
$responseBody = '';
while (strlen($responseBody) < $length) {
$buffer = fread($fp, $length - strlen($responseBody));
if ($buffer === false || $buffer === '') {
break;
}
$responseBody .= $buffer;
}
// 6. 解析结果
$result = json_decode($responseBody, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception("RPC 响应数据反序列化失败");
}
return $result;
} finally {
// 7. 关闭连接
fclose($fp);
}
}
}
请谨慎使用此类代码。
2. 在控制器中调用测试
在客户端的控制器中,直接实例化并调用远程方法:
php
<?php
namespace app\controller;
use app\common\RpcClient;
use think\response\Json;
class Index
{
public function testRpc(): Json
{
// 实例化客户端,指向 think-swoole 监听的 9502 端口
$client = new RpcClient('127.0.0.1', 9502);
try {
// 远程调用 app\rpc\User 类的 getInfo 方法,传入参数 uid = 1001
$response = $client->call('User', 'getInfo', ['uid' => 1001]);
return json([
'msg' => '请求成功',
'data' => $response
]);
} catch (\Throwable $e) {
return json([
'msg' => '请求失败',
'error' => $e->getMessage()
], 500);
}
}
}
请谨慎使用此类代码。
💡 协议设计优化建议(生产环境必备)
连接池优化:上述客户端代码采用的是 Short Connection(短连接),每次请求都会经历 TCP 三次握手。如果在高并发场景下频繁调用,建议在客户端引入 Swoole 客户端连接池,保持长连接以提升性能。
多端兼容:如果您未来的客户端可能由 Go、Java 或 Node.js 编写,请务必告知对方开发人员:该 TCP 服务的协议是
LengthFieldBasedFrameDecoder(长度字段前缀解码器),长度占4 字节,网络字节序为Big-Endian(大端)。