我使用以下代码通过 Luigi 将数据加载到 SQLite 数据库中:
class LoadData(luigi.Task):
def requires(self):
return TransformData()
def run(self):
with sqlite3.connect('database.db') as db:
cursor = db.cursor()
cursor.execute("INSERT INTO prod SELECT * FROM staging;")
def output(self):
return luigi.LocalTarget('database.db')
这可行,但是当我想更新或插入新数据时,任务不会执行,因为 Luigi 认为它已完成(
database.db
已经存在)。
可能是我没有理解LocalTarget的好用。解决这个问题的正确方法是什么?
///编辑:我的问题适用于此页面上给出的示例(
le_create_db.py
的代码)。您如何解决该示例中的更新和插入问题?
///编辑:关于附加到文件的这个问题类似,但是使用标记文件的解决方案不起作用,因为 sqla 需要
SQLAlchemyTarget
输出。还有其他答案吗,特别是关于附加到数据库的答案吗?
考虑使用模拟文件: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html
在每次执行中,您将创建一个新文件。
另一种解决方案可以使用在数据库内创建标记表的策略,例如: https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres .PostgresTarget
我遇到了同样的问题,并且能够通过重写
complete
方法来简单地返回 False
: 来解决它
def complete(self):
return False
现在,即使存在数据库文件,任务也会重新运行。
一般有以下三种选择:
complete
并始终返回False
)complete
的含义(例如,决定数据库中最近的最后一个快照,如果需要创建新快照)以下是单个片段中并排的所有三个变体:https://gist.github.com/miku/fb53af7576bc3e5dd1df3ad627db4d07