我有一个 Flask Python 应用程序,其中使用多处理库与主线程并行执行函数。该功能的目的是将文件上传到OneDrive。该函数还需要从我的数据库读取和写入。
我的应用程序使用 PostgreSQL 数据库和 SQLAlchemy。为了从临时线程访问数据库,每次执行函数时我都会创建一个新的 SQLALchemy 引擎。当这些临时线程执行其函数时,我遇到了一些不同的错误。错误非常间歇性。
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected
我尝试研究这些错误,但我发现的许多帖子都有足够不同的设置,我不确定它们是否适用于我的情况。我已经尝试了这些帖子中建议的一些解决方案,但它们并不能防止错误。我对 SQLAlchemy 不太有经验,所以我在这里很盲目。
# Initialize the class that contains my upload function
vmb_client = vmb_controller()
# Upload files asynchronously
from multiprocessing import Process
p = Process(target=vmb_client.upload_file, args=(<arguments>))
p.start()
def upload_file(self, corp_index, filename):
#
# Set up new DB connection
#
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(db_uri)
Session = sessionmaker(bind=engine)
sess = Session()
#
# This does the API call to upload the file. It doesn't involve any DB interaction
#
results = self.call_upload_file(<arguments>)
#
# Track the uploaded file in the DB
#
command = f"""
-- Insert new item in DB
INSERT INTO corporate.vmb_items (corporation_key, onedrive_item_id, parent_id, type, name, created_datetime,
modified_datetime, path, web_url, child_count, size)
VALUES ({corp_index}, '{res.get('id')}', '{res.get('parent_id')}', 'file', '{res.get('name').replace("'", "''")}',
'{res.get('created_datetime')}', '{res.get('modified_datetime')}', '{res.get('path').replace("'", "''")}',
'{res.get('web_url').replace("'", "''")}', {res.get('child_count')}, '{res.get('size')}');"""
sess.execute(command)
#
# Update parent folder
#
command = f"""-- Update child count of parent folder
UPDATE corporate.vmb_items AS i SET child_count = (SELECT COUNT(vmb_item_key) FROM corporate.vmb_items AS sub WHERE sub.parent_id = '{res.get('parent_id')}')
WHERE i.onedrive_item_id = '{res.get('parent_id')}';"""
sess.execute(command)
#
# Cleanup
#
sess.commit()
sess.close()
engine.dispose()
return results
根据我看到的与我遇到的错误相关的其他帖子,我在
create_engine
调用中尝试了几个不同的参数,但这些参数没有任何区别:
from sqlalchemy.pool import NullPool
corporate_engine = create_engine(db_uri, pool_pre_ping=True, poolclass=NullPool)
就我而言,这是关于回滚时重置。许多进程同时从池中请求连接。 SQLAlchemy 的
pool_reset_on_return
参数的默认值为 True
。会话在返回池时进行重置操作。我通过执行以下步骤修复了它:
我首先在分叉另一个进程之前添加了
engine.dispose()
。它将释放父进程的连接并从池中请求新的连接。
当你在create_engine函数中添加参数
pool_reset_on_return=None,
时(False
也适用于我),它不会在结帐时重置。
但是此修复可能会导致另一个关键问题,例如留下一些未提交的语句。这就是为什么你应该确保你的代码可以解决这个问题。这是官方文档:SQLAlchemy 官方文档。我希望它也能解决您的问题。