为了在原生 PHP 环境(如 FPM/Apache)下同时支持同步阻塞调用(一问一答)和异步非阻塞调用(发出请求后不等待,后续并发统一收包),最经典的架构是利用 stream_socket_client 的非阻塞模式结合 stream_select 进行多路复用。
以下是完整重构后的 JsonRpcClient.php,支持无缝切换同步与异步并发模式。
1. 支持同步/异步的客户端封装
在 app\common\rpc 目录下修改 JsonRpcClient.php:
php
<?php
namespace app\common\rpc;
use RuntimeException;
class JsonRpcClient
{
private string $host;
private int $port;
private float $timeout;
private string $serviceName;
// 用于存放异步请求的队列和 Socket 连接池
private array $asyncQueue = [];
private array $asyncSockets = [];
public function __construct(string $host = '', int $port = 0, string $serviceName = '', float $timeout = 3.0)
{
$config = config('rpc.default', []);
$this->host = !empty($host) ? $host : ($config['host'] ?? '127.0.0.1');
$this->port = !empty($port) ? $port : ($config['port'] ?? 9502);
$this->timeout = !empty($timeout) ? $timeout : ($config['timeout'] ?? 3.0);
$this->serviceName = $serviceName;
}
public function withService(string $serviceName): self
{
$this->serviceName = $serviceName;
return $this;
}
/**
* 魔术方法:默认走【同步】调用
*/
public function __call(string $method, array $params)
{
return $this->call($method, $params);
}
/**
* 1. 同步阻塞调用
*/
public function call(string $method, array $params)
{
$stream = $this->createConnection();
$payload = $this->pack($method, $params, uniqid('rpc_sync_', true));
if (fwrite($stream, $payload) === false) {
fclose($stream);
throw new RuntimeException("Send RPC request failed.");
}
try {
$response = $this->readResponse($stream);
return $this->unpack($response);
} finally {
fclose($stream);
}
}
/**
* 2. 异步调用:只发送请求,不等待返回,加入异步队列
* @return string 返回本次请求的唯一标识 ID
*/
public function asyncCall(string $method, array $params): string
{
if (empty($this->serviceName)) {
throw new RuntimeException("RPC service name is not defined.");
}
$requestId = uniqid('rpc_async_', true);
$payload = $this->pack($method, $params, $requestId);
// 创建非阻塞连接
$stream = $this->createConnection(true);
// 发送数据
if (@fwrite($stream, $payload) === false) {
fclose($stream);
throw new RuntimeException("Send async RPC request failed.");
}
// 记录 Socket 和请求 ID 的映射关系
$this->asyncSockets[(int)$stream] = $stream;
$this->asyncQueue[(int)$stream] = [
'id' => $requestId,
'service' => $this->serviceName,
'method' => $method
];
return $requestId;
}
/**
* 3. 统一并发收包:等待所有异步请求返回结果(多路复用)
* @return array 返回以 requestId 为键,结果为值的数组
*/
public function wait(): array
{
if (empty($this->asyncSockets)) {
return [];
}
$results = [];
$readBuffers = [];
while (!empty($this->asyncSockets)) {
$read = $this->asyncSockets;
$write = null;
$except = null;
// 计算超时时间(秒和微秒)
$seconds = floor($this->timeout);
$microseconds = ($this->timeout - $seconds) * 1000000;
// 使用 stream_select 监控哪些 Socket 有数据可读
$numChanged = stream_select($read, $write, $except, (int)$seconds, (int)$microseconds);
if ($numChanged === false) {
$this->clearAsyncPool();
throw new RuntimeException("Stream select failed during async RPC.");
}
if ($numChanged === 0) {
// 超时处理,关闭所有剩余未响应的连接
$this->clearAsyncPool();
throw new RuntimeException("Async RPC requests timeout.");
}
// 遍历有数据可读的 Socket
foreach ($read as $stream) {
$streamId = (int)$stream;
if (!isset($readBuffers[$streamId])) {
$readBuffers[$streamId] = '';
}
// 异步流采用非阻塞读取,拼接缓冲区
$buffer = stream_get_line($stream, 8192, "\r\n");
if ($buffer !== false) {
$readBuffers[$streamId] .= $buffer;
// 获取对应的请求信息
$reqInfo = $this->asyncQueue[$streamId];
try {
$results[$reqInfo['id']] = $this->unpack($readBuffers[$streamId]);
} catch (\Exception $e) {
$results[$reqInfo['id']] = new RuntimeException("Async error: " . $e->getMessage());
}
// 读完后关闭并移除该 Socket
fclose($stream);
unset($this->asyncSockets[$streamId]);
unset($this->asyncQueue[$streamId]);
unset($readBuffers[$streamId]);
} else {
// 对端关闭或读取异常
fclose($stream);
unset($this->asyncSockets[$streamId]);
unset($this->asyncQueue[$streamId]);
}
}
}
return $results;
}
/**
* 创建原生网络连接
*/
private function createConnection(bool $isAsync = false)
{
$remoteSocket = "tcp://{$this->host}:{$this->port}";
$errno = 0;
$errstr = '';
$stream = @stream_socket_client(
$remoteSocket,
$errno,
$errstr,
$this->timeout,
STREAM_CLIENT_CONNECT
);
if (!$stream) {
throw new RuntimeException(sprintf("Connect to RPC server failed: %s (code: %s)", $errstr, $errno));
}
// 如果是异步模式,设置为非阻塞流
if ($isAsync) {
stream_set_blocking($stream, false);
} else {
$seconds = floor($this->timeout);
$microseconds = ($this->timeout - $seconds) * 1000000;
stream_set_timeout($stream, (int)$seconds, (int)$microseconds);
}
return $stream;
}
/**
* 单个同步连接的读数据逻辑
*/
private function readResponse($stream): string
{
$response = '';
while (!feof($stream)) {
$buffer = stream_get_line($stream, 8192, "\r\n");
if ($buffer === false) {
$info = stream_get_meta_data($stream);
if ($info['timed_out']) {
throw new RuntimeException("Receive RPC response timeout.");
}
break;
}
$response .= $buffer;
break;
}
return $response;
}
/**
* 打包协议数据
*/
private function pack(string $method, array $params, string $id): string
{
return json_encode([
'jsonrpc' => '2.0',
'method' => $this->serviceName . '@' . $method,
'params' => $params,
'id' => $id,
]) . "\r\n";
}
/**
* 解包并校验协议数据
*/
private function unpack(string $response)
{
if (empty($response)) {
throw new RuntimeException("Empty RPC response.");
}
$responseData = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new RuntimeException("Failed to parse JSON-RPC response.");
}
if (isset($responseData['error'])) {
$error = $responseData['error'];
throw new RuntimeException($error['message'] ?? 'Unknown RPC Error', $error['code'] ?? 500);
}
return $responseData['result'] ?? null;
}
/**
* 异常时清理异步资源
*/
private function clearAsyncPool(): void
{
foreach ($this->asyncSockets as $stream) {
if (is_resource($stream)) {
fclose($stream);
}
}
$this->asyncSockets = [];
$this->asyncQueue = [];
}
}
请谨慎使用此类代码。
2. 业务层调用示例
在 TP6 的控制器中,你现在可以自由地混合同步调用与并发的异步调用。
php
<?php
namespace app\controller;
use app\BaseController;
use app\common\rpc\JsonRpcClient;
class Order extends BaseController
{
/**
* 演示同步与异步调用
*/
public function handle(JsonRpcClient $rpcClient)
{
try {
// ==================== 1. 同步调用示例 ====================
// 直接触发魔术方法,代码会在此处阻塞,直到拿到 UserService 的结果
$userInfo = $rpcClient
->withService('app\rpc\service\UserService')
->getUserInfo(1001);
// ==================== 2. 异步并发调用示例 ====================
// 假设我们要同时去查询:商品库存、优惠券状态、物流运费(如果是同步需要依次等待,耗时累加)
// 异步调用会立即返回一个 请求ID,不阻塞进程
$rpcGoods = $rpcClient->withService('app\rpc\service\GoodsService');
$id1 = $rpcGoods->asyncCall('checkStock', ['goods_id' => 55, 'num' => 1]);
$rpcCoupon = $rpcClient->withService('app\rpc\service\CouponService');
$id2 = $rpcCoupon->asyncCall('validateCoupon', ['coupon_id' => 999, 'user_id' => 1001]);
$rpcExpress = $rpcClient->withService('app\rpc\service\ExpressService');
$id3 = $rpcExpress->asyncCall('calculateFee', ['weight' => 1.5, 'region' => '北京']);
// 统一收包:此处会发生阻塞,但三个请求是同时在服务端并行的
// 最大阻塞时间取决于三个请求中最慢的那一个(以及设置的 timeout)
$asyncResults = $rpcClient->wait();
// 从结果集中根据请求 ID 取出对应的数据
$stockData = $asyncResults[$id1] ?? null;
$couponData = $asyncResults[$id2] ?? null;
$expressFee = $asyncResults[$id3] ?? null;
return json([
'status' => 'success',
'sync_user_data' => $userInfo,
'async_results' => [
'stock' => $stockData,
'coupon' => $couponData,
'express' => $expressFee,
]
]);
} catch (\Exception $e) {
return json(['status' => 'error', 'msg' => $e->getMessage()], 500);
}
}
}
请谨慎使用此类代码。
💡 核心机制解析
同步逻辑 (
__call):调用时直接新建 TCP 连接,发送并阻塞读取,结束后立马fclose释放。适合对前后依赖度极高、必须串行执行的业务。异步逻辑 (
asyncCall):将stream_set_blocking($stream, false)设置为非阻塞。这意味着调用fwrite写完数据后,程序不会死等服务端返回,而是立刻往下走。多路复用驱动 (
wait):利用底层的系统调用stream_select。它会同时把 3 个 Socket 连接挂起监听,哪个连接的服务端先返回数据,就先去读哪个,直到所有连接读完或整体超时。这是非 Swoole 环境下实现并发 RPC 调用最高效的手段。