从流读取时数组不会在 JavaScript 中重置

问题描述 投票:0回答:1

我正在尝试插入来自 MongoDB 中远程 CSV 文件的数据(使用 Mongoose)。 我想一次批量插入 100 个项目。

这是我的代码:

import csv from 'csv-parser'
import fetch from 'node-fetch'
import { Product } from '../models/Product'

export async function handleCSVProcessing(targetCsv: string) {
  const batchSize = 100

  try {
    const response = await fetch(targetCsv, {
      method: 'get',
      headers: {
        'content-type': 'text/csv;charset=UTF-8',
      }
    })
    if (!response || !response.ok || !response.body) {
      throw new Error(`Failed to fetch CSV file: ${response.statusText}`)
    }

    const productsToUpsert: any[] = []

    response.body
      .pipe(csv())
      .on('data', async (row: any) => {
 
        productsToUpsert.push({
          updateOne: {
            filter: { name: row['Product Name'] },
            update: { name: row['Product Name'], price: row['Price'] },
            upsert: true,
          },
        })

        if (productsToUpsert.length >= batchSize) {
          console.log('productsToUpsert.length: ' + productsToUpsert.length)  // this keeps growing after 100! 101, 102, 103...
          await Product.bulkWrite(productsToUpsert);
          productsToUpsert.length = 0  // theoretically this should empty the array
        }
      })
      .on('end', async () => {
        if (productsToUpsert.length > 0) {
          await Product.bulkWrite(productsToUpsert);
        }
        console.log('CSV file successfully processed and products upserted.');
      })
      .on('error', (error: any) => {
        console.error('Error processing CSV:', error);
      });
  } catch (error) {
    console.error('Error fetching CSV:', error);
  }
}

我希望,一旦将 100 个项目添加到

productsToUpsert
,这些项目就会插入到数据库中,数组会被清空,然后重新开始,直到再次填充 100 个项目(CSV 文件有数千行)。

然而,我的

console.log
显示数组的长度在100之后继续增长。

我在这里做错了什么?我也尝试切换到使用

let
而不是
const
,然后执行
productsToUpsert = []
,但结果仍然相同。

是否可能是我错误地处理了

async
await
内容,导致数组以某种方式增长,同时发生了其他事情(即数据库插入)?或者也许我处理流的方式是错误的。

node.js arrays typescript mongodb stream
1个回答
0
投票

要使用异步函数进行数据处理,您需要运行流方法。

const stream = response.body

stream
 .pipe(csv())
 .on('data', async (row: any) => {
   stream.pause();
   ...your async logic
   stream.resuem();     
}
© www.soinside.com 2019 - 2024. All rights reserved.