我正在构建 ETL,但是,我有一个限制: 我必须使用常规 HTTP 请求与数据库进行通信(将 SQL 查询作为字符串发送)。 我正在寻找可以帮助我做到这一点的软件包和工具。 我正在尝试的一种方法是使用 Pydantic 进行数据验证,并使用 sqlmodel 或 sqlglot 以 POSTGERS 方言构建 SQL 查询(将发送到数据库)。
但是,我没能创建以下查询:
INSERT INTO ... VALUES (...) ,(...) ON CONFLICT DO UPDATE SET ...
我发现有几个选项:
由于 SQLAlchemy 有很好的文档记录并且易于实现,所以我尝试了一下:
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData
from sqlalchemy.dialects.postgresql import insert
import requests
metadata = MetaData()
my_table = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('value', Integer))
engine = create_engine('postgresql://username:passwd@ip/dbname')
stmt = insert(my_table).values(id=1, name='Example', value=123)
upsert_stmt = stmt.on_conflict_do_update(
index_elements=['id'],
set_=dict(name=stmt.excluded.name, value=stmt.excluded.value)
)
query = str(upsert_stmt.compile(dialect=engine.dialect))
response = requests.post('http://databaseapiurl.com/query', data={'sql': query})
print(response.text)