我正在尝试在 Spark 3 中实现
TableProvider
来写入自定义格式。我对阅读不感兴趣,所以看起来像:
class MySpecialDataSource extends TableProvider {
override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = {
// caseInsensitiveStringMap contains path -> /tmp/path/to/write/to
???
}
override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = {
new MySpecialTable(structType)
}
}
class MySpecialTable(val schema: StructType) extends Table with SupportsWrite {
???
}
我尝试这样使用它:
df.write
.format("my.package.MySpecialDataSource")
.mode("append")
.save("/tmp/path/to/write/to")
首先,仅使用我传递给
inferSchema
的路径来调用 save
。如果我返回一个空模式,稍后我会收到此错误(我的数据框有一个名为 acolumn
的列):
Cannot write to 'atable', too many data columns:
Table columns: .
Data columns: 'acolumn'.
如果这是我第一次编写,我如何“推断”模式?我应该实现与
TableProvider
不同的抽象吗?