Postgresql + psycopg:使用POSTGRESQL函数调用批量插入大数据

问题描述 投票:0回答:3

我正在处理大量非常简单的数据(点云)。我想使用 Python 将此数据插入到 Postgresql 数据库中的一个简单表中。

我需要执行的insert语句示例如下:

INSERT INTO points_postgis (id_scan, scandist, pt) VALUES (1, 32.656,  **ST_MakePoint**(1.1, 2.2, 3.3));

注意 INSERT 语句中对 Postgresql 函数 ST_MakePoint 的调用。

我必须调用这数十亿次(是的,数十亿次),所以显然我必须以更优化的方式将数据插入到 Postgresql 中。正如本文以非常好的且信息丰富的方式呈现的那样,有许多策略可以批量插入数据(insertmany、copy 等)。 https://hakibenita.com/fast-load-data-python-postgresql

但是没有示例显示当您需要调用服务器端的函数时如何执行这些插入。我的问题是:当我需要使用 psycopg 调用 Postgresql 数据库服务器端的函数时,如何批量插入数据?

非常感谢任何帮助!谢谢!


请注意,使用 CSV 没有多大意义,因为我的数据很大。 或者,我已经尝试使用 ST_MakePoint 函数的 3 个输入的简单列填充临时表,然后在所有数据都进入该临时函数后,调用 INSERT/SELECT。问题是这需要大量时间,而且我需要的磁盘空间量是无意义的。

postgresql psycopg2
3个回答
1
投票

使用生成带有校验位的 UPC A 条形码的函数的一些简单示例:

  1. 使用execute_batch

    execute_batch
    page_size
    参数,允许您使用多行语句批量插入。默认情况下,此设置为
    100
    ,一次将插入 100 行。您可以提高此值以减少与服务器的往返次数。

  2. 仅使用

    execute
    并从另一个表中选择数据。

import psycopg2
from psycopg2.extras import execute_batch

con = psycopg2.connect(dbname='test', host='localhost', user='postgres', 
port=5432)
cur = con.cursor()

cur.execute('create table import_test(id integer, suffix_val varchar, upca_val 
varchar)')
con.commit()

# Input data as a list of tuples. Means some data is duplicated.
input_list = [(1, '12345', '12345'), (2, '45278', '45278'), (3, '61289', 
'61289')]
execute_batch(cur, 'insert into import_test values(%s, %s, 
upc_check_digit(%s))', input_list)
con.commit()

select * from import_test ;
 id | suffix_val |   upca_val   
----+------------+--------------
  1 | 12345      | 744835123458
  2 | 45278      | 744835452787
  3 | 61289      | 744835612891

# Input data as list of dicts and using named parameters to avoid duplicating data.
input_list_dict = [{'id': 50, 'suffix_val': '12345'}, {'id': 51, 'suffix_val': '45278'}, {'id': 52, 'suffix_val': '61289'}]  


execute_batch(cur, 'insert into import_test values(%(id)s, 
%(suffix_val)s, upc_check_digit(%(suffix_val)s))', input_list_dict)
con.commit()

select * from import_test ;
 id | suffix_val |   upca_val   
----+------------+--------------
  1 | 12345      | 744835123458
  2 | 45278      | 744835452787
  3 | 61289      | 744835612891
 50 | 12345      | 744835123458
 51 | 45278      | 744835452787
 52 | 61289      | 744835612891



# Create a table with values to be used for inserting into final table
cur.execute('create table input_vals (id integer, suffix_val varchar)')
con.commit()

execute_batch(cur, 'insert into input_vals values(%s, %s)', [(100, '76234'), 
(101, '92348'), (102, '16235')])
con.commit()
cur.execute('insert into import_test select id, suffix_val, 
upc_check_digit(suffix_val) from input_vals')
con.commit()

 select * from import_test ;
  id   | suffix_val |   upca_val   
-------+------------+--------------
     1 | 12345      | 744835123458
     2 | 45278      | 744835452787
     3 | 61289      | 744835612891
 12345 | 12345      | 744835123458
 45278 | 45278      | 744835452787
 61289 | 61289      | 744835612891
   100 | 76234      | 744835762343
   101 | 92348      | 744835923485
   102 | 16235      | 744835162358


0
投票

为了在合理的时间内以最小的努力完成此任务,最重要的是将此任务分解为多个组成部分,以便您可以单独利用不同的 Postgres 功能。

首先,您需要先创建减去几何变换的表格。如:

create table temp_table (
    id_scan bigint, 
    scandist numeric, 
    pt_1 numeric, 
    pt_2 numeric, 
    pt_3 numeric
);

由于我们不添加任何索引和约束,这很可能是将“原始”数据获取到 RDBMS 的最快方法。

最好的方法是使用 COPY 方法,您可以直接从 Postgres 使用该方法(如果您有足够的访问权限),也可以通过 Python 接口使用 https://www.psycopg.org/docs/cursor .html#cursor.copy_expert

这是实现此目的的示例代码:

iconn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(target_host, target_usr, target_db, target_pw, "require")
iconn = psycopg2.connect(iconn_string)
import_cursor = iconn.cursor()
csv_filename = '/path/to/my_file.csv'
copy_sql = "COPY temp_table (id_scan, scandist, pt_1, pt_2, pt_3) FROM STDIN WITH CSV HEADER DELIMITER ',' QUOTE '\"' ESCAPE '\\' NULL AS 'null'"
with open(csv_filename, mode='r', encoding='utf-8', errors='ignore') as csv_file:
    import_cursor.copy_expert(copy_sql, csv_file)
iconn.commit()

下一步将是根据现有的原始数据高效地创建您想要的表。然后,您将能够使用单个 SQL 语句创建实际的目标表,并让 RDBMS 发挥其魔力。

一旦数据位于 RDBMS 中,对其进行一些优化并添加一个或两个索引(如果适用)是有意义的(最好是主索引或唯一索引以加速转换)

这将取决于您的数据/用例,但这样的事情应该有所帮助:

alter table temp_table add primary key (id_scan); --if unique
-- or
create index idx_temp_table_1 on temp_table(id_scan); --if not unique

要将数据从原始表移动到目标表中:

with temp_t as (
    select id_scan, scandist, ST_MakePoint(pt_1, pt_2, pt_3) as pt from temp_table
)
INSERT INTO points_postgis (id_scan, scandist, pt)
SELECT temp_t.id_scan, temp_t.scandist, temp_t.pt
FROM temp_t;

这将一次性选择上一个表中的所有数据并对其进行转换。

您可以使用的第二个选项类似。您可以直接将所有原始数据加载到points_postgis,同时将其分为3个临时列。然后使用

alter table points_postgis add column pt geometry;
并进行更新,并删除临时列:
update points_postgis set pt = ST_MakePoint(pt_1, pt_2, pt_3);
&
alter table points_postgis drop column pt_1, drop column pt_2, drop column pt_3;

主要的收获是,最有效的选择是不要专注于最终的决赛桌状态,而是将其分解为容易实现的块。 Postgres 将轻松处理数十亿行的导入以及之后的转换。


0
投票

最近使用postgres

copy
命令插入2000万条记录,只用了不到1分钟。您可以阅读我写的这篇文章,以便更好地理解。

你只需在Python代码中使用执行命令即可。

https://www.linkedin.com/pulse/optimizing-postgresql-large-scale-data-insertions-from-sagore-sarker-omwlc/?trackingId=zhAS1TudQ2ae8s7I09L34Q%3D%3D

© www.soinside.com 2019 - 2024. All rights reserved.