下面是database.py文件:
def get_dba_pgdsn() -> str:
    """Return the connection URL (DSN) string for the "dba" database."""
    # NOTE(review): `user` and `pass` are not defined in this snippet and
    # `pass` is a reserved word, so they appear to be redacted placeholders;
    # this line is illustrative rather than runnable as written.
    return f"mysql://{user}:{pass}@dba"
def get_dbb_pgdsn() -> str:
    """Return the connection URL (DSN) string for the "dbb" database."""
    # NOTE(review): `user` and `pass` are not defined in this snippet and
    # `pass` is a reserved word, so they appear to be redacted placeholders;
    # this line is illustrative rather than runnable as written.
    return f"mysql://{user}:{pass}@dbb"
def engines() -> Engine:
    """Build a mapping from database name ("dba", "dbb") to its connection target.

    NOTE: despite the ``Engine`` return annotation, the value returned is a
    dict, and its values are the plain DSN strings produced by the
    ``get_*_pgdsn()`` helpers -- no engine object is created here.
    """
    engines = {
        "dba": get_dba_pgdsn(),
        "dbb": get_dbb_pgdsn()
    }
    return engines
def get_engine() -> Engine:
    """Return the module-level ``ENGINE`` object.

    Returns:
        Whatever is currently stored in the module-level ``ENGINE`` name.

    Raises:
        RuntimeError: If ``ENGINE`` is still ``None``. ``RuntimeError`` is a
            subclass of ``Exception``, so existing ``except Exception``
            handlers keep working.
    """
    # Guard clause: hand back the engine as soon as it is known to exist.
    if ENGINE is not None:
        return ENGINE
    raise RuntimeError(
        "SQLAlchemy DB Engine has not been created yet. "
        "This is normally done in database.init()"
    )
# Declarative base class for the ORM models, built on top of SQLAlchemyBase.
Base = declarative_base(cls=SQLAlchemyBase)
# Evaluated once at import time; holds whatever engines() returns per database name.
engine_list = engines()
# One scoped session registry per database, each created from the matching
# engine_list entry.
Session_a = scoped_session(sessionmaker(engine_list["dba"]))
Session_b = scoped_session(sessionmaker(engine_list["dbb"]))
def init(session_expire_on_commit: bool = True) -> None:
    """Populate the module-level ``ENGINE`` if needed and configure both sessions.

    Args:
        session_expire_on_commit: Forwarded as ``expire_on_commit`` to both
            ``Session_a.configure`` and ``Session_b.configure``.
    """
    global ENGINE
    # Only call engines() when ENGINE is not already set to a truthy value.
    if not ENGINE:
        ENGINE = engines()
    # The binds come from the module-level ``engine_list``, not from ``ENGINE``.
    # NOTE(review): the snippet's indentation was lost; it is assumed that the
    # two configure() calls run on every init() call -- confirm.
    Session_a.configure(bind=engine_list["dba"], expire_on_commit=session_expire_on_commit)
    Session_b.configure(bind=engine_list["dbb"], expire_on_commit=session_expire_on_commit)
这是我的conftest 文件:
from shared import database
from sqlalchemy import text
import pytest
@pytest.fixture()
def db_session_a():
    """Yield ``database.Session_a`` to the test, then clean up through it.

    After the test, the session is rolled back, every table in
    ``database.Base.metadata`` except ``flyway_schema_history`` is truncated
    via ``Session_a``, and the result is committed.
    """
    database.init()
    yield database.Session_a
    # Everything below is teardown: it runs once the requesting test is done.
    # Drop any uncommitted work left by the test before truncating.
    database.Session_a.rollback()
    # Reverse of sorted_tables -- presumably so dependent tables are emptied
    # before the tables they reference.
    for table in reversed(database.Base.metadata.sorted_tables):
        # Leave the flyway_schema_history table untouched.
        if table.name != "flyway_schema_history":
            sql_stmt = text(f"TRUNCATE TABLE {table.name} RESTART IDENTITY CASCADE;")
            database.Session_a.execute(sql_stmt)
    # NOTE(review): the snippet's indentation was lost; commit/close_all are
    # assumed to run once after the loop -- confirm.
    database.Session_a.commit()
    # NOTE(review): close_all() is deprecated in SQLAlchemy 1.3+ in favour of
    # close_all_sessions() -- verify against the installed version.
    database.Session_a.close_all()
@pytest.fixture()
def db_session_b():
    """Yield ``database.Session_b`` to the test, then clean up through it.

    After the test, the session is rolled back, every table in
    ``database.Base.metadata`` except ``flyway_schema_history`` is truncated
    via ``Session_b``, and the result is committed.
    """
    database.init()
    yield database.Session_b
    # Everything below is teardown: it runs once the requesting test is done.
    # Drop any uncommitted work left by the test before truncating.
    database.Session_b.rollback()
    # Reverse of sorted_tables -- presumably so dependent tables are emptied
    # before the tables they reference.
    for table in reversed(database.Base.metadata.sorted_tables):
        # Leave the flyway_schema_history table untouched.
        if table.name != "flyway_schema_history":
            sql_stmt = text(f"TRUNCATE TABLE {table.name} RESTART IDENTITY CASCADE;")
            database.Session_b.execute(sql_stmt)
    # NOTE(review): the snippet's indentation was lost; commit/close_all are
    # assumed to run once after the loop -- confirm.
    database.Session_b.commit()
    # NOTE(review): close_all() is deprecated in SQLAlchemy 1.3+ in favour of
    # close_all_sessions() -- verify against the installed version.
    database.Session_b.close_all()
最后是测试用例
def test_run(self, db_session_a, db_session_b):
    """Create one entry while both database session fixtures are active."""
    # Requesting both fixtures makes pytest set up Session_a and Session_b
    # for this single test.
    # setup_sql_db and factories are defined outside this snippet.
    setup_sql_db()
    entry = factories.entry(id=1)
    print("entry:" + str(entry.id))
运行时出现此错误
self = <sqlalchemy.orm.session.SessionTransaction object at 0x147f27310>
bind = "mysql://{user}:{pass}@dba"
execution_options = None
def _connection_for_bind(self, bind, execution_options):
self._assert_active()
if bind in self._connections:
if execution_options:
util.warn(
"Connection is already established for the "
"given bind; execution_options ignored"
)
return self._connections[bind][0]
local_connect = False
should_commit = True
if self._parent:
conn = self._parent._connection_for_bind(bind, execution_options)
if not self.nested:
return conn
else:
if isinstance(bind, engine.Connection):
conn = bind
if conn.engine in self._connections:
raise sa_exc.InvalidRequestError(
"Session already has a Connection associated for the "
"given Connection's Engine"
)
else:
conn = bind.connect()
AttributeError: 'str' object has no attribute 'connect'
所以我的问题是上面的代码,如何在测试用例执行期间正确绑定这两个数据库连接?
我希望能够同时建立两个数据库连接,因为我将有需要同时从两个数据库执行查询的测试用例。
您遇到的问题是:您的 `engines()` 函数返回的不是引擎(Engine)对象的字典,而是连接字符串的字典:
def engines() -> Engine:
    engines = {
        # Each value is the DSN string returned by the helper, not an engine.
        "dba": get_dba_pgdsn(),
        "dbb": get_dbb_pgdsn()
    }
    return engines
`Session.configure(bind=...)` 接受的是引擎(Engine)或连接(Connection)对象,而不是连接字符串。
试试这个:
def engines() -> dict[str, Engine]:
    """Create one SQLAlchemy ``Engine`` per database, keyed by database name.

    Returns:
        A dict with the keys ``"dba"`` and ``"dbb"``; each value is the result
        of ``create_engine()`` called on the corresponding DSN string, so it
        can be passed as ``bind=`` to a session.
    """
    # Return the literal directly instead of a local that shadows the
    # function's own name.
    return {
        "dba": create_engine(get_dba_pgdsn()),
        "dbb": create_engine(get_dbb_pgdsn()),
    }