如何更高效地管理 PySpark 中的选项

问题描述 投票:0回答:4

让我们考虑以下 pySpark 代码

my_df = (spark.read.format("csv")
                     .option("header","true")
                     .option("inferSchema", "true")
                     .load(my_data_path))

这是一个相对较小的代码,但有时我们的代码有很多选项,传递字符串选项经常会导致拼写错误。此外,我们没有从代码编辑器那里得到任何建议。 作为一种解决方法,我正在考虑创建一个命名元组(或自定义类)来拥有我需要的所有选项。例如,

from collections import namedtuple
allOptions = namedtuple("allOptions", "csvFormat header inferSchema")
sparkOptions = allOptions("csv", "header", "inferSchema")
my_df = (spark.read.format(sparkOptions.csvFormat)
                     .option(sparkOptions.header,"true")
                     .option(sparkOptions.inferSchema, "true")
                     .load(my_data_path))

我想知道这种方法是否有缺点,或者其他 pySpark 开发人员是否使用了更好的标准方法。

python python-3.x apache-spark pyspark design-patterns
4个回答
0
投票

如果使用

.csv
函数读取文件,选项被命名为参数,因此会抛出TypeError。此外,在带有 Python 插件的 VS Code 上,选项会自动完成。

df = spark.read.csv(my_data_path,
                    header=True,
                    inferSchema=True)

如果我运行时出现拼写错误,则会抛出错误。

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/var/folders/tv/32xjg80x6pb_9t4909z8_hh00000gn/T/ipykernel_3636/4060466279.py in <module>
----> 1 df = spark.read.csv('test.csv', inferSchemaa=True, header=True)

TypeError: csv() got an unexpected keyword argument 'inferSchemaa'

在 VS Code 上,自动完成中会建议选项。

enter image description here


0
投票

我认为最好的方法是制作一个带有一些默认值和 kwargs 的包装器,如下所示

def csv(path, inferSchema=True, header=True, options={}):
    return hdfs(path, 'csv', {'inferSchema': inferSchema, 'header': header, **options})

def parquet(path, options={}):
    return hdfs(path, 'parquet', {**options})

def hdfs(path, format, options={}):
    return (spark
        .read
        .format(format)
        .options(**options)
        .load(f'hdfs://.../{path}')
    )

0
投票

出于这个原因和许多其他原因,在生产级项目中,我们曾经编写一个项目来包装 Spark。

所以开发者不允许直接处理spark。

在这样的项目中我们可以:

  • 使用枚举和继承的抽象选项,以避免拼写错误和不兼容的选项。
  • 为每种数据格式设置默认选项,开发者可以根据需要覆盖它们,以减少开发者编写的代码量
  • 设置和定义任何重复代码,例如常用数据源、默认输出数据格式等。

0
投票

一个简单但功能稍弱的替代方案是:

  • 在字典中定义重复选项
  • 每当需要重用解压的选项时,将它们传递给spark.read.options()或spark.write.options()。

例如:

common_options = {
    'user': 'my_db_user',
    'password': 'my_db_password'
    # whatever other option you require.
}

metrics_df = spark.read.format("csv") \
        .options(**common_options) \
        .load()
© www.soinside.com 2019 - 2024. All rights reserved.