Celery tasks with psycopg: ProgrammingError: the last operation didn't produce a result


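I am working on a project in which I have:

  1. A PostgreSQL 16.2 database.
  2. A Python 3.12 backend using psycopg 3.2.1 and psycopg_pool 3.2.2.
  3. Celery for handling asynchronous tasks.

The Celery tasks use the database pool through the following code: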


import os
from psycopg_pool import ConnectionPool
from contextlib import contextmanager

PG_USERNAME = os.getenv('PG_USERNAME')
if not PG_USERNAME:
    raise ValueError(f"Invalid postgres username")

PG_PASSWORD = os.getenv('PG_PASSWORD')
if not PG_PASSWORD:
    raise ValueError(f"Invalid postgres pass")

PG_HOST = os.getenv('PG_HOST')
if not PG_HOST:
    raise ValueError(f"Invalid postgres host")

PG_PORT = os.getenv('PG_PORT')
if not PG_PORT:
    raise ValueError(f"Invalid postgres port")

# Options used to prevent closed connections
# conn_options = f"-c statement_timeout=1800000 -c tcp_keepalives_idle=30 -c tcp_keepalives_interval=30"
conninfo = f'host={PG_HOST} port={PG_PORT} dbname=postgres user={PG_USERNAME} password={PG_PASSWORD}'
connection_pool = ConnectionPool(
    min_size=4,
    max_size=100,
    conninfo=conninfo,
    check=ConnectionPool.check_connection,
    #options=conn_options,
)


@contextmanager
def get_db_conn():
    conn = connection_pool.getconn()
    try:
        yield conn
    finally:
        connection_pool.putconn(conn)

An example of a Celery task is:

@app.task(bind=True)
def example_task(self, id):
    with get_db_conn() as conn:
        try:
            with conn.cursor(row_factory=dict_row) as cursor:
                test = None
                cursor.execute('SELECT * FROM test WHERE id = %s', (id,))
                try:
                    test = cursor.fetchone()
                except psycopg.errors.ProgrammingError:
                    logger.warning(f'Test log msg')
                    conn.rollback()
                    return
                
                cursor.execute("UPDATE test SET status = 'running' WHERE id = %s", (id,))
                conn.commit()
                
                # Some processing...
                
                # Fetch another resource needed
                cursor.execute('SELECT * FROM test WHERE id = %s', (test['resource_id'],))
                cursor.fetchone()

                # Update the entry with the result
                cursor.execute("""
                    UPDATE test
                    SET status = 'done', properties = %s
                    WHERE id = %s
                """, (Jsonb(properties),  id))
                conn.commit()
        except Exception as e:
            logger.exception(f'Error: {e}')
            conn.rollback()
            with conn.cursor(row_factory=dict_row) as cursor:
                # Update status to error with exception information
                cursor.execute("""
                    UPDATE test
                    SET status = 'error', error = %s
                    WHERE id = %s
                """, (Jsonb({'error': str(e), 'stacktrace': traceback.format_exc()}), id))
                conn.commit()

The code works most of the time, but sometimes, when multiple tasks of the same type are launched, I get errors of the following type on the second fetchone() call:

psycopg.ProgrammingError: the last operation didn't produce a result

At the same time, I can see the following warning on the database:

WARNING:  there is already a transaction in progress

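I suspect there is a problem with the way I handle connections, but I cannot find it.

As far as I know, once get_db_conn() is called, that connection is not available to other tasks. In theory, then, no two tasks can use the same connection, so there should be no transaction already in progress when the second fetchone() call is executed.

The resource exists, since all the other tasks can access it, so that is not the problem.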

python postgresql celery psycopg3
1 answer

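If the main target row in the test table, as well as the additional target row selected through its test.resource_id foreign key, are both not shareable, lock them. Otherwise, concurrent workers are likely bumping into each other: they process the same row and alter its fields, as well as the fields of the row associated with it through resource_id, at unpredictable points between the subsequent steps of this operation.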

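Regular explicit locks are released automatically on commit/rollback. To keep the rows locked after you conn.commit() the update of the target's status field, you can use session-level advisory locks, which last across multiple transactions: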

@app.task(bind=True)
def example_task(self, id):
    with get_db_conn() as conn:
        try:
            with conn.cursor(row_factory=dict_row) as cursor:
                test = None
                cursor.execute("""SELECT *, pg_try_advisory_lock_shared(resource_id)
                                  FROM test 
                                  WHERE id = %s
                                    AND pg_try_advisory_lock(id)
                               """, (id,))
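                try:
                    test = cursor.fetchone()
                except psycopg.errors.ProgrammingError:
                    logger.warning(f'Test log msg')
                    conn.rollback()
                    return
                if test is None:
                    # no row: the id is missing, or pg_try_advisory_lock(id)
                    # returned false because someone else is processing this `id`
                    conn.rollback()
                    return
                # pg_try_advisory_lock_shared() never waits: it returns false if
                # someone holds an exclusive lock on `resource_id`; to wait for them
                # to finish instead, use the blocking pg_advisory_lock_shared()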
                cursor.execute("""UPDATE test 
                                  SET status = 'running' 
                                  WHERE id = %s
                               """, (id,))
                conn.commit()
                # Some processing...
                # Fetch another resource needed
                cursor.execute("""SELECT *
                                  FROM test 
                                  WHERE id = %s
                                  /*AND probably more conditions here*/
                               """, (test['resource_id'],))
                cursor.fetchone()
                # Update the entry with the result
                cursor.execute("""UPDATE test
                                  SET status = 'done'
                                    , properties = %s
                                  WHERE id = %s
                                  RETURNING pg_advisory_unlock(id)
                                          , pg_advisory_unlock_shared(resource_id)
                               """, (Jsonb(properties),  id))
                conn.commit()
        except Exception as e:
            logger.exception(f'Error: {e}')
            conn.rollback()
            with conn.cursor(row_factory=dict_row) as cursor:
                # Update status to error with exception information
                cursor.execute("""UPDATE test
                                  SET status = 'error', error = %s
                                  WHERE id = %s
                                  RETURNING pg_advisory_unlock(id)
                                          , pg_advisory_unlock_shared(resource_id)
                               """, (Jsonb({'error': str(e), 'stacktrace': traceback.format_exc()}), id))
                conn.commit()
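
Session-level advisory locks survive both commit and rollback, so on a pooled connection a lock that is never released stays held on that connection after the task returns it. Below is a minimal sketch of one way to guarantee the release. It assumes a psycopg cursor with the default tuple rows (not dict_row); the helper name session_advisory_lock is hypothetical, while pg_try_advisory_lock, pg_advisory_unlock and their _shared variants are PostgreSQL built-ins.

```python
from contextlib import contextmanager


@contextmanager
def session_advisory_lock(cursor, key, shared=False):
    """Try to take a session-level advisory lock; always release it on exit.

    Yields True if the lock was acquired and False otherwise (the pg_try_*
    functions return immediately instead of waiting).
    """
    suffix = "_shared" if shared else ""
    cursor.execute(f"SELECT pg_try_advisory_lock{suffix}(%s)", (key,))
    acquired = cursor.fetchone()[0]
    try:
        yield acquired
    except Exception:
        # A failed statement leaves the transaction aborted; roll back so the
        # unlock statement below can still run.
        cursor.connection.rollback()
        raise
    finally:
        if acquired:
            # A shared lock must be released with the _shared unlock variant.
            cursor.execute(f"SELECT pg_advisory_unlock{suffix}(%s)", (key,))
            cursor.fetchone()
```

The helper has to be nested inside the block that keeps the cursor open, for example "with session_advisory_lock(cursor, id) as acquired:" followed by an early return when acquired is false. The unlock statement itself opens a new transaction, which the caller still has to commit or roll back before handing the connection back.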

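The problem could also lie in the part of the code you did not share, where you select and assign the id that is passed from the outside to example_task(self, id). If this is more or less how the workers find their next task: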

select id 
from test 
where status='ready'
order by priority
       , created_at
limit 1;

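then nothing stops two workers from picking the same row: the second worker can grab it before the first one has had a chance to conn.commit() the status change.
You can acquire the lock right there, and make all subsequent calls skip to the nearest row that is still free: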

select id 
from test 
where status='ready'
order by priority
       , created_at
for update skip locked--this
limit 1;

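But to hold on to a lock like that, you must conn.commit() only once, after the whole operation is finished, without running commits between its sub-steps; otherwise you lose the lock along the way.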
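
The single-commit variant can be sketched as follows, assuming a psycopg 3 connection, whose conn.transaction() block commits on normal exit and rolls back on an exception. The function name claim_and_process and the process callback are hypothetical; process is assumed to return a value psycopg can adapt for the properties column, such as a Jsonb wrapper.

```python
CLAIM_SQL = """
    SELECT id
    FROM test
    WHERE status = 'ready'
    ORDER BY priority, created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
"""

FINISH_SQL = "UPDATE test SET status = 'done', properties = %s WHERE id = %s"


def claim_and_process(conn, process):
    """Claim one ready row, process it, and commit exactly once at the end.

    Returns the claimed id, or None if no unlocked ready row was found.
    """
    # The row lock taken by FOR UPDATE lives until this block ends, so no
    # other worker can claim the same row while `process` is running.
    with conn.transaction():
        with conn.cursor() as cursor:
            cursor.execute(CLAIM_SQL)
            row = cursor.fetchone()
            if row is None:
                return None  # nothing ready, or every ready row is locked
            task_id = row[0]
            properties = process(task_id)
            cursor.execute(FINISH_SQL, (properties, task_id))
            return task_id
```

The trade-off is that the transaction stays open for the whole duration of the processing step, and the intermediate 'running' status is never visible to other sessions.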

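To protect the remainder of the operation beyond the nearest .commit(), use that lock to make sure the queries do not collide right away, but also add an advisory lock that persists across multiple transactions.
Advisory locks do not offer skip locked, but it can be emulated with a recursive CTE (walk through the ids and stop at the first one whose lock attempt does not return false). Alternatively, you can check pg_locks.objid to find which ids are already advisory-locked, and exclude those: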

select id, pg_try_advisory_lock(id)
from test 
where status='ready' 
and id not in(select objid 
              from pg_locks
              where locktype='advisory')
order by priority
       , created_at
for update skip locked
limit 1;        
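
The recursive CTE is not written out here. As a plainly different and simpler substitute, the same skip-over-locked-ids behaviour can be approximated on the client side, at the cost of extra round trips. This is a minimal sketch assuming a psycopg cursor with the default tuple rows and the default READ COMMITTED isolation level; the function name claim_first_unlocked and the limit parameter are hypothetical.

```python
CANDIDATES_SQL = """
    SELECT id
    FROM test
    WHERE status = 'ready'
    ORDER BY priority, created_at
    LIMIT %s
"""


def claim_first_unlocked(cursor, limit=20):
    """Return the first ready id whose advisory lock could be taken, else None.

    On success the caller holds a session-level advisory lock on the returned
    id and is responsible for releasing it with pg_advisory_unlock().
    """
    cursor.execute(CANDIDATES_SQL, (limit,))
    candidates = [row[0] for row in cursor.fetchall()]
    for task_id in candidates:
        cursor.execute("SELECT pg_try_advisory_lock(%s)", (task_id,))
        if not cursor.fetchone()[0]:
            continue  # another session holds this id; try the next one
        # Re-check the status: the row may have been taken and finished by
        # another worker between the candidate query and the lock attempt.
        cursor.execute("SELECT status = 'ready' FROM test WHERE id = %s", (task_id,))
        row = cursor.fetchone()
        if row is not None and row[0]:
            return task_id
        # The row is no longer ready, so give the lock back and move on.
        cursor.execute("SELECT pg_advisory_unlock(%s)", (task_id,))
        cursor.fetchone()
    return None
```

Because the lock is taken outside the candidate query, no row lock is needed to hold the claim, and the claim survives the intermediate commits of the task.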