Parallelizing a dlthub REST API pipeline


I am trying to speed up the following dlt (dlthub) pipeline by parallelizing it, as described in the docs here: https://dlthub.com/docs/reference/performance#parallelism

Here is the original (non-parallel) code, which works:

source = rest_api_source({
    "client": {
        "base_url": "https://www.filmweb.no/",
    },
    "resources": [
        {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }
        for MOVIE_ID in MOVIE_IDS
    ],
})

pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
load_info = pipeline.run(source)

I can't figure out how to actually parallelize it. The following code block...

@dlt.resource(parallelized=True)
def movies(movie_ids):
    for MOVIE_ID in movie_ids:
        yield {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }

@dlt.source
def movies_source(movie_ids):
    return [rest_api_source({
        "client": {
            "base_url": "https://www.filmweb.no/",
        },
        "resources": [movies(movie_ids)]
    })]

pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
pipeline.run(movies_source(MOVIE_IDS))

...gives me this error:

ResourceNameMissing: Resource name is missing. If you create a resource directly from data ie. from a list you must pass the name explicitly in `name` argument.
        Please note that for resources created from functions or generators, the name is the function name by default.

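I have tried a number of things, such as taking rest_api_source out of the list inside the movies_source function; specifying name explicitly in the @dlt.resource decorator; and removing the name entry from the dict yielded by the movies resource. None of these fixed anything.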

python concurrency parallel-processing data-engineering dlt
1 answer

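In the declarative dlt REST API config, each resource entry also accepts the arguments you would normally pass to a regular dlt resource. In your case that is parallelized, so set it on every resource:

"parallelized": True

Your config object would then be: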

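from dlt.sources.rest_api import RESTAPIConfig, rest_api_source  # import path for dlt >= 1.0

# MOVIE_IDS and BUILD_ID are defined as in the question.
# The type hint belongs on the config dict, not on the source that rest_api_source returns.
config: RESTAPIConfig = {
    "client": {
        "base_url": "https://www.filmweb.no/",
    },
    "resources": [
        {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
            # evaluate this resource in parallel with the others during extract
            "parallelized": True,
        }
        for MOVIE_ID in MOVIE_IDS
    ],
}

source = rest_api_source(config)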
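
If repeating the flag in every generated entry feels noisy, one possible variation is to set it once under resource_defaults. The sketch below is a minimal illustration, assuming resource_defaults merges parallelized into each resource the same way it does other resource options. The helper names build_movies_config and run_movies_pipeline are hypothetical and not part of dlt, and the import path assumes dlt 1.0 or later.

```python
from typing import Any, Dict, List


def build_movies_config(movie_ids: List[str], build_id: str) -> Dict[str, Any]:
    """Build a REST API config dict with one resource per movie ID."""
    return {
        "client": {"base_url": "https://www.filmweb.no/"},
        # Assumption: options listed here are merged into every resource below,
        # so the flag does not need to be repeated per entry.
        "resource_defaults": {"parallelized": True},
        "resources": [
            {
                "name": f"movie_{movie_id}",
                "table_name": "movies",
                "endpoint": {
                    # Plain string on purpose: the placeholders are left for dlt to fill.
                    "path": "_next/data/{build_id}/film/{movie_id}.json",
                    "params": {
                        "movie_id": movie_id,
                        "build_id": build_id,
                        "edi": movie_id,
                    },
                    "data_selector": "pageProps.cmsDocument",
                },
                "write_disposition": "replace",
            }
            for movie_id in movie_ids
        ],
    }


def run_movies_pipeline(movie_ids: List[str], build_id: str):
    """Run the pipeline; dlt is imported lazily so the builder works without it."""
    import dlt
    from dlt.sources.rest_api import rest_api_source  # import path for dlt >= 1.0

    pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
    return pipeline.run(rest_api_source(build_movies_config(movie_ids, build_id)))
```

Calling run_movies_pipeline(MOVIE_IDS, BUILD_ID) would then replace the last two lines of the original script. Keeping the config builder separate from the run step also makes it easy to inspect the generated resources before any request is sent.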

I recommend annotating the config with the RESTAPIConfig type hint so that your IDE can suggest which parameters are available in a REST API config object.
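
Marking resources as parallelized only helps up to the size of the thread pool dlt uses during extract. As a rough sketch, assuming the [extract] workers setting from the performance page linked in the question applies here and is read from the EXTRACT__WORKERS environment variable, the pool size could be raised before the pipeline runs. The helper name configure_extract_workers and the value 8 are illustrative only.

```python
import os


def configure_extract_workers(workers: int = 8) -> None:
    """Set the extract thread pool size through an environment variable."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    # Assumption: dlt maps the config key extract.workers to EXTRACT__WORKERS.
    os.environ["EXTRACT__WORKERS"] = str(workers)


# Call this before pipeline.run() so the setting is picked up.
configure_extract_workers(8)
```

The same value can instead go into config.toml under the [extract] section. Be considerate with the number, since every worker sends requests to the same host.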
