I have a Delta Lake table (Parquet format) in an AWS S3 bucket. I need to read it into a DataFrame using PySpark in notebook code. I have tried searching online but haven't had any success so far. Can anyone share sample code for reading a Delta Lake table in PySpark (into a DataFrame or any other object)?
If you have already created the Delta table, then you can read it into a Spark DataFrame like below:
s3_path = "s3://<bucket_name>/<delta_tables_path>/"
df = spark_session.read.format("delta").load(s3_path)
df.show(10)  # display the first 10 rows
You need to pass AWS credentials to the Spark session somehow. This can be done by setting up an AWS credentials profile in the CLI, or at runtime when creating the session. Remember to use the correct provider type for fs.s3a.aws.credentials.provider.
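As a side note, if your credentials live in a named AWS CLI profile, you can avoid hard-coding keys entirely by pointing fs.s3a.aws.credentials.provider at the default provider chain (the commented-out provider in the configuration further below). A minimal sketch, assuming a hypothetical profile called my-profile and the spark session built in the next snippet:

import os

# Hypothetical profile name; assumes it was created earlier with
#   aws configure --profile my-profile
# Set it before getOrCreate() so the JVM inherits it (or export it in your shell).
os.environ["AWS_PROFILE"] = "my-profile"

# After the session below has been created, point s3a at the default provider
# chain so credentials are resolved from the profile / environment / instance role:
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)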
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.master("local[1]")
    .appName("demo_delta_spark_app")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.ui.port", "4040")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip sets spark.jars.packages itself (adding the
# matching delta-core artifact), so pass the S3 dependencies as extra_packages
# instead of configuring spark.jars.packages separately.
extra_packages = [
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]
spark = configure_spark_with_delta_pip(builder, extra_packages=extra_packages).getOrCreate()
hadoop_conf = spark._jsc.hadoopConfiguration()

# Enable Signature Version 4 and the s3a filesystem implementation
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

# Credentials provider: TemporaryAWSCredentialsProvider expects an access key,
# a secret key and a session token (set below)
hadoop_conf.set(
    "fs.s3a.aws.credentials.provider",
    # "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)

# Regional S3 endpoint of the bucket
hadoop_conf.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

# The configurations below are only needed when you pass credentials directly
# instead of using an AWS profile
hadoop_conf.set("fs.s3a.access.key", config["access_key"])
hadoop_conf.set("fs.s3a.secret.key", config["secret_key"])
hadoop_conf.set("fs.s3a.session.token", config["access_token"])
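Equivalently, all of these Hadoop settings can be supplied at build time by prefixing them with spark.hadoop., which Spark copies into the Hadoop configuration automatically. A short sketch reusing the builder and the config dictionary from above, with the same example values:

builder = (
    builder
    # "spark.hadoop." prefixed options are forwarded to the Hadoop configuration
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.access.key", config["access_key"])
    .config("spark.hadoop.fs.s3a.secret.key", config["secret_key"])
    .config("spark.hadoop.fs.s3a.session.token", config["access_token"])
)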
After creating the Spark session, you can read the Delta table from the S3 bucket as follows:
version_to_read = 0  # specify the version you want to read

df = (
    spark.read.format("delta")
    .option("versionAsOf", version_to_read)  # optional, defaults to the latest version
    .load("s3a://demobucket/dt1")  # S3 path of the Delta table
)
df.show(100)
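If you want to check which versions are available before picking versionAsOf, the DeltaTable API can list the table history. A short sketch, assuming the same bucket path as above and that the delta-spark package is installed in the notebook environment:

from delta.tables import DeltaTable

# Load the table by path and inspect its commit history to find valid versions
delta_table = DeltaTable.forPath(spark, "s3a://demobucket/dt1")
delta_table.history().select("version", "timestamp", "operation").show(truncate=False)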