如何在 EMR 集群上执行的 pyspark 代码中使用 yaml 文件?

问题描述 投票:0回答:1

在 EMR 集群中,我正在运行 pyspark 代码,该代码使用 yaml 文件,我收到路径未找到错误。

我正在使用以下 Spark 提交:

spark-submit --deploy-mode client --executor-memory 20G --num-executors 4 --executor-cores 4 --jars s3://stage/jdbc_driver/ojdbc8.jar --files s3://rxl-dev-stage/etl_script/constants.yml s3://stage/etl_script/main.py

并且正在执行以下代码:

with open("constants.yml","r") as f:
    constants=yaml.safe_load(f)

我也尝试过:

with open("/mnt/spark/work/files/constants.yml","r") as f:
     constants=yaml.safe_load(f)

但是在这两种情况下我都收到“找不到这样的路径错误”。 如果有人可以帮助我,我应该寻找什么? 谢谢。

python amazon-web-services apache-spark pyspark amazon-emr
1个回答
0
投票

如果可以,请使用 x = "x_string" 的 python 文件并调用 x 赋值。 如果您需要使用 yaml 文件,则必须找到要调用的每个脚本的正确路径。

def _sanitize_path_for_windows(path):
    treated_path = path
    if os.name == 'nt':
        treated_path = "../" + path
    return treated_path

将此路径替换为你的路径

示例路径 =“main_project_name/pipelines/datasources/data/”

替换以下函数中的路径部分

def find_data_path():
    if os.getcwd().split("/")[-1] == "main_project_name":
        location = "./data/"  # Running on Github Runner
    else:
        current_path = os.getcwd().split("/")
        come_out = "../"
        for i in range(len(current_path)):
            if current_path[-i] == "pipelines":
                come_out = come_out * i
                break
        location = come_out + "data/"
    location_sanitized = _sanitize_path_for_windows(location)
    return location_sanitized
© www.soinside.com 2019 - 2024. All rights reserved.