链接三角洲流编程提高分析感受

问题描述 投票:0回答:4

Situation:我正在生产一个带有以前流媒体查询A的数据的Delta文件夹,并从另一个DF中读取,如下所示

DF_OUT.writeStream.format("delta").(...).start("path")

(...)

DF_IN = spark.readStream.format("delta").load("path)

1 - 当我尝试在此过程中读取它时,随后的readstream(ETL管道的链接查询)。 2 - 当我在Scala替补中运行它时,它会顺利运行。

不确定那里发生了什么,但肯定会令人困惑。

org.apache.spark.sql.AnalysisException: Table schema is not set. Write data into it or use CREATE TABLE to set the schema.; at org.apache.spark.sql.delta.DeltaErrors$.schemaNotSetException(DeltaErrors.scala:365) at org.apache.spark.sql.delta.sources.DeltaDataSource.sourceSchema(DeltaDataSource.scala:74) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225) at org.apache.spark.ui.DeltaPipeline$.main(DeltaPipeline.scala:114)

delta湖快速指南 - 故障排除
apache-spark spark-structured-streaming delta-lake
4个回答
3
投票

表模式未设置错误

  • 问题: 当未存在增量表的路径并尝试从中流数据时,您将获得以下错误。 org.apache.spark.sql.analysisexception:未设置表模式。 将数据写入其中或使用创建表来设置模式。;

Solution: 确保创建了增量表的路径。

阅读错误消息后,我确实尝试成为一个好男孩并遵循建议,因此我试图确保在Delta文件夹中实际上试图从

0
投票

中读取的Delta文件夹中实际上有效数据打电话给readstream,瞧! def hasFiles(dir: String):Boolean = { val d = new File(dir) if (d.exists && d.isDirectory) { d.listFiles.filter(_.isFile).size > 0 } else false } DF_OUT.writeStream.format("delta").(...).start(DELTA_DIR) while(!hasFiles(DELTA_DIR)){ print("DELTA FOLDER STILL EMPTY") Thread.sleep(10000) } print("FOUND DATA ON DELTA A - WAITING 30 SEC") Thread.sleep(30000) DF_IN = spark.readStream.format("delta").load(DELTA_DIR) 最终工作了,但我必须确保等待足够的时间才能发生“某事发生”(不知道到底是什么,但是似乎来自Delta的阅读需要一些写作才能完成 - 也许是元数据? ,但是,这仍然是一个黑客。我希望可以开始从一个空的Delta文件夹开始阅读,并等待内容开始倒入其中。

对于我来说,我找不到一个简单的解决方案使用此替代方案的绝对路径:
spark.readStream.format("delta").table("tableName")

我在设置两个流时遇到了此错误:一个从kafka到本地的三角洲表A,然后从相同的本地三角洲表到另一个本地delta表B. 当我给出第一个流的时间超过processingTime

在开始流2之前运行时,它仍然给我带来错误。 trustation trustation trom

0
投票

绝望,我最终只是用以下内容来初始化本地三角洲表的初始化:

AnalysisException: [DELTA_UNSUPPORTED_SCHEMA_DURING_READ] Delta does not support specifying the schema at read time.

0
投票
...然后流入其中。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.