I have an employee table. Each employee can discuss certain company products with customers, i.e. there is a many-to-many relationship between employees and products. The schema is as follows:
from sqlalchemy.schema import MetaData, Table, Column, ForeignKey
from sqlalchemy.types import Integer, String

metadata_obj = MetaData()

cv_calls_table = Table(
    "employee",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("key_id", String, nullable=True),
)

cv_products_table = Table(
    "products",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("product", String),
)

call_has_product_table = Table(
    "employee_has_product",
    metadata_obj,
    Column("employee_id", Integer, ForeignKey("employee.id")),
    Column("product_id", Integer, ForeignKey("products.id")),
)
Suppose I have a csv file with a million rows of the following form:
+-----------+-----------+----------+
| Name      | key_id    | product  |
+-----------+-----------+----------+
| John Doe  | xyz       | pears    |
+-----------+-----------+----------+
| John Doe  | xyz       | apples   |
+-----------+-----------+----------+
| Ann Smith | abc       | oranges  |
+-----------+-----------+----------+
| Ann Smith | abc       | apples   |
+-----------+-----------+----------+
What is the best way to bulk insert this data into the database using Pandas (to read the csv file) and SQLAlchemy? Inserting the employee and product data into their respective tables is not a problem. What I am struggling with is a clean way to insert the primary keys of the employees and products into the junction table call_has_product_table.
I am using a SQLite database.
I have tried using the SQLAlchemy ORM, but it seems that adding elements to the relationship can only be done for each employee individually, which is not feasible performance-wise. So I have tried setting up the database with Core instead. The code for my ORM approach is:
from sqlalchemy.orm import Session

# Keep only one row per employee
employees = employee_data.drop_duplicates(subset=['Name', 'key_id']).reset_index(drop=True)

with Session(engine) as session:
    emply = []
    for i in employees.index:
        # Access each employee
        employee_row = employees.iloc[i, :]
        # Create the corresponding Employee object
        employee_ = Employee(name=employee_row['Name'], key_id=employee_row['key_id'])
        # Get the products associated with the currently selected employee
        prods_ = employee_data.loc[
            (employee_data.Name == employee_row['Name'])
            & (employee_data.key_id == employee_row['key_id']),
            'product',
        ].to_list()
        # Create the corresponding Product objects
        products_ = [Product(product=prod_) for prod_ in prods_]
        # Add the Product objects to the current employee
        employee_.products.extend(products_)
        emply.append(employee_)
    session.add_all(emply)
    session.commit()
    session.close()
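For reference, the Employee and Product classes used above are declarative mappings over the same Core tables; roughly like this (a sketch reconstructed from the schema, the exact declarations may differ):

from sqlalchemy.orm import declarative_base, relationship

# Reuse the Core metadata and tables defined above for the ORM mapping
Base = declarative_base(metadata=metadata_obj)

class Employee(Base):
    __table__ = cv_calls_table
    # Many-to-many relationship through the junction table
    products = relationship("Product", secondary=call_has_product_table)

class Product(Base):
    __table__ = cv_products_table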
Here employee_data is a Pandas dataframe of the form shown in the table above.
I could also work around this in Pandas by selecting the generated id values from employee and products and merging them back in, but I think there should be a better way that uses only the capabilities of SQLAlchemy (or perhaps plain SQL).
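For instance, something roughly along these lines is the plain-SQL route I have in mind (just a sketch: the staging table and the DataFrame.to_sql step are my own assumptions, not part of the schema above):

from sqlalchemy import text

with engine.begin() as con:
    # Dump the raw csv rows into a throwaway staging table (arbitrary name)
    employee_data.to_sql('staging', con, if_exists='replace', index=False)
    # Resolve both foreign keys with a single INSERT ... SELECT,
    # assuming employee and products have already been populated
    con.execute(text("""
        INSERT INTO employee_has_product (employee_id, product_id)
        SELECT e.id, p.id
        FROM staging s
        JOIN employee e ON e.name = s.Name AND e.key_id = s.key_id
        JOIN products p ON p.product = s.product
    """))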
You can insert the employee and products records first, then select everything back to get the (auto-generated) primary keys. Merge them onto the employee_data dataframe and then populate the last table:
import pandas as pd

from sqlalchemy.engine import create_engine, Engine
from sqlalchemy.schema import MetaData, Table, Column, ForeignKey
from sqlalchemy.types import String, Integer

engine = create_engine('sqlite://')
metadata_obj = MetaData()

# Your table declarations here

metadata_obj.create_all(engine)

employee_data = pd.read_csv('data.csv').rename(columns={'Name': 'name'})

with engine.connect() as con:
    # Employees
    employees = employee_data[['name', 'key_id']].drop_duplicates()
    con.execute(cv_calls_table.insert().values(employees.to_dict('records')))

    # Products
    products = employee_data[['product']].drop_duplicates()
    con.execute(cv_products_table.insert().values(products.to_dict('records')))
    con.commit()

    # Get the primary keys back
    tbl1 = pd.DataFrame(con.execute(cv_calls_table.select()).all()).rename(columns={'id': 'employee_id'})
    tbl2 = pd.DataFrame(con.execute(cv_products_table.select()).all()).rename(columns={'id': 'product_id'})
    employee_data = employee_data.merge(tbl1, on=['name', 'key_id'])
    employee_data = employee_data.merge(tbl2, on='product')

    # Employee / Product junction table
    con.execute(call_has_product_table.insert().values(employee_data[['employee_id', 'product_id']].to_dict('records')))
    con.commit()
Output:
>>> employee_data
        name key_id  product  employee_id  product_id
0   John Doe    xyz    pears            1           1
1   John Doe    xyz   apples            1           2
2  Ann Smith    abc   apples            2           2
3  Ann Smith    abc  oranges            2           3
Checking with pandas:
con = engine.connect()

>>> pd.read_sql_table('employee', con)
   id       name key_id
0   1   John Doe    xyz
1   2  Ann Smith    abc

>>> pd.read_sql_table('products', con)
   id  product
0   1    pears
1   2   apples
2   3  oranges

>>> pd.read_sql_table('employee_has_product', con)
   employee_id  product_id
0            1           1
1            1           2
2            2           2
3            2           3
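One note on scale: with a million rows, a single multi-row INSERT built via .values(...) can run into SQLite's limit on the number of bound parameters, so it may be safer to pass the records as a separate parameter list (SQLAlchemy's executemany form), optionally in chunks. A rough sketch of that variation (the chunk size is an arbitrary choice):

records = employee_data[['employee_id', 'product_id']].to_dict('records')

with engine.connect() as con:
    for start in range(0, len(records), 50_000):
        # Passing the dicts as a second argument triggers executemany-style insertion
        con.execute(call_has_product_table.insert(), records[start:start + 50_000])
    con.commit()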