Knex.js 内存优化 - 从 csv 读取数据

问题描述 投票:0回答:1

这就是我目前正在做的事情,将数据从大约 2,000,000 行的 csv 文件插入到 sqlite 数据库中。有没有更有效的内存方法?当我尝试在 .on('data' ...) 中使用 knex.js 插入数据时,我在处理它们的过程中遇到各种错误。

async function insertData(filePath: string, tableName: string) {
  const data: CsvRow[] = [];

  await new Promise<void>((resolve, reject) => {
    fs.createReadStream(filePath)
      .pipe(fastcsv.parse({ headers: true }))
      .on("data", (row: CsvRow) => {
        data.push(row);
      })
      .on("end", () => {
        // Insert any remaining rows in the buffer
        console.log("BATCH INSERTING DATA");

        db.batchInsert(tableName, data, 100)
          .then(() => {
            console.log("Batch Insert Complete!");
            resolve();
          })
          .catch((error) => {
            console.log("Error in batch insert: ", error);
            reject(error);
          });

      })
      .on("error", (error) => {
        console.log("Error in reading csv: ", error);
        reject(error);
      });
  });
}

我尝试在 .on('data' ...) 中调用 insert,但出现错误,我认为这是由于并发问题造成的,但不确定

javascript node.js sqlite knex.js
1个回答
0
投票

您是否正在申请某些 yc 初创公司,这是一个带回家的编码挑战吗? XD 我正在尝试解决同样的问题。然而,我开始认为 knex 在 sqlite3 连接方面可能存在错误,因为池中只有 1 个打开的连接。到目前为止,我已经尝试创建一个队列,确保批次一个接一个地进行,在它们之间添加延迟,因为我的批次无法找到导致它们中止的连接,从而为 knex 连接添加更多超时配置,甚至编辑用于batchInsert的node_module文件,因为几年前堆栈溢出的某人说这是由于1毫秒的延迟。

const insertData = async (
  csvFilePath: string,
  tableName: string,
  knex: Knex,
  batchSize: number = 100
): Promise<void> => {
  const stream = fs.createReadStream(csvFilePath);
  const csvStream = fastcsv.parse({ headers: true });
  const csvData: any[] = [];

  // Pipe the CSV data into the parser
  stream.pipe(csvStream);

  csvStream
    .on("data", async (data: any) => {
      // Add the data to the batch
      csvData.push(data);
      // If the batch size is reached, insert the batch into the database
      if (csvData.length >= batchSize) {
        const batch = csvData.splice(0, batchSize);
        await addToBuffer({ tableName, batch, knex });
      }
    })
    .on("end", async () => {
      // Insert any remaining rows
      if (csvData.length > 0) {
        await addToBuffer({ tableName, csvData, knex });
      }
    })
    .on("error", (error) => {
      console.error(`Error parsing CSV file ${csvFilePath}:`, error);
      throw error;
    });

  return new Promise((resolve, reject) => {
    csvStream.on("end", resolve);
    csvStream.on("error", reject);
  });
};

不幸的是,我被困在同一个领域,所以我无法提供太多建议。如果您有任何想法请告诉我!我不能将此作为评论,因为这是一个新帐户。

© www.soinside.com 2019 - 2024. All rights reserved.