
本文详解如何利用 php fiber 实现真正并发的非阻塞流读取——通过同时启动多个 fiber 并轮询其状态,避免串行等待,从而让不同 url 的 fread 交替执行,显著提升 i/o 密集型任务的吞吐效率。
本文详解如何利用 php fiber 实现真正并发的非阻塞流读取——通过同时启动多个 fiber 并轮询其状态,避免串行等待,从而让不同 url 的 fread 交替执行,显著提升 i/o 密集型任务的吞吐效率。
在 PHP 8.1+ 中,Fiber 提供了轻量级协程能力,但需注意:Fiber 本身不自动调度,也不隐式处理 I/O 等待。若像传统方式逐个 resume() 直至终止,实际仍是同步串行执行(如原代码中先耗尽 Google 流再处理 Twitter),无法发挥并发优势。
要实现真正的“多流交替读取”,关键在于 分离 Fiber 启动与调度逻辑:
✅ 先批量创建并启动所有 Fiber,保存其上下文;
✅ 再通过轮询(polling)机制,每次仅对每个未终止的 Fiber 执行一次 resume(),让它们以协作式方式交替推进;
✅ 配合非阻塞流(stream_set_blocking($stream, false))和 feof() 检查,安全处理部分读取与 EOF。
以下是优化后的完整实现:
<?php
function getFiberFromStream($stream, $url): Fiber {
return new Fiber(function ($stream) use ($url): void {
while (!feof($stream)) {
// 非阻塞 fread:可能返回空字符串或部分数据
$chunk = fread($stream, 100);
if ($chunk === false || $chunk === '') {
// 无数据可读(EAGAIN/EWOULDBLOCK),短暂让出控制权
Fiber::suspend('');
continue;
}
echo "reading 100 bytes from $url" . PHP_EOL;
Fiber::suspend($chunk);
}
// 显式结束,便于 isTerminated() 判断
});
}
function getContents(array $urls): array {
$contents = [];
$fibers = []; // 存储 [Fiber, current_content, stream] 三元组
// ✅ 第一阶段:批量启动所有 Fiber
foreach ($urls as $index => $url) {
$stream = fopen($url, 'r');
if (!$stream) {
throw new RuntimeException("Failed to open stream for $url");
}
stream_set_blocking($stream, false);
$fiber = getFiberFromStream($stream, $url);
$initialContent = $fiber->start($stream); // 启动并获取首次 yield 值
$fibers[$index] = [$fiber, $initialContent, $stream];
}
// ✅ 第二阶段:协作式轮询调度
$hasActive = true;
while ($hasActive) {
$hasActive = false;
foreach ($fibers as $index => &$item) {
[$fiber, $content, $stream] = $item;
if (!$fiber->isTerminated()) {
$hasActive = true;
// 执行一次 resume,获取下一块数据
$nextChunk = $fiber->resume();
$item[1] = $content .= $nextChunk; // 更新 content
} else {
// Fiber 已完成:关闭流并保存结果
if ($stream) {
fclose($stream);
$item[2] = null; // 防止重复关闭
}
$contents[$urls[$index]] = $content;
}
}
}
return $contents;
}
// 使用示例(建议搭配支持 HTTP/2 或高并发的代理环境测试)
$urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
];
try {
$results = getContents($urls);
echo "\n✅ All requests completed.\n";
foreach ($urls as $url) {
echo sprintf("Length of %s: %d bytes\n", $url, strlen($results[$url] ?? ''));
}
} catch (Throwable $e) {
echo "❌ Error: " . $e->getMessage() . "\n";
}⚠️ 关键注意事项
- 非阻塞流的语义:fread() 在非阻塞模式下可能立即返回 ''(无数据)或 false(错误)。必须检查返回值,避免空循环;必要时可加入微小 usleep(1000) 防止 CPU 空转(本例中由 Fiber::suspend('') 自然让出)。
- 资源管理:务必在 Fiber 终止后显式 fclose(),且确保每个流只关闭一次(通过置 null 标记)。
- 错误处理:fopen() 失败、fread() 返回 false、远程连接中断等均需捕获异常或检查返回值,生产环境不可忽略。
- 性能边界:Fiber 轮询适用于中低并发(如数十个连接);超大规模场景建议结合 ext-event 或 Swoole 等事件驱动扩展。
- PHP 版本要求:需 PHP ≥ 8.1,且 --enable-fiber 编译选项启用(默认已开启)。
✅ 总结
真正的 Fiber 并发 ≠ 同时启动,而在于可控的协作调度。通过将“启动”与“推进”解耦,并采用轮询策略,我们让多个非阻塞流读取操作交织执行,输出日志即验证了预期的交错行为(Google/Twitter/Facebook 交替读取)。这为构建高性能 PHP I/O 工具链(如并发爬虫、API 聚合器)提供了坚实基础——简洁、可控、无需外部扩展。










