让我们考虑以下 pySpark 代码
my_df = (spark.read.format("csv")
.option("header","true")
.option("inferSchema", "true")
.load(my_data_path))
这是一个相对较小的代码,但有时我们的代码有很多选项,传递字符串选项经常会导致拼写错误。此外,我们没有从代码编辑器那里得到任何建议。 作为一种解决方法,我正在考虑创建一个命名元组(或自定义类)来拥有我需要的所有选项。例如,
from collections import namedtuple
allOptions = namedtuple("allOptions", "csvFormat header inferSchema")
sparkOptions = allOptions("csv", "header", "inferSchema")
my_df = (spark.read.format(sparkOptions.csvFormat)
.option(sparkOptions.header,"true")
.option(sparkOptions.inferSchema, "true")
.load(my_data_path))
我想知道这种方法是否有缺点,或者其他 pySpark 开发人员是否使用了更好的标准方法。
如果使用
.csv
函数读取文件,选项被命名为参数,因此会抛出TypeError。此外,在带有 Python 插件的 VS Code 上,选项会自动完成。
df = spark.read.csv(my_data_path,
header=True,
inferSchema=True)
如果我运行时出现拼写错误,则会抛出错误。
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/var/folders/tv/32xjg80x6pb_9t4909z8_hh00000gn/T/ipykernel_3636/4060466279.py in <module>
----> 1 df = spark.read.csv('test.csv', inferSchemaa=True, header=True)
TypeError: csv() got an unexpected keyword argument 'inferSchemaa'
在 VS Code 上,自动完成中会建议选项。
我认为最好的方法是制作一个带有一些默认值和 kwargs 的包装器,如下所示
def csv(path, inferSchema=True, header=True, options={}):
return hdfs(path, 'csv', {'inferSchema': inferSchema, 'header': header, **options})
def parquet(path, options={}):
return hdfs(path, 'parquet', {**options})
def hdfs(path, format, options={}):
return (spark
.read
.format(format)
.options(**options)
.load(f'hdfs://.../{path}')
)
出于这个原因和许多其他原因,在生产级项目中,我们曾经编写一个项目来包装 Spark。
所以开发者不允许直接处理spark。
在这样的项目中我们可以:
一个简单但功能稍弱的替代方案是:
例如:
common_options = {
'user': 'my_db_user',
'password': 'my_db_password'
# whatever other option you require.
}
metrics_df = spark.read.format("csv") \
.options(**common_options) \
.load()