Situation:我正在生产一个带有以前流媒体查询A的数据的Delta文件夹,并从另一个DF中读取,如下所示
DF_OUT.writeStream.format("delta").(...).start("path")
(...)
DF_IN = spark.readStream.format("delta").load("path)
1 - 当我尝试在此过程中读取它时,随后的readstream(ETL管道的链接查询)。 2 - 当我在Scala替补中运行它时,它会顺利运行。
不确定那里发生了什么,但肯定会令人困惑。
org.apache.spark.sql.AnalysisException: Table schema is not set. Write data into it or use CREATE TABLE to set the schema.;
at org.apache.spark.sql.delta.DeltaErrors$.schemaNotSetException(DeltaErrors.scala:365)
at org.apache.spark.sql.delta.sources.DeltaDataSource.sourceSchema(DeltaDataSource.scala:74)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
at org.apache.spark.ui.DeltaPipeline$.main(DeltaPipeline.scala:114)
从delta湖快速指南 - 故障排除
中读取的Delta文件夹中实际上有效数据打电话给readstream,瞧!
def hasFiles(dir: String):Boolean = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isFile).size > 0
} else false
}
DF_OUT.writeStream.format("delta").(...).start(DELTA_DIR)
while(!hasFiles(DELTA_DIR)){
print("DELTA FOLDER STILL EMPTY")
Thread.sleep(10000)
}
print("FOUND DATA ON DELTA A - WAITING 30 SEC")
Thread.sleep(30000)
DF_IN = spark.readStream.format("delta").load(DELTA_DIR)
最终工作了,但我必须确保等待足够的时间才能发生“某事发生”(不知道到底是什么,但是似乎来自Delta的阅读需要一些写作才能完成 - 也许是元数据?
,但是,这仍然是一个黑客。我希望可以开始从一个空的Delta文件夹开始阅读,并等待内容开始倒入其中。
对于我来说,我找不到一个简单的解决方案使用此替代方案的绝对路径:
spark.readStream.format("delta").table("tableName")
我在设置两个流时遇到了此错误:一个从kafka到本地的三角洲表A,然后从相同的本地三角洲表到另一个本地delta表B.
当我给出第一个流的时间超过processingTime
绝望,我最终只是用以下内容来初始化本地三角洲表的初始化:
AnalysisException: [DELTA_UNSUPPORTED_SCHEMA_DURING_READ] Delta does not support specifying the schema at read time.