Node.js TypeScript: bulk-insert an array into a PostgreSQL table using COPY

Problem description · votes: 0 · answers: 1

My array will have close to 500k rows. After deserializing the data, my array in Node.js looks like this:

const bulkData = [
    [1234, '[email protected]', 'tokyo'],
    [2345, '[email protected]', 'london'],
    [1243, '[email protected]', 'paris'],
    ...
    ...
]

The remaining variables are:

const tableName = 'my_test_table'
const columnNamesString = 'user_id, email, city'
// there are other columns in the table that will have default values, so I don't need to insert data into all columns, only the above 3

My COPY code doesn't work. I get a "Connection terminated" error, but it contains no additional information. Here is the full error:

Error: Connection terminated
    at Connection.<anonymous> (D:\repos\hfscripts\node_modules\pg\lib\client.js:132:36)
    at Object.onceWrapper (node:events:627:28)
    at Connection.emit (node:events:525:35)
    at Connection.emit (node:domain:489:12)
    at Socket.<anonymous> (D:\repos\hfscripts\node_modules\pg\lib\connection.js:63:12)
    at Socket.emit (node:events:525:35)
    at Socket.emit (node:domain:489:12)
    at TCP.<anonymous> (node:net:313:12)

My code is:

import { Client } from 'pg';
import { from as copyFrom } from 'pg-copy-streams';
const pgClient = new Client(config);
// connection works. i have tested it. 'config' contains key-value pairs needed for connection

const queryText = `COPY ${tableName}(${columnNamesString}) FROM STDIN WITH CSV HEADER DELIMITER ','`;
console.log(queryText)
const stream = pgClient.query(copyFrom(queryText));

stream.on('finish', () => {
  console.log('Bulk insert completed');
});

stream.on('error', (err) => {
  console.error(
    `ERROR during Bulk insert`
  );
  throw err;
});

bulkData.forEach((rowData) => {
  const csvRow = `${rowData.join(',')}`;
  console.log(csvRow);
  stream.write(csvRow);
});


When I console.log() the query and the data, I get this:

COPY my_test_table(user_id,email,city) FROM STDIN WITH CSV HEADER DELIMITER ','
1234,'[email protected]','tokyo'

Any idea how to fix this? My alternative would be to write the whole array to a .csv file, then create a readStream and insert the data, but that seems like unnecessary overhead.
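For reference, a few things worth checking in the code above (my own observations, not from the original post): stream.write() is called without a trailing newline, the logged row uses single quotes while COPY's CSV format quotes with double quotes, the command specifies HEADER although no header row is ever written, and stream.end() is never called, so the COPY never completes. A hypothetical row formatter illustrating the format COPY expects:

```typescript
// Hypothetical helper (not from the original post): format one row for
// COPY ... FROM STDIN WITH CSV. Fields containing a comma, double quote,
// or newline are wrapped in double quotes with embedded quotes doubled;
// every row ends with '\n'.
function toCsvLine(row: Array<string | number>): string {
  return (
    row
      .map((field) => {
        const s = String(field);
        return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
      })
      .join(',') + '\n'
  );
}
```

For example, toCsvLine([1234, 'tokyo']) yields '1234,tokyo\n'; writing lines like this and calling stream.end() afterward would let the COPY finish.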

node.js typescript postgresql bulkinsert
1 answer

0 votes

Thanks to @adrian-klaver for the solution. I also needed to stream the string.

Here is what I did:

  1. Converted my bulkData into a CSV string, say

    bulkDataCsvString

  2. Created a readable stream from this string

import { Readable } from 'node:stream';

// Readable.from() creates a stream that emits the string and then ends;
// new Readable() without a _read() implementation throws in modern Node.js
const src = Readable.from([bulkDataCsvString]);

src.pipe(stream) // stream is the writable returned by pgClient.query(copyFrom(queryText))

// rest of the code stays as is, BUT remove the forEach block; it's no longer needed
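Putting the two steps above together, a minimal sketch (the sample rows and the join-based CSV builder are illustrative assumptions; real data containing commas or quotes would need proper CSV quoting):

```typescript
import { Readable } from 'node:stream';

// Hypothetical sample rows in the same shape as the question's bulkData.
const bulkData: Array<[number, string, string]> = [
  [1234, '[email protected]', 'tokyo'],
  [2345, '[email protected]', 'london'],
];

// Step 1: build one CSV string, one row per line. Note there is no
// header row, so the COPY command should not use the HEADER option.
const bulkDataCsvString =
  bulkData.map((row) => row.join(',')).join('\n') + '\n';

// Step 2: wrap the string in a readable stream that emits it and ends.
const src = Readable.from([bulkDataCsvString]);

// In the real code: src.pipe(stream), where `stream` is the writable
// returned by pgClient.query(copyFrom(queryText)); piping ends the
// destination once the source is exhausted, which completes the COPY.
```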

