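I'm trying to speed up the following dlt (dlthub) pipeline by parallelizing it, as described in the documentation here: https://dlthub.com/docs/reference/performance#parallelism
Here is the original (non-parallel) code, which works: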
import dlt
from dlt.sources.rest_api import rest_api_source

# MOVIE_IDS and BUILD_ID are defined elsewhere.
source = rest_api_source({
    "client": {
        "base_url": "https://www.filmweb.no/",
    },
    "resources": [
        # One resource per movie, all written to the same "movies" table.
        {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }
        for MOVIE_ID in MOVIE_IDS
    ],
})
pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
load_info = pipeline.run(source)
I can't figure out how to actually parallelize it. The following code block...
@dlt.resource(parallelized=True)
def movies(movie_ids):
    for MOVIE_ID in movie_ids:
        yield {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }

@dlt.source
def movies_source(movie_ids):
    return [rest_api_source({
        "client": {
            "base_url": "https://www.filmweb.no/",
        },
        "resources": [movies(movie_ids)]
    })]

pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
pipeline.run(movies_source(MOVIE_IDS))
...gives me this error:
ResourceNameMissing: Resource name is missing. If you create a resource directly from data ie. from a list you must pass the name explicitly in `name` argument.
Please note that for resources created from functions or generators, the name is the function name by default.
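I have tried many things, such as taking rest_api_source out of the list inside the movies_source function, explicitly specifying name in the @dlt.resource decorator, and removing the name entry from the dictionaries yielded by the movies resource. None of these fixed anything.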
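In the declarative dlt REST API configuration object, each resource definition accepts the same arguments you can pass to a regular dlt resource. In your case, that is parallelized=True.
So your configuration object would be: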
# RESTAPIConfig describes the config dict, not the source object it produces.
config: RESTAPIConfig = {
    "client": {
        "base_url": "https://www.filmweb.no/",
    },
    "resources": [
        {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            # Same flag as @dlt.resource(parallelized=True)
            "parallelized": True,
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }
        for MOVIE_ID in MOVIE_IDS
    ],
}
source = rest_api_source(config)
I recommend using the RESTAPIConfig type hint so that your IDE can suggest which parameters are available in the REST API configuration object.
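For reference, here is a minimal end-to-end sketch of the approach above. It assumes a recent dlt release where rest_api_source is importable from dlt.sources.rest_api; the helper names build_movie_resources, build_config, and run_movies_pipeline are hypothetical and only serve to keep the example self-contained. The dlt import lives inside the function that needs it so the generated config can be inspected on its own.

```python
from typing import Any, Dict, List

BASE_URL = "https://www.filmweb.no/"


def build_movie_resources(movie_ids: List[str], build_id: str) -> List[Dict[str, Any]]:
    """Return one REST API resource config per movie, each flagged as parallelized."""
    return [
        {
            "name": f"movie_{movie_id}",
            "table_name": "movies",
            "parallelized": True,  # resource-level flag from the answer above
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": movie_id,
                    "build_id": build_id,
                    "edi": movie_id,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }
        for movie_id in movie_ids
    ]


def build_config(movie_ids: List[str], build_id: str) -> Dict[str, Any]:
    """Assemble the full declarative config: client settings plus resources."""
    return {
        "client": {"base_url": BASE_URL},
        "resources": build_movie_resources(movie_ids, build_id),
    }


def run_movies_pipeline(movie_ids: List[str], build_id: str):
    """Build the source from the config and run it (requires dlt to be installed)."""
    # Assumed import path; older setups shipped rest_api as a separate verified source.
    import dlt
    from dlt.sources.rest_api import rest_api_source

    source = rest_api_source(build_config(movie_ids, build_id))
    pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
    return pipeline.run(source)
```

Calling run_movies_pipeline(MOVIE_IDS, BUILD_ID) would then replace the last three lines of the original script. Printing build_config(...) first is a quick way to confirm that every resource actually carries "parallelized": True before running the pipeline.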