支持同步/异步的客户端封装

为了在原生 PHP 环境(如 FPM/Apache)下同时支持同步阻塞调用(一问一答)和异步非阻塞调用(发出请求后不等待,后续并发统一收包),最经典的架构是利用 stream_socket_client非阻塞模式结合 stream_select 进行多路复用。

以下是完整重构后的 JsonRpcClient.php,支持无缝切换同步与异步并发模式。

1. 支持同步/异步的客户端封装

app\common\rpc 目录下修改 JsonRpcClient.php

php

<?php
namespace app\common\rpc;

use RuntimeException;

class JsonRpcClient
{
    private string $host;
    private int $port;
    private float $timeout;
    private string $serviceName;

    // 用于存放异步请求的队列和 Socket 连接池
    private array $asyncQueue = [];
    private array $asyncSockets = [];

    public function __construct(string $host = '', int $port = 0, string $serviceName = '', float $timeout = 3.0)
    {
        $config = config('rpc.default', []);
        $this->host = !empty($host) ? $host : ($config['host'] ?? '127.0.0.1');
        $this->port = !empty($port) ? $port : ($config['port'] ?? 9502);
        $this->timeout = !empty($timeout) ? $timeout : ($config['timeout'] ?? 3.0);
        $this->serviceName = $serviceName;
    }

    public function withService(string $serviceName): self
    {
        $this->serviceName = $serviceName;
        return $this;
    }

    /**
     * 魔术方法:默认走【同步】调用
     */
    public function __call(string $method, array $params)
    {
        return $this->call($method, $params);
    }

    /**
     * 1. 同步阻塞调用
     */
    public function call(string $method, array $params)
    {
        $stream = $this->createConnection();
        $payload = $this->pack($method, $params, uniqid('rpc_sync_', true));

        if (fwrite($stream, $payload) === false) {
            fclose($stream);
            throw new RuntimeException("Send RPC request failed.");
        }

        try {
            $response = $this->readResponse($stream);
            return $this->unpack($response);
        } finally {
            fclose($stream);
        }
    }

    /**
     * 2. 异步调用:只发送请求,不等待返回,加入异步队列
     * @return string 返回本次请求的唯一标识 ID
     */
    public function asyncCall(string $method, array $params): string
    {
        if (empty($this->serviceName)) {
            throw new RuntimeException("RPC service name is not defined.");
        }

        $requestId = uniqid('rpc_async_', true);
        $payload = $this->pack($method, $params, $requestId);

        // 创建非阻塞连接
        $stream = $this->createConnection(true);
        
        // 发送数据
        if (@fwrite($stream, $payload) === false) {
            fclose($stream);
            throw new RuntimeException("Send async RPC request failed.");
        }

        // 记录 Socket 和请求 ID 的映射关系
        $this->asyncSockets[(int)$stream] = $stream;
        $this->asyncQueue[(int)$stream] = [
            'id' => $requestId,
            'service' => $this->serviceName,
            'method' => $method
        ];

        return $requestId;
    }

    /**
     * 3. 统一并发收包:等待所有异步请求返回结果(多路复用)
     * @return array 返回以 requestId 为键,结果为值的数组
     */
    public function wait(): array
    {
        if (empty($this->asyncSockets)) {
            return [];
        }

        $results = [];
        $readBuffers = [];

        while (!empty($this->asyncSockets)) {
            $read = $this->asyncSockets;
            $write = null;
            $except = null;

            // 计算超时时间(秒和微秒)
            $seconds = floor($this->timeout);
            $microseconds = ($this->timeout - $seconds) * 1000000;

            // 使用 stream_select 监控哪些 Socket 有数据可读
            $numChanged = stream_select($read, $write, $except, (int)$seconds, (int)$microseconds);

            if ($numChanged === false) {
                $this->clearAsyncPool();
                throw new RuntimeException("Stream select failed during async RPC.");
            }

            if ($numChanged === 0) {
                // 超时处理,关闭所有剩余未响应的连接
                $this->clearAsyncPool();
                throw new RuntimeException("Async RPC requests timeout.");
            }

            // 遍历有数据可读的 Socket
            foreach ($read as $stream) {
                $streamId = (int)$stream;
                
                if (!isset($readBuffers[$streamId])) {
                    $readBuffers[$streamId] = '';
                }

                // 异步流采用非阻塞读取,拼接缓冲区
                $buffer = stream_get_line($stream, 8192, "\r\n");
                
                if ($buffer !== false) {
                    $readBuffers[$streamId] .= $buffer;
                    
                    // 获取对应的请求信息
                    $reqInfo = $this->asyncQueue[$streamId];
                    
                    try {
                        $results[$reqInfo['id']] = $this->unpack($readBuffers[$streamId]);
                    } catch (\Exception $e) {
                        $results[$reqInfo['id']] = new RuntimeException("Async error: " . $e->getMessage());
                    }

                    // 读完后关闭并移除该 Socket
                    fclose($stream);
                    unset($this->asyncSockets[$streamId]);
                    unset($this->asyncQueue[$streamId]);
                    unset($readBuffers[$streamId]);
                } else {
                    // 对端关闭或读取异常
                    fclose($stream);
                    unset($this->asyncSockets[$streamId]);
                    unset($this->asyncQueue[$streamId]);
                }
            }
        }

        return $results;
    }

    /**
     * 创建原生网络连接
     */
    private function createConnection(bool $isAsync = false)
    {
        $remoteSocket = "tcp://{$this->host}:{$this->port}";
        $errno = 0;
        $errstr = '';

        $stream = @stream_socket_client(
            $remoteSocket,
            $errno,
            $errstr,
            $this->timeout,
            STREAM_CLIENT_CONNECT
        );

        if (!$stream) {
            throw new RuntimeException(sprintf("Connect to RPC server failed: %s (code: %s)", $errstr, $errno));
        }

        // 如果是异步模式,设置为非阻塞流
        if ($isAsync) {
            stream_set_blocking($stream, false);
        } else {
            $seconds = floor($this->timeout);
            $microseconds = ($this->timeout - $seconds) * 1000000;
            stream_set_timeout($stream, (int)$seconds, (int)$microseconds);
        }

        return $stream;
    }

    /**
     * 单个同步连接的读数据逻辑
     */
    private function readResponse($stream): string
    {
        $response = '';
        while (!feof($stream)) {
            $buffer = stream_get_line($stream, 8192, "\r\n");
            if ($buffer === false) {
                $info = stream_get_meta_data($stream);
                if ($info['timed_out']) {
                    throw new RuntimeException("Receive RPC response timeout.");
                }
                break;
            }
            $response .= $buffer;
            break; 
        }
        return $response;
    }

    /**
     * 打包协议数据
     */
    private function pack(string $method, array $params, string $id): string
    {
        return json_encode([
            'jsonrpc' => '2.0',
            'method'  => $this->serviceName . '@' . $method,
            'params'  => $params,
            'id'      => $id,
        ]) . "\r\n";
    }

    /**
     * 解包并校验协议数据
     */
    private function unpack(string $response)
    {
        if (empty($response)) {
            throw new RuntimeException("Empty RPC response.");
        }

        $responseData = json_decode($response, true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new RuntimeException("Failed to parse JSON-RPC response.");
        }

        if (isset($responseData['error'])) {
            $error = $responseData['error'];
            throw new RuntimeException($error['message'] ?? 'Unknown RPC Error', $error['code'] ?? 500);
        }

        return $responseData['result'] ?? null;
    }

    /**
     * 异常时清理异步资源
     */
    private function clearAsyncPool(): void
    {
        foreach ($this->asyncSockets as $stream) {
            if (is_resource($stream)) {
                fclose($stream);
            }
        }
        $this->asyncSockets = [];
        $this->asyncQueue = [];
    }
}

请谨慎使用此类代码。

2. 业务层调用示例

在 TP6 的控制器中,你现在可以自由地混合同步调用与并发的异步调用。

php

<?php
namespace app\controller;

use app\BaseController;
use app\common\rpc\JsonRpcClient;

class Order extends BaseController
{
    /**
     * 演示同步与异步调用
     */
    public function handle(JsonRpcClient $rpcClient)
    {
        try {
            // ==================== 1. 同步调用示例 ====================
            // 直接触发魔术方法,代码会在此处阻塞,直到拿到 UserService 的结果
            $userInfo = $rpcClient
                ->withService('app\rpc\service\UserService')
                ->getUserInfo(1001);


            // ==================== 2. 异步并发调用示例 ====================
            // 假设我们要同时去查询:商品库存、优惠券状态、物流运费(如果是同步需要依次等待,耗时累加)
            // 异步调用会立即返回一个 请求ID,不阻塞进程
            
            $rpcGoods = $rpcClient->withService('app\rpc\service\GoodsService');
            $id1 = $rpcGoods->asyncCall('checkStock', ['goods_id' => 55, 'num' => 1]);
            
            $rpcCoupon = $rpcClient->withService('app\rpc\service\CouponService');
            $id2 = $rpcCoupon->asyncCall('validateCoupon', ['coupon_id' => 999, 'user_id' => 1001]);
            
            $rpcExpress = $rpcClient->withService('app\rpc\service\ExpressService');
            $id3 = $rpcExpress->asyncCall('calculateFee', ['weight' => 1.5, 'region' => '北京']);

            // 统一收包:此处会发生阻塞,但三个请求是同时在服务端并行的
            // 最大阻塞时间取决于三个请求中最慢的那一个(以及设置的 timeout)
            $asyncResults = $rpcClient->wait();

            // 从结果集中根据请求 ID 取出对应的数据
            $stockData   = $asyncResults[$id1] ?? null;
            $couponData  = $asyncResults[$id2] ?? null;
            $expressFee  = $asyncResults[$id3] ?? null;

            return json([
                'status' => 'success',
                'sync_user_data' => $userInfo,
                'async_results'  => [
                    'stock'   => $stockData,
                    'coupon'  => $couponData,
                    'express' => $expressFee,
                ]
            ]);

        } catch (\Exception $e) {
            return json(['status' => 'error', 'msg' => $e->getMessage()], 500);
        }
    }
}

请谨慎使用此类代码。

💡 核心机制解析

  1. 同步逻辑 (__call):调用时直接新建 TCP 连接,发送并阻塞读取,结束后立马 fclose 释放。适合对前后依赖度极高、必须串行执行的业务。

  2. 异步逻辑 (asyncCall):将 stream_set_blocking($stream, false) 设置为非阻塞。这意味着调用 fwrite 写完数据后,程序不会死等服务端返回,而是立刻往下走。

  3. 多路复用驱动 (wait):利用底层的系统调用 stream_select。它会同时把 3 个 Socket 连接挂起监听,哪个连接的服务端先返回数据,就先去读哪个,直到所有连接读完或整体超时。这是非 Swoole 环境下实现并发 RPC 调用最高效的手段。