How to read a Delta Lake table using PySpark

Question · 0 votes · 2 answers

I have a Delta Lake table (Parquet format) in an AWS S3 bucket. I need to read it into a DataFrame using PySpark in notebook code. I have tried searching online but have not succeeded so far. Can anyone share sample code for reading a Delta Lake table (into a DataFrame or any other object) in PySpark?

python-3.x pyspark parquet delta-lake delta-live-tables
2 Answers

1 vote

If you have already created the Delta table, then you can read it into a Spark DataFrame as below. Note that you must specify the delta format explicitly; otherwise Spark's default Parquet reader will scan all Parquet files under the path, including files left behind by overwritten versions:

s3_path = "s3://<bucket_name>/<delta_tables_path>/"
df = spark_session.read.format("delta").load(s3_path)  # "delta" format is required
df.show(10)  # display the first 10 rows
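
If you also need table metadata such as the version history, here is a minimal sketch using the DeltaTable helper from the delta-spark package (this assumes delta-spark is pip-installed and the session was created with the Delta extensions enabled):

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark_session, s3_path)  # bind to the table at this path
df = dt.toDF()                                   # expose it as a Spark DataFrame
dt.history().show()                              # inspect the table's version history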

0 votes

You need to pass AWS credentials to the Spark session somehow. This can be done by setting up an AWS credentials profile in the CLI, or at runtime when creating the session. Remember to use the correct provider type for

fs.s3a.aws.credentials.provider
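
For the profile-based route, here is a minimal sketch (the profile name "my-profile" is a hypothetical example; the environment variable is picked up by the AWS default credentials chain):

import os

# assumption: a profile named "my-profile" exists in ~/.aws/credentials
os.environ["AWS_PROFILE"] = "my-profile"

# with a profile, point the S3A connector at the default provider chain
# instead of TemporaryAWSCredentialsProvider:
# "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"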

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.master("local[1]")
    .appName("demo_delta_spark_app")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.ui.port", "4040")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip overwrites spark.jars.packages, so pass the
# AWS jars through extra_packages rather than setting them on the builder
spark = configure_spark_with_delta_pip(
    builder,
    extra_packages=[
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "com.amazonaws:aws-java-sdk-bundle:1.12.262",
    ],
).getOrCreate()
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
)
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.aws.credentials.provider",
    #  "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)
spark._jsc.hadoopConfiguration().set(
    "fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A"
)
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com"
)
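# NOTE: `config` below is not defined in the original answer; one possible
# sketch is to populate it from the standard AWS environment variables
import os

config = {
    "access_key": os.environ["AWS_ACCESS_KEY_ID"],
    "secret_key": os.environ["AWS_SECRET_ACCESS_KEY"],
    "access_token": os.environ["AWS_SESSION_TOKEN"],  # needed for temporary credentials
}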
# the configurations below should be used only when you want to pass
# credentials directly instead of using an AWS profile
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", config["access_key"])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config["secret_key"])
spark._jsc.hadoopConfiguration().set("fs.s3a.session.token", config["access_token"])

Once the Spark session is created, you can read the Delta table from the S3 bucket as follows:

version_to_read = 0  # specify the version you want to read
df = (
    spark.read.format("delta")
    .option("versionAsOf", version_to_read)  # optional; defaults to the latest version
    .load("s3a://demobucket/dt1")  # example S3 bucket path
)
df.show(100)  # show() prints the rows itself and returns None, so no print() wrapper
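
Two related reads, as a sketch (the path and timestamp here are hypothetical examples): time travel by timestamp instead of version number, and querying the path directly with Spark SQL:

# time travel by timestamp
df_ts = (
    spark.read.format("delta")
    .option("timestampAsOf", "2024-01-01")
    .load("s3a://demobucket/dt1")
)

# query the same path directly with Spark SQL
df_sql = spark.sql("SELECT * FROM delta.`s3a://demobucket/dt1` LIMIT 10")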