在 ThinkPHP6 中使用 Swoole 客户端封装 JSON-RPC 客户端,最核心的高效做法是结合 Swoole 的协程 TCP 客户端 (Swoole\Coroutine\Client) 与 PHP 的魔术方法 (__call),从而实现像调用本地方法一样调用远程服务(RPC 代理)。 [1]
以下是完整的封装方案,包含客户端核心类、TP6 服务注册以及业务调用示例。
1. 核心 JSON-RPC 客户端封装
在 app\common\rpc 目录下创建 JsonRpcClient.php(如目录不存在请自行创建):
php
<?php
namespace app\common\rpc;
use Swoole\Coroutine\Client as CoClient;
use RuntimeException;
class JsonRpcClient
{
private string $host;
private int $port;
private float $timeout;
private string $serviceName;
/**
* @param string $host 服务端IP
* @param int $port 服务端端口
* @param string $serviceName 调用的服务类名/命名空间
* @param float $timeout 超时时间
*/
public function __construct(string $host, int $port, string $serviceName = '', float $timeout = 2.0)
{
$this->host = $host;
$this->port = $port;
$this->serviceName = $serviceName;
$this->timeout = $timeout;
}
/**
* 链式调用:设置当前调用的服务接口
*/
public function withService(string $serviceName): self
{
$this->serviceName = $serviceName;
return $this;
}
/**
* 魔术方法:动态捕获方法调用并转化为 JSON-RPC 请求
*/
public function __call(string $method, array $params)
{
if (empty($this->serviceName)) {
throw new RuntimeException("RPC service name is not defined.");
}
// 1. 构造标准的 JSON-RPC 2.0 请求协议包
$requestId = uniqid('rpc_', true);
$requestData = [
'jsonrpc' => '2.0',
'method' => $this->serviceName . '@' . $method, // 支持服务端定位类和方法
'params' => $params,
'id' => $requestId,
];
// 2. 序列化数据并追加 EOF 结束符(根据服务端配置,这里以 \r\n 为例)
$payload = json_encode($requestData) . "\r\n";
// 3. 使用 Swoole 协程客户端发送并接收数据
$client = new CoClient(SWOOLE_SOCK_TCP);
if (!$client->connect($this->host, $this->port, $this->timeout)) {
throw new RuntimeException(sprintf("Connect to RPC server failed: %s (code: %s)", $client->errMsg, $client->errCode));
}
if (!$client->send($payload)) {
throw new RuntimeException("Send RPC request failed.");
}
// 接收响应(服务端通常也会开启 package_eof_check)
$response = $client->recv($this->timeout);
$client->close();
if ($response === false || $response === '') {
throw new RuntimeException("Receive RPC response timeout or empty.");
}
// 4. 解析响应数据
$responseData = json_decode(trim($response), true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new RuntimeException("Failed to parse JSON-RPC response.");
}
// 5. 校验结果与处理错误
if (isset($responseData['error'])) {
$error = $responseData['error'];
throw new RuntimeException($error['message'] ?? 'Unknown RPC Error', $error['code'] ?? 500);
}
return $responseData['result'] ?? null;
}
}
请谨慎使用此类代码。
2. 将 RPC 客户端注入 ThinkPHP6 容器
为了方便在控制器或服务中一键调用,可以通过 TP6 的服务或配置文件将其容器化。
新建或修改配置文件 config/rpc.php:
php
<?php
return [
'default' => [
'host' => '127.0.0.1',
'port' => 9502,
'timeout' => 3.0,
],
];
请谨慎使用此类代码。
在 app\service.php 中注册全局服务,或者直接在控制器的构造函数中实例化:
php
<?php
namespace app\controller;
use app\BaseController;
use app\common\rpc\JsonRpcClient;
class Order extends BaseController
{
private JsonRpcClient $rpcClient;
public function __construct(\think\App $app)
{
parent::__construct($app);
// 从配置中读取并初始化 RPC 客户端
$config = config('rpc.default');
$this->rpcClient = new JsonRpcClient($config['host'], $config['port']);
}
public function create()
{
// 假设你要调用微服务中的 UserService 类的 getUserInfo 方法
// 必须在 Swoole 协程上下文中执行
try {
$userId = 1001;
// 链式指定服务名,并直接调用远程方法
$userInfo = $this->rpcClient
->withService('app\rpc\service\UserService')
->getUserInfo($userId);
return json([
'code' => 200,
'data' => $userInfo
]);
} catch (\Exception $e) {
return json([
'code' => 500,
'msg' => $e->getMessage()
]);
}
}
}
请谨慎使用此类代码。
3. 封装要点说明与生产环境优化
协程环境限制:
Swoole\Coroutine\Client必须运行在 Swoole 协程上下文(如think-swoole运行的环境下)。如果是用传统 FPM 模式做客户端调用,请将Swoole\Coroutine\Client替换为原生的fsockopen或基于流的 socket 客户端。 [1, 2]粘包处理(协议边界):示例代码中使用的是简单的
\r\n换行符(EOF 结束符)来拆包。生产环境下建议在config/swoole.php服务端和本客户端统一配置 固定包头协议(open_length_check),以防大数据传输时产生粘包、断包。连接池优化:高并发场景下,每次请求都
new CoClient建立长连接并在结束后close仍会产生频繁握手开销。建议配合think-swoole自带的 ConnectionPool(连接池) 封装 RPC 连接,维持 TCP 长连接复用。
若你当前的微服务架构对自动生成客户端接口代码(RPC Interface)有更高要求,可以考虑参考官方 think-swoole rpc:interface 的生成逻辑进行自动化解耦绑定。
由于原生 PHP 客户端(如基于 FPM 环境)不支持 Swoole 的协程连接池和协程客户端,我们需要将网络通信底层完全替换为 PHP 原生的流阻塞/非阻塞 Socket 客户端(使用 stream_socket_client 或 fsockopen)。
为了保证代码的高内聚,我们可以在客户端内部保持 __call 的魔术方法逻辑不变,仅将底层的发送和接收网络逻辑重构。
以下是基于原生 PHP Stream 流(比 fsockopen 更好控制超时和换行)的 JSON-RPC 客户端完整实现:
1. 原生 PHP Socket 客户端封装
在 app\common\rpc 目录下创建或修改 JsonRpcClient.php:
php
<?php
namespace app\common\rpc;
use RuntimeException;
class JsonRpcClient
{
private string $host;
private int $port;
private float $timeout;
private string $serviceName;
/**
* @param string $host 服务端IP
* @param int $port 服务端端口
* @param string $serviceName 调用的服务类名/命名空间
* @param float $timeout 超时时间(秒)
*/
public function __construct(string $host = '', int $port = 0, string $serviceName = '', float $timeout = 3.0)
{
// 优先读取 TP6 配置,若未传入则读默认配置
$config = config('rpc.default', []);
$this->host = !empty($host) ? $host : ($config['host'] ?? '127.0.0.1');
$this->port = !empty($port) ? $port : ($config['port'] ?? 9502);
$this->timeout = !empty($timeout) ? $timeout : ($config['timeout'] ?? 3.0);
$this->serviceName = $serviceName;
}
/**
* 链式调用:设置当前调用的服务接口
*/
public function withService(string $serviceName): self
{
$this->serviceName = $serviceName;
return $this;
}
/**
* 魔术方法:动态捕获方法调用并转化为 JSON-RPC 请求
*/
public function __call(string $method, array $params)
{
if (empty($this->serviceName)) {
throw new RuntimeException("RPC service name is not defined.");
}
// 1. 构造标准的 JSON-RPC 2.0 请求协议包
$requestId = uniqid('rpc_', true);
$requestData = [
'jsonrpc' => '2.0',
'method' => $this->serviceName . '@' . $method,
'params' => $params,
'id' => $requestId,
];
// 2. 序列化数据并追加 EOF 结束符(与 Swoole 服务端对齐)
$payload = json_encode($requestData) . "\r\n";
// 3. 使用原生 stream_socket_client 建立 TCP 连接(替代 fsockopen)
$remoteSocket = "tcp://{$this->host}:{$this->port}";
$errno = 0;
$errstr = '';
// 创建流连接
$stream = @stream_socket_client(
$remoteSocket,
$errno,
$errstr,
$this->timeout,
STREAM_CLIENT_CONNECT
);
if (!$stream) {
throw new RuntimeException(sprintf("Connect to RPC server [%s] failed: %s (code: %s)", $remoteSocket, $errstr, $errno));
}
// 设置流的读取超时时间(秒和微秒)
$seconds = floor($this->timeout);
$microseconds = ($this->timeout - $seconds) * 1000000;
stream_set_timeout($stream, (int)$seconds, (int)$microseconds);
try {
// 4. 发送数据
$fwrite = fwrite($stream, $payload);
if ($fwrite === false) {
throw new RuntimeException("Send RPC request failed.");
}
// 5. 接收响应(处理 EOF 协议:读取到 "\r\n" 为止)
$response = '';
while (!feof($stream)) {
// stream_get_line 比 fgets 性能更好,且能自动裁剪结束符
$buffer = stream_get_line($stream, 8192, "\r\n");
if ($buffer === false) {
// 检查是否是因为超时导致的读取结束
$info = stream_get_meta_data($stream);
if ($info['timed_out']) {
throw new RuntimeException("Receive RPC response timeout.");
}
break;
}
$response .= $buffer;
break; // 由于 RPC 是一问一答且按 \r\n 拆包,读取到一行即可退出
}
} finally {
// 确保在任何情况下都会关闭连接(短连接模式)
fclose($stream);
}
if (empty($response)) {
throw new RuntimeException("Receive RPC response empty.");
}
// 6. 解析响应数据
$responseData = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new RuntimeException("Failed to parse JSON-RPC response. Raw: " . $response);
}
// 7. 校验结果与处理错误
if (isset($responseData['error'])) {
$error = $responseData['error'];
throw new RuntimeException($error['message'] ?? 'Unknown RPC Error', $error['code'] ?? 500);
}
return $responseData['result'] ?? null;
}
}
请谨慎使用此类代码。
2. 配置文件 config/rpc.php
在 TP6 的配置目录下设置服务端地址:
php
<?php
return [
'default' => [
'host' => '127.0.0.1', // Swoole 服务端 IP
'port' => 9502, // Swoole 服务端 端口
'timeout' => 3.0, // 建立连接与读取数据的超时时间
],
];
请谨慎使用此类代码。
3. 控制器层直接调用
原生模式下,该客户端无需处于 Swoole 协程环境中,在传统的 Apache / php-fpm 模式下即可完美运行:
php
<?php
namespace app\controller;
use app\BaseController;
use app\common\rpc\JsonRpcClient;
class User extends BaseController
{
/**
* 通过 TP6 容器自动实例化客户端
*/
public function profile(JsonRpcClient $rpcClient)
{
try {
// 像本地方法一样直接跨网络调用 Swoole 服务端的方法
$userInfo = $rpcClient
->withService('app\rpc\service\UserService')
->getUserInfo(1001);
return json([
'status' => 'success',
'data' => $userInfo
]);
} catch (\Exception $e) {
return json([
'status' => 'error',
'msg' => $e->getMessage()
], 500);
}
}
}
请谨慎使用此类代码。
💡 为什么选择 stream_socket_client 而不是 fsockopen?
控制粒度更细:
stream_socket_client允许使用STREAM_CLIENT_ASYNC_CONNECT(异步连接)或配合stream_select进行多路复用。错误处理更优雅:
fsockopen会直接触发 PHP 的Notice/Warning级别错误,而stream_socket_client可以通过@抑制符完美转换为自定义的RuntimeException。配合
stream_get_line更安全:相比于fgets会把换行符\r\n一并读出,stream_get_line($stream, 8192, "\r\n")可以在读取的同时直接将约定的 EOF 结束符裁剪掉,避免了后期多余的trim()动作。