Building a dynamic update query in psycopg2

Question (votes: 0, answers: 6)

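I have to construct a dynamic update query for PostgreSQL. It is dynamic because I only find out at run time which columns have to be updated.

Given a sample table: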

create table foo (id int, a int, b int, c int)

I would then construct the "set" clause programmatically:

_set = {}
_set['a'] = 10
_set['c'] = None  # None is sent to PostgreSQL as NULL

After that I have to build the update query, and this is where I am stuck. I need to construct this SQL UPDATE command:

update foo set a = 10, c = NULL where id = 1

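How can I do this with a psycopg2 parameterized command (that is, loop over the dict if it is not empty and build the set clause)?

Update

I found the solution myself while I was asleep. It is dynamic, exactly how I wanted it :-)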

create table foo (id integer, a integer, b integer, c varchar)

updates = {}
updates['a'] = 10
updates['b'] = None
updates['c'] = 'blah blah blah'
sql = "update foo set %s where id = %s" % (', '.join("%s = %%s" % u for u in updates.keys()), 10)
params = updates.values()
print cur.mogrify(sql, params)
cur.execute(sql, params)

The result is exactly what I needed, in the form I needed it (in particular the nullable and quotable columns):

"update foo set a = 10, c = 'blah blah blah', b = NULL where id = 10"
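
The approach in the update can be wrapped in a small helper. The following is a minimal sketch and not part of the original post: the function name build_update and the allowed_columns whitelist are assumptions made for illustration. It differs from the code above in two ways: the id is passed as a query parameter instead of being formatted into the SQL string, and the column names are checked against a known set, because names cannot be passed as query parameters.

```python
def build_update(table, updates, key_column, key_value, allowed_columns):
    """Return (sql, params) for a parameterized UPDATE statement.

    Hypothetical helper: `table` and `key_column` are assumed to be trusted
    constants; only the keys of `updates` are validated here.
    """
    if not updates:
        raise ValueError("nothing to update")
    unknown = set(updates) - set(allowed_columns)
    if unknown:
        raise ValueError("unexpected columns: %s" % sorted(unknown))

    columns = list(updates)  # fix the order once so names and values line up
    set_clause = ", ".join("%s = %%s" % col for col in columns)
    sql = "update %s set %s where %s = %%s" % (table, set_clause, key_column)
    params = [updates[col] for col in columns] + [key_value]
    return sql, params


sql, params = build_update(
    "foo", {"a": 10, "b": None, "c": "blah blah blah"}, "id", 10,
    allowed_columns={"a", "b", "c"},
)
print(sql)     # update foo set a = %s, b = %s, c = %s where id = %s
print(params)  # [10, None, 'blah blah blah', 10]
# With an open cursor: cur.execute(sql, params)
```

Returning a list for params also avoids the dict view issue that appears on Python 3.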
python postgresql psycopg2
6 Answers
11 votes

Use psycopg2.sql: the SQL string composition module

The module contains objects and functions for generating SQL dynamically in a convenient and safe way.

from psycopg2 import connect, sql

conn = connect("dbname=test user=postgres")

upd = {'name': 'Peter', 'age': 35, 'city': 'London'}
ref_id = 12

sql_query = sql.SQL("UPDATE people SET {data} WHERE id = {id}").format(
    data=sql.SQL(', ').join(
        sql.Composed([sql.Identifier(k), sql.SQL(" = "), sql.Placeholder(k)]) for k in upd.keys()
    ),
    id=sql.Placeholder('id')
)
upd.update(id=ref_id)
with conn:
    with conn.cursor() as cur:
        cur.execute(sql_query, upd)
conn.close()

Running the following before closing the connection:

print(sql_query.as_string(conn))

prints this output:

UPDATE people SET "name" = %(name)s, "age" = %(age)s, "city" = %(city)s WHERE id = %(id)s

10 votes

There is actually a neater way, using the alternative column-list syntax:

sql_template = "UPDATE foo SET ({}) = %s WHERE id = {}"
sql = sql_template.format(', '.join(updates.keys()), 10)
# The values travel as one tuple parameter, which psycopg2 adapts to (v1, v2, ...)
params = (tuple(updates.values()),)
print(cur.mogrify(sql, params))
cur.execute(sql, params)
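
To make the column-list form easier to follow, here is a minimal sketch that builds the same kind of statement with one placeholder per value and an explicit ROW(...) constructor, rather than relying on how a tuple is rendered. The helper name column_list_update is hypothetical, the table and column names are assumed to come from trusted code (they are formatted straight into the SQL text), and the sketch assumes a PostgreSQL version that accepts the optional ROW keyword in this position.

```python
def column_list_update(table, updates, row_id):
    """Build an UPDATE using the `SET (col, ...) = ROW(...)` syntax.

    Hypothetical helper; table and column names are assumed to be trusted.
    """
    columns = list(updates)
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = "UPDATE {} SET ({}) = ROW({}) WHERE id = %s".format(
        table, column_list, placeholders
    )
    # Values in column order, followed by the id for the WHERE clause.
    params = [updates[c] for c in columns] + [row_id]
    return sql, params


sql, params = column_list_update("foo", {"a": 10, "c": "blah"}, 10)
print(sql)     # UPDATE foo SET (a, c) = ROW(%s, %s) WHERE id = %s
print(params)  # [10, 'blah', 10]
# With an open cursor: cur.execute(sql, params)
```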

1 vote

No dynamic SQL is needed. Suppose a is not nullable and b is nullable.

If you want to update both a and b:

_set = dict(
    id = 1,
    a = 10,
    b = 20, b_update = 1
)
update = """
    update foo
    set
        a = coalesce(%(a)s, a), -- a is not nullable
        b = (array[b, %(b)s])[%(b_update)s + 1] -- b is nullable
    where id = %(id)s
"""
print(cur.mogrify(update, _set))
cur.execute(update, _set)

Output:

update foo
set
    a = coalesce(10, a), -- a is not nullable
    b = (array[b, 20])[1 + 1] -- b is nullable
where id = 1

If you want to update neither column:

_set = dict(
    id = 1,
    a = None,
    b = 20, b_update = 0
)

Output:

update foo
set
    a = coalesce(NULL, a), -- a is not nullable
    b = (array[b, 20])[0 + 1] -- b is nullable
where id = 1
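
The parameter dictionaries above can be derived from a partial dict of changes. This is a minimal sketch under the same assumptions as the answer (a is not nullable, b is nullable); the helper name make_params is hypothetical.

```python
def make_params(row_id, changes):
    """Map a partial dict of changes onto the fixed parameter set.

    - a: None means "keep the current value" (coalesce falls back to a).
    - b: b_update picks the array element: 0 keeps b, 1 takes the new value,
      which may itself be None to set the column to NULL.
    """
    return dict(
        id=row_id,
        a=changes.get("a"),
        b=changes.get("b"),
        b_update=1 if "b" in changes else 0,
    )


print(make_params(1, {"a": 10, "b": 20}))
# {'id': 1, 'a': 10, 'b': 20, 'b_update': 1}
print(make_params(1, {"b": None}))
# {'id': 1, 'a': None, 'b': None, 'b_update': 1}
print(make_params(1, {}))
# {'id': 1, 'a': None, 'b': None, 'b_update': 0}
```

The statement text stays constant, so only the parameter dict changes between calls.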

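1 vote

An option without Python string formatting, using psycopg2's AsIs wrapper for the column names (although this does not protect you from SQL injection through the column names). The dict is named data: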

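from psycopg2.extensions import AsIs

# Column names go in via AsIs (unescaped); the values are adapted as one tuple.
update_statement = 'UPDATE foo SET (%s) = %s WHERE id_column=%s'
columns = list(data.keys())
values = [data[column] for column in columns]
query = cur.mogrify(update_statement, (AsIs(','.join(columns)), tuple(values), id_value))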
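
Because AsIs inserts its text unescaped, the column names should be validated or quoted before they are joined. Below is a minimal sketch of the quoting step, assuming standard SQL identifier quoting (wrap the name in double quotes and double any embedded double quote). The helper name quote_identifier is hypothetical and is not a psycopg2 function.

```python
def quote_identifier(name):
    """Quote a column name as an SQL identifier (hypothetical helper)."""
    if not name or "\x00" in name:
        raise ValueError("invalid identifier: %r" % (name,))
    # Doubling embedded quotes keeps the whole name inside one quoted identifier.
    return '"' + name.replace('"', '""') + '"'


data = {"a": 10, 'b"; drop table foo; --': None}
column_list = ",".join(quote_identifier(c) for c in data)
print(column_list)  # "a","b""; drop table foo; --"
# column_list could then be wrapped in AsIs(...) as in the answer above.
```

Quoted identifiers are case-sensitive in PostgreSQL, so the dict keys must match the column names exactly.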

0 votes

Here is my solution, which lives inside a generic DatabaseHandler class and gives a lot of flexibility when a pd.DataFrame is used as the source.

    def update_data(
        self,
        table: str,
        df: pd.DataFrame,
        indexes: Optional[list] = None,
        column_map: Optional[dict] = None,
        commit: Optional[bool] = False,
    ) -> int:
        """Update data in the media database

        Args:
            table (str): the "tablename" or "namespace.tablename"
            df (pandas.DataFrame): dataframe containing the data to update
            indexes (list): the list of columns in the table that will be in the WHERE clause of the update statement.
                If not provided, will use df indexes.
            column_map (dict): dictionary mapping the columns in df to the columns in the table
                columns in the column_map that are also in indexes will not be updated
                Key = df column.
                Value = table column.
            commit (bool): if True, the transaction will be committed (default=False)

            Notes:
                If using a column_map, only the columns in the column_map will be updated or used as indexes.
                Order does not matter. If not using a column_map, all columns in df must exist in table.

        Returns:
            int : rows updated
        """
        try:
            if not indexes:
                # Use the dataframe index instead
                indexes = []
                for c in df.index.names:
                    if not c:
                        raise Exception(
                            f"Dataframe contains indexes without names. Unable to determine update where clause."
                        )
                    indexes.append(c)

            update_strings = []
            tdf = df.reset_index()
            if column_map:
                target_columns = [c for c in column_map.keys() if c not in indexes]
            else:
                column_map = {c: c for c in tdf.columns}
                target_columns = [c for c in df.columns if c not in indexes]

            for i, r in tdf.iterrows():
                upd_params = ", ".join(
                    [f"{column_map[c]} = %s" for c in target_columns]
                )
                upd_list = [r[c] if pd.notna(r[c]) else None for c in target_columns]
                upd_str = self._cur.mogrify(upd_params, upd_list).decode("utf-8")

                idx_params = " AND ".join([f"{column_map[c]} = %s" for c in indexes])
                idx_list = [r[c] if pd.notna(r[c]) else None for c in indexes]
                idx_str = self._cur.mogrify(idx_params, idx_list).decode("utf-8")

                update_strings.append(f"UPDATE {table} SET {upd_str} WHERE {idx_str};")
            full_update_string = "\n".join(update_strings)
            print(full_update_string)  # Debugging
            self._cur.execute(full_update_string)
            rowcount = self._cur.rowcount
            if commit:
                self.commit()
            return rowcount
        except Exception as e:
            self.rollback()
            raise e

Usage example:

>>> df = pd.DataFrame([
    {'a':1,'b':'asdf','c':datetime.datetime.now()}, 
    {'a':2,'b':'jklm','c':datetime.datetime.now()}
])

>>> cls.update_data('my_table', df, indexes = ['a'])
UPDATE my_table SET b = 'asdf', c = '2023-01-17T22:13:37.095245'::timestamp WHERE a = 1;
UPDATE my_table SET b = 'jklm', c = '2023-01-17T22:13:37.095250'::timestamp WHERE a = 2;

>>> cls.update_data('my_table', df, indexes = ['a','b'])
UPDATE my_table SET c = '2023-01-17T22:13:37.095245'::timestamp WHERE a = 1 AND b = 'asdf';
UPDATE my_table SET c = '2023-01-17T22:13:37.095250'::timestamp WHERE a = 2 AND b = 'jklm';

>>> cls.update_data('my_table', df.set_index('a'), column_map={'a':'db_a','b':'db_b','c':'db_c'} )
UPDATE my_table SET db_b = 'asdf', db_c = '2023-01-17T22:13:37.095245'::timestamp WHERE db_a = 1;
UPDATE my_table SET db_b = 'jklm', db_c = '2023-01-17T22:13:37.095250'::timestamp WHERE db_a = 2;

Note, however, that this is not safe against SQL injection because of the way it generates the WHERE clause.
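
One way to keep the values out of the SQL text entirely is to build a single parameterized statement plus a list of parameter tuples, which can then be passed to cursor.executemany. This is a minimal sketch and not the author's method: build_batch_update is a hypothetical helper, it takes plain dicts (for example from df.reset_index().to_dict('records')) rather than a DataFrame, and the table and column names are still assumed to be trusted.

```python
def build_batch_update(table, rows, indexes):
    """Return (statement, param_list) for cursor.executemany.

    Hypothetical helper: every dict in `rows` must have the same keys.
    Keys listed in `indexes` go into the WHERE clause, the rest into SET.
    """
    if not rows:
        raise ValueError("no rows to update")
    targets = [c for c in rows[0] if c not in indexes]
    if not targets:
        raise ValueError("no columns left to update")
    set_clause = ", ".join("{} = %s".format(c) for c in targets)
    where_clause = " AND ".join("{} = %s".format(c) for c in indexes)
    statement = "UPDATE {} SET {} WHERE {}".format(table, set_clause, where_clause)
    # One tuple per row: SET values first, then WHERE values.
    param_list = [
        tuple(r[c] for c in targets) + tuple(r[c] for c in indexes)
        for r in rows
    ]
    return statement, param_list


rows = [{"a": 1, "b": "asdf"}, {"a": 2, "b": "jklm"}]
statement, param_list = build_batch_update("my_table", rows, indexes=["a"])
print(statement)   # UPDATE my_table SET b = %s WHERE a = %s
print(param_list)  # [('asdf', 1), ('jklm', 2)]
# With an open cursor: cur.executemany(statement, param_list)
```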


0 votes

If you are running under Python v3.10, here is an update to @Gabor's answer; without it you will get the error "TypeError: 'dict_values' object does not support indexing":

create table foo (id integer, a integer, b integer, c varchar)

updates = {}
updates['a'] = 10
updates['b'] = None
updates['c'] = 'blah blah blah'
sql = "update foo set %s where id = %s" % (', '.join("%s = %%s" % u for u in updates.keys()), 10)
params = list(updates.values())
print(cur.mogrify(sql, params))
cur.execute(sql, params)