使用psycopg2 sql.SQL对象调用cursor.execute时出错

问题描述 投票:0回答:1

我正在尝试使用 django 中的 psycopg2 执行原始更新查询。代码如下:

from django.db import connection
from psycopg2 import sql

model_instances_to_update = []
for model_instance in models_queryset:    
    model_instances_to_update.append(
        sql.Identifier(
            f"({id},{col_1_value},{col_2_value},{col_3_value},{col_4_value})"
        )
    )

model_instance_query_values_string = sql.SQL(", ").join(model_instances_to_update)
model_instance_sql_params = {"updated_model_instance_values": model_instance_query_values_string}

model_instance_update_query = sql.SQL(
    """
    UPDATE {table} AS model_table SET
    col_1 = model_table_new.col_1,
    col_2 = model_table_new.col_2,
    col_3 = model_table_new.col_3,
    col_4 = model_table_new.col_4
    FROM (VALUES (%(updated_model_instance_values)s)) AS model_table_new(id, col_1, col_2, col_3, col_4)
    WHERE model_table.id = model_table_new.id;
    """
).format(
    table=sql.Identifier("table_name"),
)

with connection.cursor() as cursor:
    cursor.execute(
        model_instance_update_query,
        params=model_instance_sql_params,
    )

但是当我尝试执行此查询时,出现以下错误:

TypeError: "object of type 'Composed' has no len()"

我在这里做错了什么以及如何纠正它?

更新 我现在直接在cursor.execute 中传递元组列表。我已在 SQL 对象字符串中添加了占位符,但仍然遇到相同的错误。

for model_instance in models_queryset:    
    model_instances_to_update.append(
        (
            id,
            col_1_value,
            col_2_value,
            col_3_value,
            col_4_value
        )
    )

colList = ["id", "col_1", "col_2", "col_3", "col_4"]

model_instance_update_query = sql.SQL(
    """
    UPDATE {table} AS model_table SET
    col_1 = model_table_new.col_1,
    col_2 = model_table_new.col_2,
    col_3 = model_table_new.col_3,
    col_4 = model_table_new.col_4
    FROM (VALUES ({records_list_template})) AS model_table_new(id, col_1, col_2, col_3, col_4)
    WHERE model_table.id = model_table_new.id;
    """
).format(
    table=sql.Identifier("table_name"),
    records_list_template=sql.SQL(",").join(
        [sql.Placeholder()] * len(colList)
    ),
)

with connection.cursor() as cursor:
    cursor.execute(
        model_instance_update_query,
        model_instances_to_update,
    )

更新2

下面是从

execute
函数内部打印 sql 的输出。

Composed([SQL('\n        UPDATE '), 
Identifier('table_name'), 
SQL(' AS model_table SET\n        
col_2 = model_table_new.col_2,\n        
col_3 = model_table_new.col_3,\n        
col_4 = model_table_new.col_4,\n        
col_5 = model_table_new.col_5\n        
FROM (VALUES ('), 
Composed([Placeholder(), SQL(', '), Placeholder(), SQL(', '),
Placeholder(), SQL(', '), Placeholder(), SQL(', '),
Placeholder()]), 
SQL(')) AS 
model_table_new(col_1, col_2, col_3, col_4, col_5)\n        
WHERE model_table.id = model_table_new.id;\n        ')])

我尝试执行

print(model_instance_update_query.as_string(connection))
,但收到此错误 -
argument 2 must be connection or cursor
。这是因为我使用的是 django.db 中的连接对象,它是一个代理,而不是 psycopg2 中的连接对象。所以按照上面的方法尝试了。

python postgresql psycopg2 django-orm database-cursor
1个回答
0
投票

根据@Adrian Klaver的评论,解决方案是使用直接psycopg2连接对象而不是django.db.connection。

from psycopg2 import sql

model_instances_to_update = []
for model_instance in models_queryset:    
    model_instances_to_update.append(
        (
            id,
            col_1_value,
            col_2_value,
            col_3_value,
            col_4_value
        )
    )

model_instance_update_query = sql.SQL(
    """
    UPDATE {table} AS model_table SET
    col_1 = model_table_new.col_1,
    col_2 = model_table_new.col_2,
    col_3 = model_table_new.col_3,
    col_4 = model_table_new.col_4
    FROM (VALUES {records_list_template}) AS model_table_new(id, col_1, col_2, col_3, col_4)
    WHERE model_table.id = model_table_new.id;
    """
).format(
    table=sql.Identifier("table_name"),
    records_list_template=sql.SQL(",").join(
        [sql.Placeholder()] * len(model_instances_to_update)
    ),
)

database = settings.DATABASES["default"]
pg_connection_dict = {
    "dbname": database["NAME"],
    "user": database["USER"],
    "password": database["PASSWORD"],
    "port": database["PORT"],
    "host": database["HOST"],
}

connection = psycopg2.connect(**pg_connection_dict)
cursor = connection.cursor()
cursor.execute(
      model_instance_update_query,
      model_instances_to_update,
  )
© www.soinside.com 2019 - 2024. All rights reserved.