My array will have close to 500k rows. After deserializing the data, my array in Node.js looks like this:
const bulkData = [
[1234, '[email protected]', 'tokyo'],
[2345, '[email protected]', 'london'],
[1243, '[email protected]', 'paris'],
...
...
]
The other variables are:
const tableName = 'my_test_table'
const columnNamesString = 'user_id, email, city'
// The table has other columns with default values, so I only need to insert data into the 3 columns above.
My COPY code does not work. I get a "Connection terminated" error, but the error contains no other information. Here is the full error:
Error: Connection terminated
at Connection.<anonymous> (D:\repos\hfscripts\node_modules\pg\lib\client.js:132:36)
at Object.onceWrapper (node:events:627:28)
at Connection.emit (node:events:525:35)
at Connection.emit (node:domain:489:12)
at Socket.<anonymous> (D:\repos\hfscripts\node_modules\pg\lib\connection.js:63:12)
at Socket.emit (node:events:525:35)
at Socket.emit (node:domain:489:12)
at TCP.<anonymous> (node:net:313:12)
My code is:
import { Client } from 'pg';
import { from as copyFrom } from 'pg-copy-streams';
const pgClient = new Client(config);
// The connection works; I have tested it. 'config' contains the key-value pairs needed for the connection.
const queryText = `COPY ${tableName}(${columnNamesString}) FROM STDIN WITH CSV HEADER DELIMITER ','`;
console.log(queryText)
const stream = pgClient.query(copyFrom(queryText));
stream.on('finish', () => {
console.log('Bulk insert completed');
});
stream.on('error', (err) => {
console.error(
`ERROR during Bulk insert`
);
throw err;
});
bulkData.forEach((rowData) => {
const csvRow = `${rowData.join(',')}`;
console.log(csvRow);
stream.write(csvRow);
});
When I console.log() the query and the data, I get this:
COPY my_test_table(user_id,email,city) FROM STDIN WITH CSV HEADER DELIMITER ','
1234,'[email protected]','tokyo'
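Any idea how to fix this? My alternative is to write the whole array to a .csv file, then create a readStream and insert the data from it. But that seems like unnecessary overhead.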
Thanks to @adrian-klaver for the solution. I needed to stream the string as well.
This is what I did:
Convert my bulkData to a CSV string, say bulkDataCsvString.
Create a readable stream for this string:
import { Readable } from 'node:stream';
const src = new Readable();
src.readable = true;
src.push(bulkDataCsvString);
src.push(null);
src.pipe(stream); // stream is the COPY stream returned by pgClient.query(copyFrom(queryText))
// The rest of the code stays as is, except the forEach loop: remove it, it's no longer needed.
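The first step above (converting bulkData to a CSV string) is not shown in the post, so here is a minimal sketch of one way it could be done. The helper names toCsvField and toCsvString are hypothetical, and the sample rows are made-up placeholders rather than real data. Two things are worth noting: every row needs a trailing newline, and because the COPY query uses the HEADER option, PostgreSQL skips the first line of the input, so the sketch emits a header line first (the other option would be to drop HEADER from the query).

```javascript
// Hypothetical helper (not from the original post): format one value as a CSV field.
function toCsvField(value) {
  // An unquoted empty field is read as NULL by COPY in CSV mode.
  if (value === null || value === undefined) return '';
  const text = String(value);
  // Quote only when the field contains a delimiter, a double quote, or a line break,
  // and double any embedded double quotes.
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Hypothetical helper: join rows into one CSV string, one newline-terminated line per row.
// headerColumns is optional; pass it when the COPY query uses the HEADER option.
function toCsvString(rows, headerColumns) {
  const lines = rows.map((row) => row.map(toCsvField).join(','));
  if (headerColumns) lines.unshift(headerColumns.map(toCsvField).join(','));
  return lines.map((line) => line + '\n').join('');
}

// Placeholder rows for illustration only.
const bulkDataCsvString = toCsvString(
  [
    [1234, 'a@example.com', 'tokyo'],
    [2345, 'b@example.com', 'london, uk'],
  ],
  ['user_id', 'email', 'city']
);
```

The resulting bulkDataCsvString can then be pushed into the readable stream exactly as in the snippet above. For 500k rows this builds the whole string in memory, which matches what the post describes; generating rows lazily would be a different design.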