I have a batch job that runs whenever a user updates a row in the UI. Users are allowed to update several rows at once, which triggers multiple instances of the batch job, each with a unique run_id.
The job creates a CSV file and inserts its values into a table (allocations_update). Once the values have been dumped into that table, we update a second table (allocations_od) using the values from allocations_update.
The query that updates allocations_od is:
UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc
FROM db.allocations_update src
WHERE src.run_id = '{run_id}'
AND src.col1 = target.col1
AND src.col2 = target.col2
However, sometimes when a user triggers several instances of this job (by updating multiple rows at the same time), I get a deadlock error when the second update query against allocations_od runs.
The complete error message is:
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 15455 waits for ShareLock on transaction 62597603; blocked by process 15538.
Process 15538 waits for ShareLock on transaction 62597592; blocked by process 15455.
HINT: See server log for query details.
CONTEXT: while updating tuple (479821,43) in relation "allocations_od_20230514"
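The HINT points to the server log; while two jobs are actually blocked on each other, the statements involved should also be visible from pg_stat_activity. A minimal diagnostic sketch (pg_blocking_pids() exists from PostgreSQL 9.6 on; the dsn is a placeholder, not the connection the job uses):

import psycopg2

# Diagnostic sketch, separate from the job: list sessions that are currently
# blocked and the PIDs blocking them. pg_stat_activity and pg_blocking_pids()
# are standard PostgreSQL (9.6+); the dsn below is a placeholder.
def show_blocked_sessions(dsn="dbname=db user=postgres"):
    query = """
        SELECT pid,
               pg_blocking_pids(pid) AS blocked_by,
               state,
               query
        FROM pg_stat_activity
        WHERE cardinality(pg_blocking_pids(pid)) > 0;
    """
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            for pid, blocked_by, state, sql in cur.fetchall():
                print(f"pid {pid} ({state}) blocked by {blocked_by}: {sql}")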
I would like to know what is causing the deadlock. My best guess is that another instance of the job is still running the first query, which holds a lock on allocations_update, so the two processes end up blocking each other.
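Whether or not that guess is right, as far as I understand a deadlock needs two transactions that each already hold a row lock the other one wants. A toy sketch of that situation (demo_od is a hypothetical throwaway table and dsn a placeholder, nothing from the real job) should raise the same DeadlockDetected error by updating the same two rows in opposite order:

import threading
import time

import psycopg2

# Toy reproduction sketch. Assumptions (not part of the real job):
#   CREATE TABLE demo_od (id int PRIMARY KEY, rec_alloc float);
#   INSERT INTO demo_od VALUES (1, 0), (2, 0);
# Each thread updates the same two rows inside one transaction but in opposite
# order, so each ends up waiting for a row lock the other already holds and
# PostgreSQL aborts one of them with DeadlockDetected.
def update_in_order(dsn, ids):
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            for row_id in ids:
                cur.execute(
                    "UPDATE demo_od SET rec_alloc = rec_alloc + 1 WHERE id = %s",
                    (row_id,))
                time.sleep(2)  # widen the window so the transactions interleave
        conn.commit()
    finally:
        conn.close()

if __name__ == "__main__":
    dsn = "dbname=db user=postgres"  # placeholder
    t1 = threading.Thread(target=update_in_order, args=(dsn, [1, 2]))
    t2 = threading.Thread(target=update_in_order, args=(dsn, [2, 1]))
    t1.start()
    t2.start()
    t1.join()
    t2.join()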
The whole batch process is fairly long and involved, but this is the last part, where the problem occurs:
def update_alloc_query(self, final_data, stage_location):
    """Method to bulk update the allocations_od table."""
    # stage_location is the S3 path of the CSV file.
    last_created_date = self.get_last_created_date()
    last_created_date = last_created_date.strftime('%Y-%m-%d')
    final_data['created_date'] = last_created_date
    run_id = final_data['run_id'].unique()[0]
    s3.s3_upload_df(stage_location, final_data)
    # First step: bulk insert the CSV into allocations_update.
    UITableLoader.bulk_upload_from_csv(db_model=AllocationsUpdate,
                                       file_location=stage_location,
                                       data_types={"rsid": "str", "passenger_class": "str",
                                                   "journey_origin": "str",
                                                   "journey_destination": "str",
                                                   "bucket_code": "str",
                                                   "eff_departure_date": "str",
                                                   "recommended_allocation": "float",
                                                   "run_id": "str"},
                                       sep="|",
                                       created_date=last_created_date)
    self.logger.info("Added table into new data")
    # Second step: update allocations_od from the rows just inserted for this run_id.
    allo_sql = (f"UPDATE db.allocations_od target "
                f"SET rec_alloc = src.rec_alloc "
                f"FROM db.allocations_update src "
                f"WHERE src.run_id = '{run_id}' "
                f"AND src.col1 = target.col1 "
                f"AND src.col2 = target.col2")
    execute_sql_statement(allo_sql)
    self.logger.info("executed update query")
# UITableLoader.bulk_upload_from_csv
@staticmethod
def bulk_upload_from_csv(db_model, file_location, data_types=None, sep=',',
                         created_date=None, chunk_size=1000):
    """Uploads data from a local CSV file to the SQLAlchemy db."""
    LOGGER.info("Bulk loading data.",
                file_location=file_location, table=db_model.__table__)
    record_count = 0
    chunks = pd.read_csv(
        file_location,
        dtype=data_types,
        chunksize=chunk_size,
        sep=sep,
        on_bad_lines='skip'
    )
    for chunk in chunks:
        # Replace NaN with None so the values are inserted as SQL NULLs.
        chunk = chunk.where((pd.notnull(chunk)), None)
        chunk = chunk.replace({np.nan: None})
        record_count += chunk.shape[0]
        if created_date is not None:
            chunk['created_date'] = created_date
        rows = chunk.to_dict(orient='records')
        sqa_save(db_model, rows, save_many=True)
    return record_count
def execute_sql_statement(sql_statement, conn_string=None):  # pragma: no cover
    """Executes the given sql_statement"""
    if not sql_statement:
        return
    if not conn_string:
        conn_string = get_db_connection_string()
    dbsession = get_db_session(conn_string)
    try:
        dbsession.execute(sql_statement)
        dbsession.commit()
    except SQLAlchemyError as ex:
        LOGGER.exception(f"Error executing sql statement '{sql_statement}'")
        dbsession.rollback()
        raise ex
    finally:
        dbsession.close()
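For completeness: a retry wrapper around execute_sql_statement, like the sketch below, would probably paper over the error (the retry count and back-off are arbitrary, and psycopg2 >= 2.8 is assumed so that psycopg2.errors is available), but I would rather understand what actually creates the lock cycle between the two jobs.

import time

from psycopg2.errors import DeadlockDetected
from sqlalchemy.exc import OperationalError

# Sketch of a retry wrapper around execute_sql_statement() above. SQLAlchemy
# wraps the driver-level DeadlockDetected error in OperationalError, so the
# original exception is inspected via .orig. Retry count and back-off are
# arbitrary values chosen for illustration.
def execute_with_deadlock_retry(sql_statement, retries=3, delay_seconds=1):
    for attempt in range(1, retries + 1):
        try:
            execute_sql_statement(sql_statement)
            return
        except OperationalError as ex:
            if isinstance(ex.orig, DeadlockDetected) and attempt < retries:
                time.sleep(delay_seconds * attempt)  # simple linear back-off
                continue
            raise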