0

0

ExpressJs中并发处理异步任务并等待所有Promise完成

碧海醫心

碧海醫心

发布时间:2025-09-17 11:40:51

|

991人浏览过

|

来源于php中文网

原创

expressjs中并发处理异步任务并等待所有promise完成

本文旨在探讨在ExpressJs应用中如何高效地并发执行多个异步任务,并确保所有Promise都已完成处理后再向客户端发送响应。我们将重点介绍async/await语法与Promise.all()的结合使用,优化异步代码的可读性和健壮性,同时提供错误处理的最佳实践,以确保API行为符合预期。

异步操作与ExpressJs响应机制

在Node.js和ExpressJs开发中,处理I/O密集型或网络请求等异步操作是常态。当一个API请求需要触发多个独立的异步任务(例如,并发请求外部服务并写入文件)时,我们通常希望在所有这些任务都完成后再向客户端返回最终结果或确认信息。然而,如果不正确地管理这些异步操作,服务器可能会在所有任务完成之前就发送响应,导致数据不一致或客户端获取到不完整的信息。

问题分析:为何await Promise.all()未生效

开发者在使用async/await和Promise.all()时,常遇到的一个核心问题是,尽管代码中包含了await Promise.all(tasks);,但Express路由处理函数似乎并未等待所有任务完成。这通常是由于以下两个关键点被忽略:

  1. Express路由处理函数必须标记为async: await关键字只能在async函数内部使用。如果Express的路由处理函数(例如app.post('/', (req, res) => { ... }))没有被标记为async,那么其中的await语句将不会真正暂停函数的执行,而是会直接被解析为一个普通的表达式,导致后续代码立即执行。
  2. Promise的正确返回与错误传播: 确保所有被Promise.all()聚合的Promise都能正确地返回(resolve)或拒绝(reject),并且错误能够被有效地传播。原始的processTask函数在某些情况下可能没有正确地拒绝Promise,或者在fs.writeFile的回调中没有处理错误,导致Promise链断裂或无法被Promise.all()捕获。

解决方案:使用async/await重构异步逻辑

为了确保Express路由能够正确等待所有并发的Promise完成,我们需要对代码进行两方面的优化:

1. 优化 processTask 函数

原始的processTask函数使用了new Promise构造函数和嵌套的.then().catch(),这在现代JavaScript中通常可以通过async/await来简化。同时,需要确保文件写入操作的错误也能被捕获并拒绝Promise。

原始 processTask 示例(问题中的第一版):

function processTask(task: Task, configs: Configs) {
  return new Promise((resolve, reject) => {
    try {
      const fileName = './output/' + task.tag + 's.json';
      fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
        method: 'GET'
      }).then(result => {
        result.json().then(jsonResult => {
          fs.writeFile(fileName, JSON.stringify(jsonResult), function () { // 缺少错误处理
            console.log('finished writing :' + fileName);
            resolve();
          });
        }).catch(err => reject(err));
      }).catch(err => reject(err));
    } catch (err) {
      console.log(err); // 这里的错误不会拒绝外部Promise
    }
  });
}

优化后的 processTask 函数:

使用async/await和fs.promises模块可以大大简化代码,并提供更清晰的错误处理机制。

import * as fs from 'fs/promises'; // 导入fs.promises

async function processTask(task: Task, configs: Configs): Promise {
  try {
    const fileName = './output/' + task.tag + 's.json';

    // 使用 await 等待 fetch 请求完成
    const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
      method: 'GET'
    });

    // 使用 await 等待 JSON 解析完成
    const jsonResult = await result.json();

    // 使用 fs.promises.writeFile 写入文件,它返回一个 Promise
    await fs.writeFile(fileName, JSON.stringify(jsonResult));

    console.log('finished writing :' + fileName);
  } catch (err) {
    // 捕获任何发生在 fetch、json解析或文件写入过程中的错误
    console.error(`Error processing task ${task.tag}:`, err);
    // 重新抛出错误,以便 Promise.all 能够捕获到它
    throw err; 
  }
}

注意事项:

CodeBuddy
CodeBuddy

腾讯云AI代码助手

下载
  • async函数默认返回一个Promise。当函数正常执行完毕时,Promise会以undefined(如果函数没有明确return值)或return的值来resolve。
  • 当async函数内部抛出错误时,它返回的Promise会自动reject。
  • 我们使用了fs.promises模块,它提供了Promise版本的Node.js文件系统API,避免了回调地狱。

2. 优化 Express 路由处理函数

Express路由处理函数必须被标记为async,才能在其内部正确使用await。

原始 Express 路由处理函数示例(问题中的第二版):

app.post('/',  (req: Request, res: Response) => { // 缺少 async 关键字
  const tasksRequest = req.body as TasksRequest;
  let tasks = []

  tasks = tasksRequest.tasks.map( (t) =>  processTask(t, tasksRequest.configs));

  console.log(tasks);
  Promise.all(tasks).then(res=>{ // 缺少 await
    console.log('After awaiting');
  });
});

优化后的 Express 路由处理函数:

import { Request, Response } from 'express'; // 假设类型定义

app.post('/', async (req: Request, res: Response) => { // 关键:添加 async 关键字
  const tasksRequest = req.body as TasksRequest;
  let tasks: Promise[] = []; // 明确 Promise 类型

  try {
    tasks = tasksRequest.tasks.map((t) => processTask(t, tasksRequest.configs));

    console.log('Starting all tasks...');
    // 关键:使用 await Promise.all() 等待所有 Promise 完成
    await Promise.all(tasks); 
    console.log('After awaiting all tasks.');

    // 所有任务完成后,发送成功响应
    res.status(200).json({ message: 'All tasks processed successfully.' });

  } catch (error) {
    console.error('An error occurred during task processing:', error);
    // 如果任何一个 Promise 拒绝,Promise.all 会立即拒绝
    // 在这里发送错误响应
    res.status(500).json({ message: 'Failed to process some tasks.', error: error.message });
  }
});

注意事项:

  • app.post('/', async (req, res) => { ... })是确保await在路由处理函数中生效的关键。
  • await Promise.all(tasks);会暂停当前async函数的执行,直到tasks数组中的所有Promise都成功解决,或者其中任何一个Promise被拒绝。
  • 当Promise.all()中的任何一个Promise被拒绝时,Promise.all()自身也会立即拒绝,并抛出第一个拒绝的原因。因此,使用try...catch块来捕获潜在的错误并向客户端发送适当的错误响应至关重要。

完整示例代码

结合上述优化,一个完整的、健壮的Express路由处理并发异步任务的示例如下:

import express, { Request, Response } from 'express';
import * as fs from 'fs/promises'; // 导入fs.promises

const app = express();
app.use(express.json()); // 用于解析请求体

// 假设的类型定义
interface Task {
  tag: string;
  parentResource: string;
  mostRelatedPath: string;
}

interface Configs {
  Host: string;
  APIsBasePrefix: string;
}

interface TasksRequest {
  tasks: Task[];
  configs: Configs;
}

// 异步处理单个任务的函数
async function processTask(task: Task, configs: Configs): Promise {
  try {
    const fileName = `./output/${task.tag}s.json`; // 使用模板字符串更简洁

    // 模拟外部 API 请求
    const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
      method: 'GET'
    });

    if (!result.ok) {
      throw new Error(`HTTP error! status: ${result.status}`);
    }

    const jsonResult = await result.json();

    // 确保 output 目录存在
    const outputDir = './output';
    await fs.mkdir(outputDir, { recursive: true });

    // 写入文件
    await fs.writeFile(fileName, JSON.stringify(jsonResult, null, 2)); // 美化JSON输出

    console.log(`Finished writing: ${fileName}`);
  } catch (err) {
    console.error(`Error processing task ${task.tag}:`, err);
    // 重新抛出错误,让调用者(Promise.all)能够捕获
    throw err; 
  }
}

// Express POST 路由处理函数
app.post('/', async (req: Request, res: Response) => {
  const tasksRequest = req.body as TasksRequest;

  if (!tasksRequest || !tasksRequest.tasks || !Array.isArray(tasksRequest.tasks) || !tasksRequest.configs) {
    return res.status(400).json({ message: 'Invalid request body.' });
  }

  const tasksPromises: Promise[] = [];

  try {
    // 为每个任务创建并收集 Promise
    for (const t of tasksRequest.tasks) {
      tasksPromises.push(processTask(t, tasksRequest.configs));
    }

    console.log(`Processing ${tasksPromises.length} tasks concurrently...`);
    // 等待所有任务 Promise 完成
    await Promise.all(tasksPromises);
    console.log('All tasks completed successfully.');

    // 所有任务成功完成,发送成功响应
    res.status(200).json({ message: 'All tasks processed successfully.' });

  } catch (error: any) {
    // 捕获 Promise.all 中任何一个任务的错误
    console.error('An error occurred during concurrent task processing:', error);
    res.status(500).json({ 
      message: 'Failed to process some tasks.', 
      error: error.message,
      details: error.stack // 生产环境不建议直接暴露堆栈信息
    });
  }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

// 示例用法 (假设在其他地方调用此API)
// curl -X POST -H "Content-Type: application/json" -d '{
//   "tasks": [
//     {"tag": "user", "parentResource": "/api/v1/", "mostRelatedPath": "users"},
//     {"tag": "product", "parentResource": "/api/v1/", "mostRelatedPath": "products"}
//   ],
//   "configs": {
//     "Host": "https://jsonplaceholder.typicode.com",
//     "APIsBasePrefix": "/"
//   }
// }' http://localhost:3000/

总结与最佳实践

在ExpressJs中处理并发异步任务并确保所有Promise完成,核心在于正确利用JavaScript的async/await语法和Promise.all()方法:

  1. 标记async路由处理函数: 任何包含await关键字的Express路由处理函数都必须用async关键字标记。
  2. 优化异步函数: 将复杂的.then().catch()链重构为更简洁、更易读的async/await模式。
  3. 使用Promise.all()并发执行: 对于多个相互独立的异步任务,使用Promise.all()可以高效地并发执行它们,并等待所有任务完成。
  4. 健壮的错误处理: 在async函数内部使用try...catch捕获并传播错误。在Express路由处理函数中,使用try...catch包裹await Promise.all(),以便在任何一个并发任务失败时能够捕获错误并向客户端发送适当的错误响应(例如500 Internal Server Error)。
  5. 利用Promise-based API: 优先使用返回Promise的API(如fetch、fs.promises),而不是基于回调的API,以更好地融入async/await生态。

通过遵循这些实践,开发者可以构建出更稳定、更易维护的ExpressJs应用,有效管理复杂的异步流程。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
scripterror怎么解决
scripterror怎么解决

scripterror的解决办法有检查语法、文件路径、检查网络连接、浏览器兼容性、使用try-catch语句、使用开发者工具进行调试、更新浏览器和JavaScript库或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

228

2023.10.18

500error怎么解决
500error怎么解决

500error的解决办法有检查服务器日志、检查代码、检查服务器配置、更新软件版本、重新启动服务、调试代码和寻求帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

297

2023.10.25

scripterror怎么解决
scripterror怎么解决

scripterror的解决办法有检查语法、文件路径、检查网络连接、浏览器兼容性、使用try-catch语句、使用开发者工具进行调试、更新浏览器和JavaScript库或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

228

2023.10.18

500error怎么解决
500error怎么解决

500error的解决办法有检查服务器日志、检查代码、检查服务器配置、更新软件版本、重新启动服务、调试代码和寻求帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

297

2023.10.25

js正则表达式
js正则表达式

php中文网为大家提供各种js正则表达式语法大全以及各种js正则表达式使用的方法,还有更多js正则表达式的相关文章、相关下载、相关课程,供大家免费下载体验。

514

2023.06.20

js获取当前时间
js获取当前时间

JS全称JavaScript,是一种具有函数优先的轻量级,解释型或即时编译型的编程语言;它是一种属于网络的高级脚本语言,主要用于Web,常用来为网页添加各式各样的动态功能。js怎么获取当前时间呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

244

2023.07.28

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

298

2023.08.03

js是什么意思
js是什么意思

JS是JavaScript的缩写,它是一种广泛应用于网页开发的脚本语言。JavaScript是一种解释性的、基于对象和事件驱动的编程语言,通常用于为网页增加交互性和动态性。它可以在网页上实现复杂的功能和效果,如表单验证、页面元素操作、动画效果、数据交互等。

5306

2023.08.17

clawdbot ai使用教程 保姆级clawdbot部署安装手册
clawdbot ai使用教程 保姆级clawdbot部署安装手册

Clawdbot是一个“有灵魂”的AI助手,可以帮用户清空收件箱、发送电子邮件、管理日历、办理航班值机等等,并且可以接入用户常用的任何聊天APP,所有的操作均可通过WhatsApp、Telegram等平台完成,用户只需通过对话,就能操控设备自动执行各类任务。

14

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 4.3万人学习

TypeScript 教程
TypeScript 教程

共19课时 | 2.5万人学习

Bootstrap 5教程
Bootstrap 5教程

共46课时 | 3.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号