
本文详解如何利用 php fiber 实现多个 http 流的真正并发读取,避免串行等待;通过统一调度未终止 fiber、轮询式分片读取,达成类似协程的 i/o 多路复用效果。
本文详解如何利用 php fiber 实现多个 http 流的真正并发读取,避免串行等待;通过统一调度未终止 fiber、轮询式分片读取,达成类似协程的 i/o 多路复用效果。
在 PHP 8.1+ 中,Fiber 提供了用户态协作式调度能力,是构建轻量异步 I/O 的理想基础——但需注意:Fiber 本身不提供自动 I/O 调度,必须由开发者显式轮询与协调。你遇到的问题(google.com 完全读完才开始 twitter.com)根源在于:原代码对每个 Fiber 执行了“同步耗尽式循环”(while (!$fiber->isTerminated()) { $fiber->resume(); }),导致调度权被单个 Fiber 长期独占,丧失并发性。
要实现真正的交错读取(如先各读 100 字节,再循环),关键策略是:启动全部 Fiber 后,进入统一事件循环,每轮仅对每个活跃 Fiber 调用一次 resume(),让它们协同推进。这模拟了经典的“协作式多任务”模型。
以下是重构后的完整可运行示例:
<?php
function getFiberFromStream($stream, $url): Fiber {
return new Fiber(function ($stream) use ($url): void {
while (!feof($stream)) {
echo "reading 100 bytes from $url" . PHP_EOL;
$contents = fread($stream, 100);
// 主动让出控制权,返回当前批次数据
Fiber::suspend($contents);
}
// feof 触发后 fiber 自然终止
});
}
function getContents(array $urls): array {
$contents = [];
$fibers = []; // 存储 [Fiber, 当前累积内容, stream] 三元组
// ✅ 第一阶段:批量启动所有 Fiber(不等待任何完成)
foreach ($urls as $key => $url) {
$stream = fopen($url, 'r');
if (!$stream) {
throw new RuntimeException("Failed to open stream for $url");
}
stream_set_blocking($stream, false); // 关键:设为非阻塞!
$fiber = getFiberFromStream($stream, $url);
$initialContent = $fiber->start($stream); // 启动并获取首块数据
$fibers[$key] = [$fiber, $initialContent ?? '', $stream];
}
// ✅ 第二阶段:统一轮询调度(核心!)
$hasActive = true;
while ($hasActive) {
$hasActive = false;
foreach ($fibers as $key => &$item) {
[$fiber, $content, $stream] = $item;
if (!$fiber->isTerminated()) {
$hasActive = true;
// 仅执行一次 resume,获取下一批数据
try {
$nextChunk = $fiber->resume();
$item[1] = $content . ($nextChunk ?? '');
} catch (Throwable $e) {
// Fiber 内部异常(如 fread 失败),标记为终止并记录错误
$item[1] = $content . "[ERROR: {$e->getMessage()}]";
$fiber = null; // 防止重复 resume
}
} else {
// Fiber 已终止:关闭流并保存结果
if ($stream && is_resource($stream)) {
fclose($stream);
$item[2] = null; // 清空引用,避免重复关闭
}
$contents[$urls[$key]] = $content;
unset($fibers[$key]); // 清理已完成项,减少后续遍历开销
}
}
// ⚠️ 重要:无活跃 Fiber 时需退出,否则死循环
if (empty($fibers)) break;
}
return $contents;
}
// 使用示例(建议在支持 HTTPS 的环境中运行)
$urls = [
'https://httpbin.org/delay/1', // 模拟慢响应
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
];
// 注意:实际生产中应添加超时、重试、错误熔断等健壮性逻辑
$result = getContents($urls);
var_dump(array_keys($result));关键要点说明
- 非阻塞流是前提:stream_set_blocking($stream, false) 不可省略。若流阻塞,fread() 将挂起整个 Fiber,破坏并发性。
- 调度逻辑分离:启动(start)与执行(resume)严格解耦。所有 Fiber 必须先 start(),再统一 resume(),这是实现交错执行的基石。
- 状态管理需显式:Fiber 无内置上下文存储,需用数组或对象维护 $content 和 $stream 等状态,避免闭包变量被覆盖。
- 资源安全关闭:务必在 Fiber 终止后 fclose(),且通过置空 $item[2] 或 unset() 防止重复关闭导致警告。
- 错误防御不可少:真实场景中,网络抖动、DNS 失败、SSL 错误均会导致 fread() 返回 false 或抛异常,应在 resume() 外层加 try/catch。
? 进阶提示:此模式是手动实现的“协程调度器”。若项目复杂度上升,建议迁移到成熟异步框架(如 Swoole 或 ReactPHP),它们已封装 epoll/kqueue 事件循环与自动流调度,大幅降低心智负担。
立即学习“PHP免费学习笔记(深入)”;
通过以上重构,输出将严格符合预期交错模式,真正释放 Fiber 在 I/O 密集型场景下的并发潜力。











