Writing a stream with toTable in Databricks does not execute foreachBatch

Problem description

The code below works in the sense that data is written to the output table and can be selected from it within 10 seconds. The problem is that foreachBatch never executes.

When I test it with .format("console") and call .start() instead, foreachBatch does run, so it feels like .toTable() is the culprit.

This code uses the Kafka connector, but the same problem occurs with the Event Hubs connector.

If I try to add .start() after toTable(), I get this error:

'StreamingQuery' object has no attribute 'start'
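That error is expected: toTable() is a terminal action like start(). It launches the query itself and returns a pyspark.sql.streaming.StreamingQuery handle, so there is no writer left to call .start() on. A minimal sketch of how the returned object is meant to be used:

# toTable() starts the stream and returns a StreamingQuery, not a writer,
# so .start() is not available on the returned object
query = myTypedDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .toTable("myunitycatalog.bronze.mytable")

print(query.isActive)     # True: the stream is already running
query.awaitTermination()  # block the caller until the query stops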

Here is the code, which works except for foreachBatch:

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100    # run OPTIMIZE once every n micro-batches
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...Omitted code where I transform the data in the value column to strongly typed data...
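(For anyone reproducing this: a hypothetical stand-in for the omitted step could look like the sketch below. The JSON schema and field names are illustrative, not from the original post.)

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Hypothetical schema for the JSON payload carried in the Kafka value column
event_schema = StructType([
    StructField("readtimestamp", TimestampType()),
    StructField("somecolumn", StringType()),
    StructField("reading", DoubleType()),
])

# Cast the raw bytes to a string, parse with the schema, and flatten
myTypedDF = df.select(from_json(col("value").cast("string"), event_schema).alias("v")) \
    .select("v.*")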

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")
Tags: databricks, spark-streaming, azure-databricks, spark-structured-streaming, delta-lake
1 Answer

You can use either foreachBatch or toTable, but not both at the same time. Instead, move the table write inside the foreachBatch function. Just make sure you perform idempotent writes, because a micro-batch can be restarted. Change your code to:

def run_command(batchDF, epoch_id):
    global count
    # txnVersion + txnAppId make the write idempotent: Delta records the pair
    # in the table's transaction log and skips a batch it has already committed
    batchDF.write.format("delta") \
        .option("txnVersion", epoch_id) \
        .option("txnAppId", "my_app") \
        .partitionBy("somecolumn") \
        .mode("append") \
        .saveAsTable("myunitycatalog.bronze.mytable")
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .start()
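A usage note on the options above: with txnAppId and txnVersion set, Delta skips any micro-batch it has already committed under that pair, which is what makes a restarted batch safe. Also note that the global count lives only on the driver and resets to zero whenever the stream restarts. If the OPTIMIZE cadence should survive restarts, a sketch that keys off epoch_id (which is restored from the checkpoint) instead of a counter:

def run_command(batchDF, epoch_id):
    batchDF.write.format("delta") \
        .option("txnVersion", epoch_id) \
        .option("txnAppId", "my_app") \
        .partitionBy("somecolumn") \
        .mode("append") \
        .saveAsTable("myunitycatalog.bronze.mytable")
    # epoch_id increases monotonically and is recovered from the checkpoint,
    # so the OPTIMIZE cadence is stable across driver restarts
    if epoch_id > 0 and epoch_id % n == 0:
        spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")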