0

0

NodeJS Streams:在 Pipeline 中优雅地提前结束读取流

花韻仙語

花韻仙語

发布时间:2025-07-22 18:04:01

|

1072人浏览过

|

来源于php中文网

原创

nodejs streams:在 pipeline 中优雅地提前结束读取流

本文探讨了在使用 NodeJS Streams 的 pipeline 处理大型文件时,如何在满足特定条件后提前结束读取流,同时确保已读取的数据块能够完成处理。文章提供了两种解决方案:一种是在转换流中“吞噬”后续数据,另一种是利用 AbortController 中止 pipeline,并详细讲解了实现方法和注意事项,旨在帮助开发者更有效地处理流数据。

在使用 NodeJS streams 的 pipeline 处理大型文件时,有时需要在特定条件满足时提前结束读取流,但同时又希望已读取的数据块能够继续完成处理。直接销毁读取流可能会导致 ERR_STREAM_PREMATURE_CLOSE 错误,并且不够优雅。本文将介绍两种更佳的解决方案,帮助你安全且高效地实现这一需求。

方案一:在转换流中“吞噬”后续数据

这种方法的核心思想是在检测到需要停止读取的条件后,让转换流“吞噬”后续的所有数据,使其不再向下传递。这样,读取流会一直读取到文件末尾,但下游的流只处理到满足停止条件之前的数据。

以下是示例代码:

const { Transform } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

let shouldStop = false;
const firstStream = fs.createReadStream("./lg.txt");

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        if (shouldStop) {
            // 吞噬剩余数据
            callback(null, "");
        } else {
            const text = chunk.toString();
            const foundText = text.search("CHAPTER 9") !== -1;
            if (foundText) {
                // 设置标志位,吞噬剩余数据
                shouldStop = true;
            }
            callback(null, text.toUpperCase());
        }
    },
});

const lastStream = process.stdout;

pipeline(firstStream, secondStream, lastStream)
    .then(() => console.log("Pipeline completed successfully."))
    .catch(err => console.error("Pipeline failed.", err));

代码解释:

  1. shouldStop 变量用于标记是否需要停止处理数据。
  2. 在 secondStream 的 transform 函数中,如果 shouldStop 为 true,则直接调用 callback(null, ""),表示吞噬当前数据块,不向下传递。
  3. 如果 shouldStop 为 false,则检查当前数据块中是否包含目标文本。如果包含,则设置 shouldStop 为 true,并对数据进行转换后向下传递。

优点:

  • 逻辑简单,易于理解和实现。
  • 不需要中断 pipeline,避免了潜在的错误。

缺点:

百度智能云·曦灵
百度智能云·曦灵

百度旗下的AI数字人平台

下载
  • 读取流会一直读取到文件末尾,可能会浪费一些资源。

方案二:使用 AbortController 中止 Pipeline

AbortController 提供了一种更优雅的方式来中止 pipeline,并且可以进行清理工作。通过 AbortController,可以向 pipeline 发送一个中止信号,pipeline 会在完成当前数据块的处理后停止。

以下是示例代码:

const { Transform } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

const firstStream = fs.createReadStream("./lg.txt");

const ac = new AbortController();
const signal = ac.signal;

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        const text = chunk.toString();
        const foundText = text.search("CHAPTER 9") !== -1;

        callback(null, text.toUpperCase());
        if (foundText) {
            ac.abort(new Error("reading terminated, match found"));
        }

    },
});

const lastStream = process.stdout;

pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
    console.log("\nall done without match");
}).catch((err) => {
    if (err.code === "ABORT_ERR") {
        console.log(`\n${signal.reason.message}`);
    } else {
        console.log(err);
    }
});

代码解释:

  1. 创建 AbortController 实例 ac,并获取其 signal 属性。
  2. 在 secondStream 的 transform 函数中,如果检测到目标文本,则调用 ac.abort(new Error("reading terminated, match found")),发送中止信号。
  3. 在调用 pipeline 函数时,将 signal 作为选项传递。
  4. 使用 try...catch 捕获 pipeline 函数可能抛出的错误。如果错误代码为 ABORT_ERR,则表示 pipeline 被中止,可以获取中止原因。

优点:

  • 更优雅地中止 pipeline,可以进行清理工作。
  • 可以获取中止原因,方便调试。

缺点:

  • 代码相对复杂一些。

注意事项:

  • 跨 Chunk 边界问题: 在搜索目标文本时,需要注意目标文本可能跨越 chunk 边界的情况。为了避免漏检,可以保留每个 chunk 的最后 N-1 个字符,并将其添加到下一个 chunk 的开头,其中 N 为目标文本的长度。
  • 错误处理: 在使用 pipeline 函数时,需要注意错误处理。可以使用 try...catch 语句捕获可能抛出的错误,并进行相应的处理。
  • 资源释放: 在中止 pipeline 后,需要确保所有资源都得到正确释放。

总结

本文介绍了两种在 NodeJS Streams 的 pipeline 中提前结束读取流的解决方案。第一种方案是在转换流中“吞噬”后续数据,逻辑简单,但可能会浪费一些资源。第二种方案是使用 AbortController 中止 pipeline,更优雅,可以进行清理工作,但代码相对复杂。选择哪种方案取决于具体的应用场景和需求。同时,需要注意跨 chunk 边界问题和错误处理,确保程序的稳定性和可靠性。

相关专题

更多
c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

231

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

435

2024.03.01

scripterror怎么解决
scripterror怎么解决

scripterror的解决办法有检查语法、文件路径、检查网络连接、浏览器兼容性、使用try-catch语句、使用开发者工具进行调试、更新浏览器和JavaScript库或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

187

2023.10.18

500error怎么解决
500error怎么解决

500error的解决办法有检查服务器日志、检查代码、检查服务器配置、更新软件版本、重新启动服务、调试代码和寻求帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

271

2023.10.25

Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

34

2026.01.14

php与html混编教程大全
php与html混编教程大全

本专题整合了php和html混编相关教程,阅读专题下面的文章了解更多详细内容。

14

2026.01.13

PHP 高性能
PHP 高性能

本专题整合了PHP高性能相关教程大全,阅读专题下面的文章了解更多详细内容。

33

2026.01.13

MySQL数据库报错常见问题及解决方法大全
MySQL数据库报错常见问题及解决方法大全

本专题整合了MySQL数据库报错常见问题及解决方法,阅读专题下面的文章了解更多详细内容。

18

2026.01.13

PHP 文件上传
PHP 文件上传

本专题整合了PHP实现文件上传相关教程,阅读专题下面的文章了解更多详细内容。

12

2026.01.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
快速入门Node.JS全套完整版
快速入门Node.JS全套完整版

共83课时 | 8.2万人学习

nodejs开发基础教程
nodejs开发基础教程

共15课时 | 4.5万人学习

JavaScript设计模式视频教程
JavaScript设计模式视频教程

共28课时 | 5.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号