我正在使用 NestJS v9、fast-csv v4 和 BigQuery。
@Post('upload')
@ApiOperation({ description: 'Upload CSV File' })
@ApiConsumes('multipart/form-data')
@ApiBody({
schema: {
type: 'object',
properties: {
file: {
type: 'string',
format: 'binary',
},
},
},
})
@ApiResponse({
type: UploaderResponse,
status: 200,
})
@UseInterceptors(FileInterceptor('file', { fileFilter: fileFilter }))
@Auth(UserRole.Admin, UserRole.SuperAdmin)
async uploadFile(
@Body() uploadCsvDto: UploadCsvDto,
@UploadedFile() file: Express.Multer.File,
): Promise<UploaderResponse> {
if (!file) {
throw new HttpException(FileErrors.CSVFormat, HttpStatus.BAD_REQUEST);
}
return await this.filesService.uploadCsv(file, uploadCsvDto);
}
async uploadCsv(
fileData: Express.Multer.File,
uploadCsvDto: UploadCsvDto,
): Promise<UploaderResponse> {
memoryUsage();
await this.checkForDuplicates(uploadCsvDto);
memoryUsage();
// 2. Parse CSV
await this.batchCsvResults(fileData.buffer, uploadCsvDto);
}
async batchCsvResults(
buffer: Buffer,
uploadCsvDto: UploadCsvDto,
): Promise<void> {
const streamFromBuffer = Readable.from(buffer);
const stream = parseStream(streamFromBuffer, {
headers: (headers: string[]) => {
return headers.map((h: string) => {
if (this.checkCorrectColumn(h)) {
return VALID_CSV_COLUMNS.find(
(column: ValidColumn) => column.name === h.toLowerCase(),
).column;
} else {
console.log(`'${h}' no es una columna válida.`);
}
});
},
delimiter: ';',
encoding: 'utf8',
trim: true, // trim white spaces on columns only
});
let batch: DataRow[] = [];
// Promise for waitting all the batches to end
let resolvePromise: () => void;
const processingPromise: Promise<void> = new Promise<void>((resolve) => {
resolvePromise = resolve;
});
stream.on('data', async (row: DataRow) => {
batch.push(row);
if (batch.length === 200) {
stream.pause();
await this.processBatch(batch, uploadCsvDto)
.then(() => {
batch.length = 0;
stream.resume();
})
.catch((err) => {
console.error(err);
stream.destroy();
});
}
});
stream.on('end', async () => {
if (batch.length > 0) {
await this.processBatch(batch, uploadCsvDto)
.then(() => {
batch = null;
console.log('All batches have been processed');
})
.catch((err) => {
console.error(err);
});
} else {
console.log('All batches have been processed');
}
resolvePromise();
});
stream.on('error', (err) => {
console.error(err);
});
await processingPromise;
}
async mapChildRows(
data: DataRow[],
uploadCsvDto: UploadCsvDto,
): Promise<void> {
memoryUsage();
let formattedRows = [];
for (let i = 0; i < data.length; i++) {
// Convert object values to their type
const dataObject: any = data[i];
Object.keys(dataObject).forEach((key) => {
dataObject[key] = this.getChildColumnValue(key, dataObject[key]);
});
// Add dynamic fields
dataObject[DYNAMIC_CSV_COLUMNS.idClient] = uploadCsvDto.idClient;
dataObject[DYNAMIC_CSV_COLUMNS.subservice] = uploadCsvDto.subservice;
dataObject[DYNAMIC_CSV_COLUMNS.subtask] = uploadCsvDto.subtask;
// Add actual day to uploaded file. We have an extra space at the end of the array, because
// they do not send us the upload_date, its a fixed column but a dynamic value added by us
dataObject[DYNAMIC_CSV_COLUMNS.upload_date] = actualDate();
formattedRows.push(dataObject);
}
memoryUsage();
await this.insertIntoCustomAnalyticsBigQuery(formattedRows);
data = null;
formattedRows = null;
}
async insertIntoCustomAnalyticsBigQuery(rows: DataRow[]) {
try {
memoryUsage();
await this.bigQueryClient.dataset(DB_NAME).table(DATA_TABLE).insert(rows);
console.log(`Inserted ${rows.length} rows`);
} catch (ex) {
console.log(JSON.stringify(ex));
throw new HttpException(
'Error inserting into Data table from CSV.',
HttpStatus.BAD_REQUEST,
);
}
}
问题: 这适用于 50.000 行的文件,但更高的文件会给我带来内存问题:
我不明白为什么,当我不需要时清空所有变量。
我的节点服务器还告诉我,我的最大内存大小是 512MB。
我的内存检查功能:
export function memoryUsage(): void {
return console.log(
`APP is using ${
Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100
} MB of memory.`,
);
}