The code below works in the sense that data is written to the output table and can be selected from it within about 10 seconds. The problem is that foreachBatch is never executed.
When I tested with .format("console") and called .start(), foreachBatch did run, so it feels like .toTable() is the culprit.
This code uses the Kafka connector, but the same problem occurs with the Event Hubs connector.
If I try to add .start() after toTable(), I get the error:
'StreamingQuery' object has no attribute 'start'
Here is the code, which works apart from foreachBatch:
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"
df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()
n = 100
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")
...Omitted code where I transform the data in the value column to strongly typed data...
myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")
You either do foreachBatch or toTable, but not both. You can move the table write into the foreachBatch function instead; just make sure the writes are idempotent, because a batch can be restarted. Change your code to:
def run_command(batchDF, epoch_id):
    global count
    batchDF.write.format("delta") \
        .option("txnVersion", epoch_id) \
        .option("txnAppId", "my_app") \
        .partitionBy("somecolumn") \
        .mode("append") \
        .saveAsTable("myunitycatalog.bronze.mytable")
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")
myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .start()
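The txnVersion/txnAppId pair is what makes the write idempotent: Delta remembers the highest version committed per application id and silently skips any batch whose version it has already seen, so a replayed epoch does not append its rows twice. As a rough pure-Python sketch of that contract (IdempotentTable is a hypothetical stand-in for illustration, not a Delta API):

```python
# Hypothetical in-memory stand-in for a Delta table honoring the
# txnAppId/txnVersion contract: a batch is appended only if its version
# is higher than the last version committed for that application id.
class IdempotentTable:
    def __init__(self):
        self.rows = []
        self.last_version = {}  # txnAppId -> highest committed txnVersion

    def append(self, rows, txn_app_id, txn_version):
        # Structured Streaming may re-run a batch after a failure,
        # delivering the same epoch_id twice; skip such replays.
        if self.last_version.get(txn_app_id, -1) >= txn_version:
            return False  # duplicate delivery, nothing written
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True

table = IdempotentTable()
table.append(["a", "b"], "my_app", 0)  # first delivery of epoch 0
table.append(["a", "b"], "my_app", 0)  # replay of epoch 0 -> skipped
table.append(["c"], "my_app", 1)       # next epoch
print(table.rows)  # -> ['a', 'b', 'c']
```

This also shows why epoch_id is the right value to pass as txnVersion: it increases monotonically across batches and is stable across a replay of the same batch.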