在数据流(Apache Beam)作业中导入共享 Python 模块

问题描述 投票:0回答:3

我正在使用 Google Cloud 的数据流工具构建一系列数据管道。

这是我的文件结构:

-- dataflow_jobs
    |-- README.md
    |-- cleanse_export
    |   |-- __init__.py
    |   |-- run_cleanse_export.py
    |   |-- setup.py
    |   `-- src
    |       |-- __init__.py
    |       `-- cleanse_export.py
    |-- constants
    |   |-- __init__.py
    |   `-- constants.py
    `-- utilities
        |-- __init__.py
        `-- data_transform_functions.py

想法是

constants
utilities
应该是
dataflow_jobs
下其他(未来)工作使用的共享库。

这个例子之后,我建立了一个

setup.py
文件,看起来像这样:

from setuptools import find_packages, setup

setup(
    name="dataflow_job",
    version="0.0.1",
    install_requires=["scrubadub==2.0.0"],
    packages=find_packages()
)

在我的主程序中(

cleanse_export.py
),我正在导入几个本地包:

import time
import uuid
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import sys

sys.path.append("../")
from constants.constants import (
    DATAFLOW_RUNNER,
    PROJECT,
    REGION,
    SETUP_FILE,
)
from utilities.data_transform_functions import (
    cleanse_pii,
    _logging,
)

我不断收到

ModuleNotFound
错误:
ModuleNotFoundError: No module named 'utilities'
.

我正在尝试让

setup.py
识别并安装我的本地 python 模块(工作启动时工作机器上的
utilities
constants

我试过添加

packages=find_packages() + 
packages=find_packages(where='../constants') + packages=find_packages(where='../utilities')

我的

setup.py
packages
参数,但我得到了相同的结果。

为什么

setup.py
无法识别其上方目录中的模块,我怎样才能让它识别当前文件结构中的这些本地模块?

python dependencies python-import google-cloud-dataflow apache-beam
3个回答
0
投票

看起来您的 setup.py 文件正确指定了要包含的包,但我认为问题可能与您在 cleanse_export.py 文件中导入模块的方式有关。

我们尝试使用相对导入语句从父目录导入模块,而不是使用 sys.path.append() 将父目录添加到路径。您可以将导入语句更改为如下内容:

from ..constants.constants import (
    DATAFLOW_RUNNER,
    PROJECT,
    REGION,
    SETUP_FILE,
)
from ..utilities.data_transform_functions import (
    cleanse_pii,
    _logging,
)

这应该允许您从父目录导入模块而不依赖于 sys.path.

此外,请确保每个目录中都有一个空白的init.py 文件,以使其成为 Python 包。


0
投票

要解决您的问题并像单独的包一样使用/导入共享文件夹,您可以将

setup.py
放在项目的根目录下,例如:

-- dataflow_jobs
    |-- setup.py
    |-- README.md
    |-- cleanse_export
    |   |-- __init__.py
    |   |-- run_cleanse_export.py
    |   `-- src
    |       |-- __init__.py
    |       `-- cleanse_export.py
    |-- constants
    |   |-- __init__.py
    |   `-- constants.py
    `-- utilities
        |-- __init__.py
        `-- data_transform_functions.py

进入

dataflow_jobs
文件夹并使用如下所示的命令启动您的
Dataflow
作业:

cd dataflow_jobs

python -m cleanse_export.run_cleanse_export \
        --runner=DataflowRunner \
        --staging_location=gs://your_staging_bucket/staging/ \
        --region=europe-west1 \
        --setup_file=./setup.py \
        ....

在这种情况下,您的工作将与您最初在问题中提出的导入一起正常工作。

当您在单独的包中创建和移动这些文件夹时,导入将保持相同。


0
投票

如果您仍想使用 sys.path,请尝试使用 Path(file).resolve() 从 pathlib 中获取正确的路径。

© www.soinside.com 2019 - 2024. All rights reserved.