内存分配失败-JavaScript堆内存不足

问题描述 投票:0回答:1

我在Node.js代码中遇到内存泄漏问题。我正在尝试流式读取具有10万行的CSV(链接中的示例文件)文件,并处理文件中的每个条目。一段时间后,该进程因内存分配错误而发生。

“致命错误:接近堆限制的无效标记压缩分配失败-JavaScript堆内存不足”

sample csv:

我的代码示例

const fs = require('fs');
const config = require('../config/config');
const csv = require('csv-parser');
const tls = require('../services/tls');

processCSV('crawler', 'sample-csv.csv');

具有10万个条目的流处理csv文件

async function processCSV (jobName, fileName) {
  return new Promise((resolve, reject) => {
    let filePath = config.api.basePath + fileName;
    fs.createReadStream(filePath)
        .on('error', () => {
          // handle error
          console.log('error processing csv');
          reject();

        })
        .pipe(csv())
        .on('data', (row) => {
          createJob(jobName, row);
        })
        .on('end', () => {
          // handle end of CSV
          console.log('Finished processing csv');
          resolve(filePath);
        })
  });
}

验证csv文件中的每个URL

async function createJob (name, data) {
  let {hostname, port, ip} = data;
  let protocol = 'https';
  if (port === 80) {
    protocol = 'http';
  }
  let url = protocol + '://' + hostname;
  try {
    await tls.getHostData(url); // call an external api to get details of hostname
    return url;
  } catch (error) {
    return error;
  }
}

我不知道哪个函数导致内存泄漏。

node.js csv memory-leaks stream
1个回答
1
投票

在我看来,您正在为CSV文件中的每一行调用createJob(),并且您可能正在立即使这些作业中的每一项都在处理中并在内存中。这会耗尽系统资源,尤其是在文件中有很多行的情况下。

解决此问题的一个方法是调整代码,以便同时仅进行N个createJob()操作。这是一种方法,当您同时达到飞行中的最大请求数时暂停流,然后在有更多空间时恢复它:

async function processCSV (jobName, fileName) {
  return new Promise((resolve, reject) => {
    let filePath = config.api.basePath + fileName;
    let numConcurrent = 0;
    let paused = false;
    const maxConcurrent = 10;
    let stream = fs.createReadStream(filePath)
        .on('error', (err) => {
          // handle error
          console.log('error processing csv');
          reject(err);

        })
        .pipe(csv())
        .on('data', (row) => {

          function checkResume() {
              --numConcurrent;
              if (paused && numConcurrent < maxConcurrent) {
                  // restart the stream, there's room for more
                  paused = false;
                  stream.resume();
              }
          }
          ++numConcurrent;
          createJob(jobName, row).then(checkResume, checkResume);
          if (numConcurrent >= maxConcurrent) {
              // pause the stream because we have max number of operations going
              stream.pause();
              paused = true;
          }
        })
        .on('end', () => {
          // handle end of CSV
          console.log('Finished processing csv');
          resolve(filePath);
        })
  });
}


async function createJob (name, data) {
  let {hostname, port, ip} = data;
  let protocol = 'https';
  if (port === 80) {
    protocol = 'http';
  }
  let url = protocol + '://' + hostname;
  try {
    await tls.getHostData(url); // call an external api to get details of hostname
    return url;
  } catch (error) {
    // make sure returned promise is rejected
    throw error;
  }
}

注意:如果在处理给定行时出错,则此实现(就像您在问题中显示的那样)继续进行。该行为可以根据需要进行更改。

© www.soinside.com 2019 - 2024. All rights reserved.