我正在尝试使用 django 中的 psycopg2 执行原始更新查询。代码如下:
from django.db import connection
from psycopg2 import sql
model_instances_to_update = []
for model_instance in models_queryset:
model_instances_to_update.append(
sql.Identifier(
f"({id},{col_1_value},{col_2_value},{col_3_value},{col_4_value})"
)
)
model_instance_query_values_string = sql.SQL(", ").join(model_instances_to_update)
model_instance_sql_params = {"updated_model_instance_values": model_instance_query_values_string}
model_instance_update_query = sql.SQL(
"""
UPDATE {table} AS model_table SET
col_1 = model_table_new.col_1,
col_2 = model_table_new.col_2,
col_3 = model_table_new.col_3,
col_4 = model_table_new.col_4
FROM (VALUES (%(updated_model_instance_values)s)) AS model_table_new(id, col_1, col_2, col_3, col_4)
WHERE model_table.id = model_table_new.id;
"""
).format(
table=sql.Identifier("table_name"),
)
with connection.cursor() as cursor:
cursor.execute(
model_instance_update_query,
params=model_instance_sql_params,
)
但是当我尝试执行此查询时,出现以下错误:
TypeError: "object of type 'Composed' has no len()"
我在这里做错了什么以及如何纠正它?
更新 我现在直接在cursor.execute 中传递元组列表。我已在 SQL 对象字符串中添加了占位符,但仍然遇到相同的错误。
for model_instance in models_queryset:
model_instances_to_update.append(
(
id,
col_1_value,
col_2_value,
col_3_value,
col_4_value
)
)
colList = ["id", "col_1", "col_2", "col_3", "col_4"]
model_instance_update_query = sql.SQL(
"""
UPDATE {table} AS model_table SET
col_1 = model_table_new.col_1,
col_2 = model_table_new.col_2,
col_3 = model_table_new.col_3,
col_4 = model_table_new.col_4
FROM (VALUES ({records_list_template})) AS model_table_new(id, col_1, col_2, col_3, col_4)
WHERE model_table.id = model_table_new.id;
"""
).format(
table=sql.Identifier("table_name"),
records_list_template=sql.SQL(",").join(
[sql.Placeholder()] * len(colList)
),
)
with connection.cursor() as cursor:
cursor.execute(
model_instance_update_query,
model_instances_to_update,
)
更新2
下面是从
execute
函数内部打印 sql 的输出。
Composed([SQL('\n UPDATE '),
Identifier('table_name'),
SQL(' AS model_table SET\n
col_2 = model_table_new.col_2,\n
col_3 = model_table_new.col_3,\n
col_4 = model_table_new.col_4,\n
col_5 = model_table_new.col_5\n
FROM (VALUES ('),
Composed([Placeholder(), SQL(', '), Placeholder(), SQL(', '),
Placeholder(), SQL(', '), Placeholder(), SQL(', '),
Placeholder()]),
SQL(')) AS
model_table_new(col_1, col_2, col_3, col_4, col_5)\n
WHERE model_table.id = model_table_new.id;\n ')])
我尝试执行
print(model_instance_update_query.as_string(connection))
,但收到此错误 - argument 2 must be connection or cursor
。这是因为我使用的是 django.db 中的连接对象,它是一个代理,而不是 psycopg2 中的连接对象。所以按照上面的方法尝试了。
根据@Adrian Klaver的评论,解决方案是使用直接psycopg2连接对象而不是django.db.connection。
from psycopg2 import sql
model_instances_to_update = []
for model_instance in models_queryset:
model_instances_to_update.append(
(
id,
col_1_value,
col_2_value,
col_3_value,
col_4_value
)
)
model_instance_update_query = sql.SQL(
"""
UPDATE {table} AS model_table SET
col_1 = model_table_new.col_1,
col_2 = model_table_new.col_2,
col_3 = model_table_new.col_3,
col_4 = model_table_new.col_4
FROM (VALUES {records_list_template}) AS model_table_new(id, col_1, col_2, col_3, col_4)
WHERE model_table.id = model_table_new.id;
"""
).format(
table=sql.Identifier("table_name"),
records_list_template=sql.SQL(",").join(
[sql.Placeholder()] * len(model_instances_to_update)
),
)
database = settings.DATABASES["default"]
pg_connection_dict = {
"dbname": database["NAME"],
"user": database["USER"],
"password": database["PASSWORD"],
"port": database["PORT"],
"host": database["HOST"],
}
connection = psycopg2.connect(**pg_connection_dict)
cursor = connection.cursor()
cursor.execute(
model_instance_update_query,
model_instances_to_update,
)