我正在尝试插入来自 MongoDB 中远程 CSV 文件的数据(使用 Mongoose)。 我想一次批量插入 100 个项目。
这是我的代码:
import csv from 'csv-parser'
import fetch from 'node-fetch'
import { Product } from '../models/Product'
export async function handleCSVProcessing(targetCsv: string) {
const batchSize = 100
try {
const response = await fetch(targetCsv, {
method: 'get',
headers: {
'content-type': 'text/csv;charset=UTF-8',
}
})
if (!response || !response.ok || !response.body) {
throw new Error(`Failed to fetch CSV file: ${response.statusText}`)
}
const productsToUpsert: any[] = []
response.body
.pipe(csv())
.on('data', async (row: any) => {
productsToUpsert.push({
updateOne: {
filter: { name: row['Product Name'] },
update: { name: row['Product Name'], price: row['Price'] },
upsert: true,
},
})
if (productsToUpsert.length >= batchSize) {
console.log('productsToUpsert.length: ' + productsToUpsert.length) // this keeps growing after 100! 101, 102, 103...
await Product.bulkWrite(productsToUpsert);
productsToUpsert.length = 0 // theoretically this should empty the array
}
})
.on('end', async () => {
if (productsToUpsert.length > 0) {
await Product.bulkWrite(productsToUpsert);
}
console.log('CSV file successfully processed and products upserted.');
})
.on('error', (error: any) => {
console.error('Error processing CSV:', error);
});
} catch (error) {
console.error('Error fetching CSV:', error);
}
}
我希望,一旦将 100 个项目添加到
productsToUpsert
,这些项目就会插入到数据库中,数组会被清空,然后重新开始,直到再次填充 100 个项目(CSV 文件有数千行)。
然而,我的
console.log
显示数组的长度在100之后继续增长。
我在这里做错了什么?我也尝试切换到使用
let
而不是 const
,然后执行 productsToUpsert = []
,但结果仍然相同。
是否可能是我错误地处理了
async
和 await
内容,导致数组以某种方式增长,同时发生了其他事情(即数据库插入)?或者也许我处理流的方式是错误的。
要使用异步函数进行数据处理,您需要运行流方法。
const stream = response.body
stream
.pipe(csv())
.on('data', async (row: any) => {
stream.pause();
...your async logic
stream.resuem();
}