How to provide the schema in Spark/Scala from outside the code

Question (votes: 0, answers: 1)

I want to read a file (e.g. schema_file) that contains the schema, and use it in my code to create a DataFrame.

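I have read about supplying the schema through ConfigFactory, with a file like the one below, but I can't use that approach as-is because the schema may change in the future.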

schema[
  {
     columnName = EXAMPLE_1
     type = string
  },
  {
     columnName = EXAMPLE_2
     type = string
  },
  {
     columnName = EXAMPLE_3
     type = string
  }
]

If I use this, I have to read every columnName individually, for example:

    config.getString("schema.ColumnName1")

But the columns are not fixed; the number of columns can change.

I also tried using a case class, but then I have to declare every field.

Can anyone tell me how to read the schema from outside the code?

scala apache-spark
1 Answer (1 vote)

You could try this library, which loads configuration and maps it onto Scala classes: https://github.com/pureconfig/pureconfig
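If you would rather not add a dependency, the same idea (turning the file's entries into case class values, however many there are) can be approximated with the Scala standard library alone. The sketch below is not pureconfig; it is a hypothetical regex-based reader, and `SchemaFileReader`, `ColumnSpec`, `parse` and `readFile` are illustrative names. It assumes every block in schema_file has the shape `{ columnName = NAME type = TYPE }` with word-character values, as in the question.

```scala
// Hypothetical, dependency-free reader for the schema_file format in the question.
// Assumption: each block is `{ columnName = NAME  type = TYPE }` (no comma between
// the two entries), and NAME/TYPE consist of letters, digits and underscores.
final case class ColumnSpec(columnName: String, columnType: String)

object SchemaFileReader {
  // One column block; \s also matches line breaks, so multi-line blocks work.
  private val ColumnBlock =
    """\{\s*columnName\s*=\s*(\w+)\s+type\s*=\s*(\w+)\s*\}""".r

  /** Extracts every column definition in file order; the count is not fixed. */
  def parse(text: String): List[ColumnSpec] =
    ColumnBlock
      .findAllMatchIn(text)
      .map(m => ColumnSpec(m.group(1), m.group(2)))
      .toList

  /** Reads the file at `path`, parses it, and always closes the file. */
  def readFile(path: String): List[ColumnSpec] = {
    val source = scala.io.Source.fromFile(path)
    try parse(source.mkString)
    finally source.close()
  }
}
```

Because the regex is applied repeatedly with `findAllMatchIn`, adding or removing column blocks in the file needs no code change. The trade-off is that malformed blocks are silently skipped rather than reported.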

I hope something like this works for you:

// Requires the scala-parser-combinators module (a separate dependency since Scala 2.11).
import scala.io.Source
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

object Application {
  def main(args: Array[String]): Unit = {
    // Read the whole schema file, keeping line breaks, and close it afterwards.
    val source = Source.fromFile("src/main/resources/schema_file")
    val fileContents = try source.getLines().mkString("\n") finally source.close()
    println(ConfigDSL.parseSchema(fileContents))
  }
}

case class Schema(columns: List[Column])
case class Column(columnName: String, columnType: String)

object ConfigDSL extends StandardTokenParsers {
  // Punctuation emitted as tokens; whitespace is already skipped by the lexer.
  lexical.delimiters ++= List("[", "]", "{", "}", ",", "=")
  // Keywords of the file; column names and types are lexed as identifiers.
  lexical.reserved ++= List("schema", "type", "columnName")

  def parseSchema(schemaString: String): Schema =
    // phrase(...) makes the parse fail unless the whole input is consumed.
    phrase(schema)(new lexical.Scanner(schemaString)) match {
      case Success(columns, _) => Schema(columns)
      case Failure(msg, _)     => throw new RuntimeException(msg)
      case Error(msg, _)       => throw new RuntimeException(msg)
    }

  // schema [ {...}, {...}, ... ]
  def schema: Parser[List[Column]] =
    "schema" ~> "[" ~> listOfColumns <~ "]"

  // { columnName = NAME type = TYPE }
  def columnDefinition: Parser[Column] =
    "{" ~> ("columnName" ~> "=" ~> ident) ~ ("type" ~> "=" ~> ident) <~ "}" ^^ {
      case column ~ columnType => Column(column, columnType)
    }

  // Any number of column definitions separated by commas.
  def listOfColumns: Parser[List[Column]] =
    repsep(columnDefinition, ",")
}
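The parser above stops at a `Schema` value, while the question asks for a DataFrame. One hedged way to bridge that gap, assuming the `type` values in the file are valid Spark SQL type names (string, int, double, ...), is to render the columns as a DDL schema string, which Spark can turn into a schema. `SchemaToDdl` and `toDdl` are hypothetical names, and the sketch itself uses only the Scala standard library.

```scala
// Hypothetical helper, not part of Spark: builds a DDL schema string such as
// "`EXAMPLE_1` string, `EXAMPLE_2` int" from (columnName, type) pairs.
// Assumption: the type names are Spark SQL type names; they are passed through unchanged.
object SchemaToDdl {
  // Wrap a column name in backticks, doubling any backtick inside it,
  // so names containing spaces or punctuation stay valid in the DDL string.
  private def quote(name: String): String = "`" + name.replace("`", "``") + "`"

  // One "`NAME` TYPE" entry per column, comma-separated, in input order.
  def toDdl(columns: Seq[(String, String)]): String =
    columns.map { case (name, dataType) => s"${quote(name)} $dataType" }.mkString(", ")
}
```

With the answer's `Schema`, usage might look like `val ddl = SchemaToDdl.toDdl(schema.columns.map(c => (c.columnName, c.columnType)))`, followed by `spark.read.schema(ddl).csv(path)` (the `schema(String)` overload is available from Spark 2.3) or `StructType.fromDDL(ddl)` when a `StructType` is needed; `spark` and `path` are placeholders. Since the column list lives entirely in the file, adding a column means editing schema_file, not the code.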
© www.soinside.com 2019 - 2024. All rights reserved.