我正在尝试创建具有自定义架构的表,在基类中创建。
基类:
@as_declarative()
class Base:
"""Base class for all database entities"""
@classmethod
@declared_attr
def __tablename__(cls) -> str:
"""Generate database table name automatically.
Convert CamelCase class name to snake_case db table name.
"""
return re.sub(r"(?<!^)(?=[A-Z])", "_", cls.__name__).lower()
@classmethod
@declared_attr
def __table_args__(cls) -> dict[str, str]:
return {'schema': f'{cls.__module__.split(".")[-2]}_{cls.__module__.split(".")[-1].upper()}'}
def __repr__(self) -> str:
attrs = []
for c in self.__table__.columns:
attrs.append(f"{c.name}={getattr(self, c.name)}")
return "{}({})".format(self.__class__.__name__, ", ".join(attrs))
问题是,alembic 自动生成的迁移只是看不到 __table_args_ 中的模式。我在在线和离线迁移中都添加了
include_schemas=True
。
奇怪的是,它确实在迁移文件中添加了
schema='STG_UNION_MEMBER'
,但根本没有创建模式。
迁移文件:
"""init
Revision ID: 3d2d5e14ded1
Revises:
Create Date: 2023-08-22 19:00:27.771231
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3d2d5e14ded1'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table('union_member',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('type_of_learning', sa.String(), nullable=False),
sa.Column('rzd_status', sa.String(), nullable=False),
sa.Column('academic_level', sa.String(), nullable=False),
sa.Column('status', sa.String(), nullable=False),
sa.Column('faculty', sa.String(), nullable=False),
sa.Column('first_name', sa.String(), nullable=False),
sa.Column('last_name', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('date_of_birth', sa.String(), nullable=False),
sa.Column('phone_number', sa.String(), nullable=False),
sa.Column('image', sa.String(), nullable=False),
sa.Column('rzd_datetime', sa.String(), nullable=False),
sa.Column('rzd_number', sa.String(), nullable=False),
sa.Column('grade_level', sa.Integer(), nullable=False),
sa.Column('has_student_id', sa.Boolean(), nullable=False),
sa.Column('entry_date', sa.String(), nullable=False),
sa.Column('status_gain_date', sa.String(), nullable=False),
sa.Column('card_id', sa.Integer(), nullable=False),
sa.Column('card_status', sa.String(), nullable=False),
sa.Column('card_date', sa.String(), nullable=False),
sa.Column('card_number', sa.String(), nullable=False),
sa.Column('card_user', sa.String(), nullable=False),
sa.Column('card', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('id'),
schema='STG_UNION_MEMBER'
)
def downgrade():
op.drop_table('union_member', schema='STG_UNION_MEMBER')
env.py(蒸馏器):
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool, inspect
import profcomff_definitions
from profcomff_definitions.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = os.getenv('DB_DSN', 'postgresql://postgres:12345@localhost:5432/postgres')
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_schemas=True,
version_table_schema='public',
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = os.getenv('DB_DSN', 'postgresql://postgres:12345@localhost:5432/postgres')
connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata, include_schemas=True, version_table_schema='public'
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
桌子本身:
import logging
from sqlalchemy import Boolean, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from profcomff_definitions.base import Base
logger = logging.getLogger(__name__)
class UnionMember(Base):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
type_of_learning: Mapped[str] = mapped_column(String, nullable=False)
rzd_status: Mapped[str] = mapped_column(String)
academic_level: Mapped[str] = mapped_column(String, nullable=False)
status: Mapped[str] = mapped_column(String, nullable=False)
faculty: Mapped[str] = mapped_column(String, nullable=False)
first_name: Mapped[str] = mapped_column(String, nullable=False)
last_name: Mapped[str] = mapped_column(String, nullable=False)
email: Mapped[str] = mapped_column(String, nullable=False)
date_of_birth: Mapped[str] = mapped_column(String, nullable=False)
phone_number: Mapped[str] = mapped_column(String, nullable=False)
image: Mapped[str] = mapped_column(String, nullable=False)
rzd_datetime: Mapped[str] = mapped_column(String, nullable=False)
rzd_number: Mapped[str] = mapped_column(String, nullable=False)
grade_level: Mapped[int] = mapped_column(Integer)
has_student_id: Mapped[bool] = mapped_column(Boolean)
entry_date: Mapped[str] = mapped_column(String, nullable=False)
status_gain_date: Mapped[str] = mapped_column(String, nullable=False)
card_id: Mapped[int] = mapped_column(Integer, nullable=False)
card_status: Mapped[str] = mapped_column(String, nullable=False)
card_date: Mapped[str] = mapped_column(String, nullable=False)
card_number: Mapped[str] = mapped_column(String, nullable=False)
card_user: Mapped[int] = mapped_column(String, nullable=False)
card: Mapped[int] = mapped_column(String, nullable=False)
class Test(Base):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
基本上,我想做的是强制alembic自动生成我指定的自定义模式(我的代码中的STG_UNION_MEMBER)。经过搜索,我在 alembic 文档中找到了重写器对象(链接:https://alembic.sqlalchemy.org/en/latest/api/autogenerate.html#fine-grained-autogenerate- Generation-with-rewriters)。
将以下代码添加到 env.py 文件中:
from alembic import op as aop
from alembic.autogenerate import rewriter
from alembic.operations import ops
writer = rewriter.Rewriter()
@writer.rewrites(ops.CreateTableOp)
def create_table(context, revision, op):
"""Ensure schema exists before creating the table"""
if op.schema:
op.execute(f'CREATE SCHEMA IF NOT EXISTS "{op.schema}"')
return [
op
]
并更改同一文件中的两个configure()函数:
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_schemas=True,
version_table_schema='public',
process_revision_directives=writer # Additional line
)
似乎对我有用。