我正在做一个项目,其中的 Celery 任务通过以下代码使用数据库连接池:
import os
from psycopg_pool import ConnectionPool
from contextlib import contextmanager
def _require_env(name, error_message):
    """Return the value of environment variable *name*.

    Raises:
        ValueError: carrying *error_message* when the variable is unset
            or empty.
    """
    value = os.getenv(name)
    if not value:
        raise ValueError(error_message)
    return value


# Connection settings are read once, at import time; a missing or empty
# variable aborts the import with ValueError.
PG_USERNAME = _require_env('PG_USERNAME', "Invalid postgres username")
PG_PASSWORD = _require_env('PG_PASSWORD', "Invalid postgres pass")
PG_HOST = _require_env('PG_HOST', "Invalid postgres host")
PG_PORT = _require_env('PG_PORT', "Invalid postgres port")

# Keyword/value connection string; the database name is fixed to "postgres".
conninfo = (
    f'host={PG_HOST} port={PG_PORT} dbname=postgres '
    f'user={PG_USERNAME} password={PG_PASSWORD}'
)

# Server options intended to prevent closed connections
# ("-c statement_timeout=1800000 -c tcp_keepalives_idle=30
#  -c tcp_keepalives_interval=30") were tried here and are currently disabled.
#
# NOTE(review): the pool is built at import time. If worker processes are
# forked after this module is imported they would presumably inherit the same
# pooled connections -- confirm whether the pool should be opened per process.
connection_pool = ConnectionPool(
    conninfo=conninfo,
    min_size=4,
    max_size=100,
    # Validate a connection before handing it out to a caller.
    check=ConnectionPool.check_connection,
)
@contextmanager
def get_db_conn():
    """Borrow a connection from ``connection_pool`` for a ``with`` block.

    Yields the connection obtained through ``getconn()`` and returns it with
    ``putconn()`` when the block exits, whether or not the block raised.
    No commit or rollback is issued here; callers manage their transactions.
    """
    borrowed = connection_pool.getconn()
    try:
        yield borrowed
    finally:
        # Always hand the connection back, even if the caller's block failed.
        connection_pool.putconn(borrowed)
Celery 任务的示例如下:
@app.task(bind=True)
def example_task(self, id):
    """Process the ``test`` row identified by *id* and record the outcome.

    The row is marked ``'running'`` (committed immediately), the row
    referenced by its ``resource_id`` is fetched, and the row is finally
    marked ``'done'`` together with the computed ``properties``. Any
    exception rolls back the open transaction and stores the error text and
    stack trace on the row with status ``'error'``.

    Args:
        id: Primary key of the ``test`` row to process.

    Returns:
        None. Returns early, after a rollback, when the row cannot be read.
    """
    with get_db_conn() as conn:
        try:
            with conn.cursor(row_factory=dict_row) as cursor:
                cursor.execute('SELECT * FROM test WHERE id = %s', (id,))
                try:
                    test = cursor.fetchone()
                except psycopg.errors.ProgrammingError:
                    test = None
                if test is None:
                    # fetchone() returns None (it does not raise) when no row
                    # matches; without this guard test['resource_id'] below
                    # would fail with TypeError.
                    logger.warning('Test log msg')
                    conn.rollback()
                    return

                cursor.execute(
                    "UPDATE test SET status = 'running' WHERE id = %s",
                    (id,),
                )
                # Committed right away so the status change becomes visible
                # to other sessions.
                conn.commit()

                # Some processing...

                # Fetch another resource needed
                cursor.execute(
                    'SELECT * FROM test WHERE id = %s',
                    (test['resource_id'],),
                )
                cursor.fetchone()

                # Update the entry with the result.
                # `properties` is assumed to be produced by the elided
                # processing step above.
                cursor.execute(
                    """
                    UPDATE test
                    SET status = 'done', properties = %s
                    WHERE id = %s
                    """,
                    (Jsonb(properties), id),
                )
                conn.commit()
        except Exception as e:
            logger.exception(f'Error: {e}')
            # Discard whatever the failed transaction left open before
            # writing the error status.
            conn.rollback()
            with conn.cursor(row_factory=dict_row) as cursor:
                # Update status to error with exception information.
                # The row key is `id`; the previous `webpage_id` was an
                # undefined name and raised NameError inside this handler.
                cursor.execute(
                    """
                    UPDATE test
                    SET status = 'error', error = %s
                    WHERE id = %s
                    """,
                    (
                        Jsonb({
                            'error': str(e),
                            'stacktrace': traceback.format_exc(),
                        }),
                        id,
                    ),
                )
            conn.commit()
该代码在大多数情况下都可以工作,但有时,当启动多个相同类型的任务时,我会在第二次 fetchone() 调用时收到一些
psycopg.ProgrammingError: the last operation didn't produce a result
类型的错误。
同时,在数据库上我可以看到以下警告
WARNING: there is already a transaction in progress
我怀疑我处理连接的方式可能存在一些问题,但我找不到问题。
据我所知,一旦调用 get_db_conn(),该连接就不能再被其他任务使用,因此理论上不会有多个任务使用同一个连接,所以在执行第二次 fetchone() 调用时不应该有正在进行的事务。
资源存在,因为所有其他任务都可以访问它,所以这不是问题。
如果 test 的主目标行,以及根据其外键 test.resource_id 选出的附加目标行都不可共享,请把它们锁定。否则,并发的工作进程可能会相互冲突:它们会处理同一行,并在此操作各后续步骤之间不可预测的时间点,修改该行的字段以及通过 resource_id 与之关联的字段。
常规的显式锁会在 commit/rollback 时自动释放,因此,要在更新目标 status 字段的 conn.commit() 之后仍然保留锁,您可以使用会话级咨询锁,让它们跨越多个事务持续存在:
@app.task(bind=True)
def example_task(self, id):
    """Process the ``test`` row *id* while holding session-level advisory locks.

    The initial SELECT requests an exclusive advisory lock keyed on ``id``
    (in the WHERE clause) and a shared advisory lock keyed on
    ``resource_id`` (in the select list). Both are session-level, so they
    outlive the intermediate ``conn.commit()``; they are released in the
    RETURNING clause of the final UPDATE on the success path and of the
    error UPDATE on the failure path.

    Args:
        id: Primary key of the ``test`` row to process.

    Returns:
        None. Returns early, after a rollback, when the row cannot be read
        or its exclusive lock is not obtained.
    """
    with get_db_conn() as conn:
        try:
            with conn.cursor(row_factory=dict_row) as cursor:
                cursor.execute(
                    """
                    SELECT *, pg_try_advisory_lock_shared(resource_id)
                    FROM test
                    WHERE id = %s
                      AND pg_try_advisory_lock(id)
                    """,
                    (id,),
                )
                try:
                    test = cursor.fetchone()
                except psycopg.errors.ProgrammingError:
                    test = None
                if test is None:
                    # When pg_try_advisory_lock(id) is false the WHERE clause
                    # filters the row out, so fetchone() returns None rather
                    # than raising: someone else is already processing this
                    # `id` (or the row does not exist).
                    logger.warning('Test log msg')
                    conn.rollback()
                    return
                # NOTE(review): pg_try_advisory_lock_shared() never waits; it
                # returns false immediately and that column is not checked
                # here. If the intent is to wait for a session altering the
                # row behind `resource_id`, the blocking
                # pg_advisory_lock_shared() would be needed -- confirm.

                cursor.execute(
                    """
                    UPDATE test
                    SET status = 'running'
                    WHERE id = %s
                    """,
                    (id,),
                )
                # Session-level advisory locks survive this commit.
                conn.commit()

                # Some processing...

                # Fetch another resource needed
                cursor.execute(
                    """
                    SELECT *
                    FROM test
                    WHERE id = %s
                    /*AND probably more conditions here*/
                    """,
                    (test['resource_id'],),
                )
                cursor.fetchone()

                # Update the entry with the result and release both locks.
                # `properties` is assumed to be produced by the elided
                # processing step above. The lock on resource_id was taken
                # in shared mode, so it has to be released with
                # pg_advisory_unlock_shared().
                cursor.execute(
                    """
                    UPDATE test
                    SET status = 'done'
                      , properties = %s
                    WHERE id = %s
                    RETURNING pg_advisory_unlock(id)
                            , pg_advisory_unlock_shared(resource_id)
                    """,
                    (Jsonb(properties), id),
                )
                conn.commit()
        except Exception as e:
            logger.exception(f'Error: {e}')
            # Rolling back does not release session-level advisory locks;
            # they are released explicitly by the UPDATE below.
            conn.rollback()
            with conn.cursor(row_factory=dict_row) as cursor:
                # Update status to error with exception information.
                # The row key is `id`; the previous `webpage_id` was an
                # undefined name and raised NameError inside this handler.
                cursor.execute(
                    """
                    UPDATE test
                    SET status = 'error', error = %s
                    WHERE id = %s
                    RETURNING pg_advisory_unlock(id)
                            , pg_advisory_unlock_shared(resource_id)
                    """,
                    (
                        Jsonb({
                            'error': str(e),
                            'stacktrace': traceback.format_exc(),
                        }),
                        id,
                    ),
                )
            conn.commit()
问题也可能出在您未分享的那部分代码中,即选择并分配从外部传入 example_task(self, id) 的 id 的地方。如果工作进程大致是这样找到下一个任务的:
-- Pick one task in status 'ready', ordered by ascending priority and then
-- by ascending created_at.
SELECT id
FROM test
WHERE status = 'ready'
ORDER BY priority, created_at
LIMIT 1;
那么,如果第二个工作进程在第一个工作进程有机会通过 conn.commit() 提交其 status 变更之前就取到了同一行,就没有什么能阻止两个工作进程挑选同一个任务。可以在挑选时锁定该行,并让其他工作进程跳过已被锁定的行:
select id
from test
where status='ready'
order by priority
, created_at
for update skip locked--this
limit 1;
但是要一直持有这样的行锁,您必须等到整个操作完成之后才调用 conn.commit(),而不能在各个子步骤之间提交——否则中途就会失去锁。
为了保护最近一次 .commit() 之后的其余操作,可以继续使用该行锁来确保查询不会立即发生冲突,同时再加上一个能够跨多个事务存续的咨询锁。咨询锁没有 skip locked 这样的选项,但可以用递归 CTE 来模拟(遍历 id,并在第一个加锁尝试没有返回 false 的 id 处停止)。或者,您可以根据 pg_locks.objid 查出哪些 id 已经被咨询锁锁定,并排除它们:
-- Pick one 'ready' task whose row is not locked by another transaction and
-- whose id is not listed in pg_locks as advisory-locked, and try to take an
-- advisory lock on it in the same statement.
SELECT id,
       pg_try_advisory_lock(id)
FROM test
WHERE status = 'ready'
  AND id NOT IN (SELECT objid
                 FROM pg_locks
                 WHERE locktype = 'advisory')
ORDER BY priority, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1;