Databricks DLT DataFrame - 如何使用模式
我是 Databricks Delta Live Tables 和 DataFrames 的新手,我对阅读时如何使用模式感到困惑 从溪流中。我正在做桌到桌的流媒体。我们的要求之一是对专栏有评论 在可以从 DBX 目录概述页面查看的表格中。
输入表具有以下列(为简洁起见,进行了精简):
message_schema = StructType([
StructField("bayId", StringType(), True, {"comment": "Bay ID"}),
StructField("storeId", StringType(), True, {"comment": "Store ID"}),
StructField("message", StringType(), True, {"comment": "Message content"}),
])
上面的
message
列包含一个字符串化的 JSON 对象,具有以下字段(为简洁起见,进行了精简):
event_schema = StructType([
StructField("Id", StringType(), True, {"comment": "Event ID"}),
StructField("Payload", StructType([
StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
]),
])
这是我从流中读取表并构建 DF 的代码:
df = spark.readStream.table("tablename")
df = df.where(
col("MESSAGE").isNotNull()
).select(
col("BAYID"),
col("STOREID"),
F.from_json(col("MESSAGE"), message_schema).alias("MESSAGE")
).select(
col("BAYID"),
col("STOREID"),
F.from_json(col("MESSAGE.message"), event_schema).alias("EVENT")
).select(
F.expr("uuid()").alias("ID"),
col("BAYID").alias("BAY_ID"),
col("STOREID").alias("STORE_ID"),
col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
)
运行此命令后,当我查看输出表的目录概述时,
PLAYER_DEXTERITY_NAME
和
ATTACK_ANGLE_NBR
列显示了我在架构中设置的注释。但是,BAY_ID
和 STORE_ID
列
没有我的评论。
我可以通过在上面的块之后添加以下代码来构建注释:
df = (df
.withMetadata("BAY_ID", {"comment": "Bay ID"})
.withMetadata("STORE_ID", {"comment": "Store ID"})
)
但是,为了保持一致性,我想在模式本身中设置注释。我怎样才能做到这一点?我是什么 我做错了吗?
更新:
在下面的第一个答案中,建议使用
@dlt.table()
上的模式。但是,我们需要使用 @dlt.view()
,它不允许指定模式。
但是,我看到
schema()
上有一个DataStreamReader
,所以我尝试了这个:
df = (
spark.readStream
.schema(message_schema)
.table(
f"{bronze_catalog}.{bronze_schema}.{bronze_table_name}",
)
)
但不幸的是,这对表格没有影响。
您在 dlt 装饰器中给出架构,如下所示。
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
在您的情况下,您创建一个用于从 dlt 返回数据帧的架构并使用它。
from pyspark.sql.types import *
from pyspark.sql import functions as F
import dlt
output_schema = StructType([
StructField("ID", StringType(), True, {"comment": "Unique identifier for the row"}),
StructField("BAY_ID", StringType(), True, {"comment": "Bay ID"}),
StructField("STORE_ID", StringType(), True, {"comment": "Store ID"}),
StructField("PLAYER_DEXTERITY_NAME", StringType(), True, {"comment": "Player dexterity"}),
StructField("ATTACK_ANGLE_NBR", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"})
])
event_schema = StructType([
StructField("Id", StringType(), True, {"comment": "Event ID"}),
StructField("Payload", StructType([
StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
]),)
])
@dlt.table(
comment="Raw data on sales",
schema=output_schema)
def sales():
df = spark.readStream.table("tablename")
df = df.where(F.col("MESSAGE").isNotNull()).\
select(F.col("BAYID"),F.col("STOREID"),F.from_json(F.col("MESSAGE"), event_schema).alias("EVENT")).\
select(
F.expr("uuid()").alias("ID"),
F.col("BAYID").alias("BAY_ID"),
F.col("STOREID").alias("STORE_ID"),
F.col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
F.col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
)
return df
以下是使用的示例数据。
sample_data = [
("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'),
("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}') ]
输出:
以及概述选项卡中的评论。