我目前正在学习 scala-spark,所以请耐心等待。
我正在尝试将函数应用于 scala 数据框来创建一个新列,如下所示 -
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
object DatasetName extends Enumeration {
type DatasetNameType = Value
val XYZ: DatasetName.Value = Value("xyz")
}
object CommonEntity {
object Fields {
val YEAR_COL = "year"
val MONTH_COL = "month"
}
}
object Mappings {
import DatasetName._
import CommonEntity._
val PERIOD_MAPPING: Map[DatasetName.Value, List[String]] = Map(
XYZ -> List(CommonEntity.Fields.YEAR_COL, CommonEntity.Fields.MONTH_COL)
)
}
def withPeriod(datasetName: DatasetName.Value): DataFrame = {
if (Mappings.PERIOD_MAPPING.contains(datasetName)) {
val columnList = Mappings.PERIOD_MAPPING(datasetName)
val yearCol = columnList.apply(0)
val monthCol = columnList.apply(1)
df.withColumn("period", concat(col(yearCol),col(monthCol)))
}
else df
所以,当我运行
df.withPeriod(DatasetName.XYZ)
时,会出现如下错误 -
<console>:163: error: value withPeriod is not a member of org.apache.spark.sql.DataFrame
但是,它适用于
withPeriod(DatasetName.XYZ).show()
所以,我有点困惑。请帮忙。
下面是我的输入数据集
val df = Seq(("2023", "Jul"), ("2022", "Dec")).toDF("year", "month")
如果您希望能够在现有类型(如
df.withPeriod(...)
)上调用您的方法,则需要定义所谓的 扩展方法。
在 Scala 2 中,这是通过定义隐式类来实现的,如下所示:
object DataframeExtensions {
implicit class DataframePeriodExtension(df: Dataframe) {
def withPeriod(...) = {
// You can refer to df here
???
}
}
}
并像这样使用它:
import DataframeExtensions._
val someDataframe: Dataframe = ???
someDataframe.withPeriod(...)
请注意,这只是为了方便。使用普通的旧方法和调用
withPeriod(df, ...)
是完全相同的。