Databricks DLT DataFrame - 如何使用带有注释的模式

问题描述 投票:0回答:1

Databricks DLT DataFrame - 如何使用模式

我是 Databricks Delta Live Tables 和 DataFrames 的新手,我对阅读时如何使用模式感到困惑 从溪流中。我正在做桌到桌的流媒体。我们的要求之一是对专栏有评论 在可以从 DBX 目录概述页面查看的表格中。

输入表具有以下列(为简洁起见,进行了精简):

message_schema = StructType([
    StructField("bayId", StringType(), True, {"comment": "Bay ID"}),
    StructField("storeId", StringType(), True, {"comment": "Store ID"}),
    StructField("message", StringType(), True, {"comment": "Message content"}),
])

上面的

message
列包含一个字符串化的 JSON 对象,具有以下字段(为简洁起见,进行了精简):

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ]),
])

这是我从流中读取表并构建 DF 的代码:

    df = spark.readStream.table("tablename")

    df = df.where(
        col("MESSAGE").isNotNull()
    ).select(
        col("BAYID"),
        col("STOREID"),
        F.from_json(col("MESSAGE"), message_schema).alias("MESSAGE")
    ).select(
        col("BAYID"),
        col("STOREID"),
        F.from_json(col("MESSAGE.message"), event_schema).alias("EVENT")
    ).select(
        F.expr("uuid()").alias("ID"),
        col("BAYID").alias("BAY_ID"),
        col("STOREID").alias("STORE_ID"),
        col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
        col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
    )

运行此命令后,当我查看输出表的目录概述时,

PLAYER_DEXTERITY_NAME
ATTACK_ANGLE_NBR
列显示了我在架构中设置的注释。但是,
BAY_ID
STORE_ID
列 没有我的评论。

我可以通过在上面的块之后添加以下代码来构建注释:

    df = (df
          .withMetadata("BAY_ID", {"comment": "Bay ID"})
          .withMetadata("STORE_ID", {"comment": "Store ID"})
          )

但是,为了保持一致性,我想在模式本身中设置注释。我怎样才能做到这一点?我是什么 我做错了吗?

更新:

在下面的第一个答案中,建议使用

@dlt.table()
上的模式。但是,我们需要使用
@dlt.view()
,它不允许指定模式。

但是,我看到

schema()
上有一个
DataStreamReader
,所以我尝试了这个:

    df = (
        spark.readStream
        .schema(message_schema)
        .table(
            f"{bronze_catalog}.{bronze_schema}.{bronze_table_name}",
        )
    )

但不幸的是,这对表格没有影响。

dataframe azure-databricks databricks-sql delta-live-tables
1个回答
0
投票

您在 dlt 装饰器中给出架构,如下所示。

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

在您的情况下,您创建一个用于从 dlt 返回数据帧的架构并使用它。

from pyspark.sql.types import *
from pyspark.sql import functions as F
import dlt

output_schema = StructType([
    StructField("ID", StringType(), True, {"comment": "Unique identifier for the row"}),
    StructField("BAY_ID", StringType(), True, {"comment": "Bay ID"}),
    StructField("STORE_ID", StringType(), True, {"comment": "Store ID"}),
    StructField("PLAYER_DEXTERITY_NAME", StringType(), True, {"comment": "Player dexterity"}),
    StructField("ATTACK_ANGLE_NBR", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"})
])

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ]),)
])


@dlt.table(
  comment="Raw data on sales",
  schema=output_schema)
def sales():
  df = spark.readStream.table("tablename")

  df = df.where(F.col("MESSAGE").isNotNull()).\
  select(F.col("BAYID"),F.col("STOREID"),F.from_json(F.col("MESSAGE"), event_schema).alias("EVENT")).\
  select(
        F.expr("uuid()").alias("ID"),
        F.col("BAYID").alias("BAY_ID"),
        F.col("STOREID").alias("STORE_ID"),
        F.col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
        F.col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
    )
  return df

以下是使用的示例数据。

sample_data = [ 
("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'), 
("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}') ]

输出:

enter image description here

以及概述选项卡中的评论。

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.