我能够在 kedro ipython 会话中加载 Spark 数据集。
ipython --ext kedro.extras.extensions.ipython
或 kedro ipython
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from pathlib import Path
import pyspark.sql
#from kedro.io import DataCatalog
from kedro.extras.datasets.spark import SparkDataSet
import os
os.chdir('/myproject')
project_root = Path.cwd()
bootstrap_project(project_root)
session = KedroSession.create()
context = session.load_context()
catalog = context.catalog
test = catalog.load("mydata@spark")
test.show(2)
isinstance(test, pyspark.sql.DataFrame) # True
因此正确定义了一个 Spark 会话。问题是,如何访问这个会话对象? 如果我运行
spark = SparkSession.builder.getOrCreate()
,我无法确认这确实是 Kedro 管理的会话,例如 spark.conf.get('spark.driver.maxResultSize')
会抛出 java.util.NoSuchElementException:
,尽管这个 maxResultSize
确实是在我的项目的 spark.yml
中定义的
如何访问正确的 kedro 管理的 Spark 会话?
因此,如果您执行
kedro ipython
(或使用扩展),您应该已经可以将 catalog
作为全局变量使用,而无需自己创建它。
我有一种感觉,这会起作用:
df = catalog.load('my_data')
type(df, pyspark.sql.DataFrame)
spark = df.sparkSession
...