Spark - load CSV file as DataFrame?

Problem description · votes: 102 · answers: 10

I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name").

I have tried the following:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

The error I got:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load a CSV file as a DataFrame in Apache Spark?

scala apache-spark hadoop apache-spark-sql hdfs
10 Answers

127 votes

CSV support is part of core Spark functionality and does not require a separate library. So you can simply do, for example:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

In Scala (this works for any delimiter-separated format: "," for CSV, "\t" for TSV, and so on):

val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter", ",")
  .load("csvfile.csv")
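For instance, reading a tab-separated file only changes the delimiter (a sketch reusing the sqlContext reader above; the file name is hypothetical):

val tsvDf = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter", "\t")   // tab instead of comma
  .option("header", "true")
  .load("data.tsv")            // hypothetical path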


0 votes

The default file format with spark.read is Parquet, while the file you are reading is a CSV; that explains the exception you are getting. Specify the CSV format explicitly with the API you are trying to use.
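A minimal sketch of that fix, assuming a Spark 2.x session named spark and the HDFS path from the question:

// Declare the source format explicitly; spark.read.load(...) on its own assumes Parquet.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("hdfs:///csv/file/dir/file.csv")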


0 votes

Loads a CSV file and returns the result as a DataFrame.

For a non-HDFS file:
df = spark.read.csv("file:///csvfile.csv")

For an HDFS file:
df = spark.read.csv("hdfs:///csvfile.csv")

For an HDFS file (with a delimiter other than comma):
df = spark.read.option("delimiter", "|").csv("hdfs:///csvfile.csv")

The DataFrame reader treats the file as CSV format.
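To tie this back to the question's goal of querying the table, a sketch assuming a Spark 2.x session named spark (createOrReplaceTempView is the Spark 2 successor of registerTempTable; the path is hypothetical):

val df = spark.read.option("header", "true").csv("hdfs:///csvfile.csv")
df.createOrReplaceTempView("table_name")        // Spark 2.x equivalent of registerTempTable
spark.sql("SELECT * FROM table_name").show()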


144 votes

Parse CSV and load as DataFrame/DataSet with Spark 2.x

First, initialize a SparkSession object; by default it is available in the shells as spark.

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;

Use any one of the following ways to load the CSV as a DataFrame/DataSet.

1. Do it the programmatic way

val df = spark.read
  .format("csv")
  .option("header", "true")        // first line in file has headers
  .option("mode", "DROPMALFORMED")
  .load("hdfs:///csv/file/dir/file.csv")

2. You can do this SQL way as well

val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependencies:

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Spark version < 2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,


13 votes

With Hadoop 2.6 and Spark 1.6, and without the "databricks" package:

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
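As a possible follow-up, the DataFrame built above can then be registered as a temp table, which is what the question ultimately asks for (a sketch reusing df and sqlContext from the snippet above):

df.registerTempTable("table_name")              // Spark 1.x API, matching the sqlContext used above
sqlContext.sql("SELECT * FROM table_name").show()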

11 votes

With Spark 2.0, the following is how you can read a CSV:

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
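A quick sanity check of the result might look like this (a sketch reusing base_df from above):

base_df.printSchema()   // verify the header row became column names
base_df.show(5)         // preview the first rows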

8 votes

In Java 1.8, this code snippet works perfectly for reading CSV files.

POM.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Java:

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");

// create Spark Context
SparkContext context = new SparkContext(conf);

// create Spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read()
        .format("com.databricks.spark.csv")
        .option("header", true)
        .option("inferSchema", true)
        .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();


4 votes

Penny's Spark 2 example is the way to do it in Spark 2. There is one more trick: have the schema generated for you by doing an initial scan of the data, by setting the option inferSchema to true.

Here, then, assuming that spark is a Spark session you have set up, is the operation to load the CSV index file of all the Landsat images which Amazon hosts on S3:

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

The bad news: this triggers a scan through the file; for something as big as this 20+ MB zipped CSV file, that can take 30 seconds over a long-haul connection. Bear that in mind: you are better off manually coding up the schema once you have it coming in.

(Code snippet licensed under the Apache Software License 2.0 to avoid all ambiguity; it is something I did as a demo/integration test of S3 integration.)


3 votes

There are a lot of challenges to parsing a CSV file, and they keep adding up if the file is large or if there are non-English/escape/separator/other characters in the column values, any of which can cause parsing errors.

The magic, then, is in the options that are used. The ones that worked for me, and that should cover most edge cases, are in the code below:

from pyspark.sql import SparkSession

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
                         header=True,
                         multiLine=True,
                         ignoreLeadingWhiteSpace=True,
                         ignoreTrailingWhiteSpace=True,
                         encoding="UTF-8",
                         sep=',',
                         quote='"',
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

Hope that helps. For more, refer to: Using PySpark 2 to read CSV having HTML source code

Note: the code above is from the Spark 2 API, where the CSV file reading API comes bundled with the built-in packages of the Spark installation.

Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.


1 vote

If you are building a jar with Scala 2.11 and Apache Spark 2.0 or higher:

There is no need to create a sqlContext or sparkContext object. A single SparkSession object suffices for all needs.

The following is my code, which works fine:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .option("inferSchema", "true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

If you are running in a cluster, just change .master("local") to .master("yarn") while defining the SparkSession builder object.
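For example, a cluster-mode builder might look like this (a sketch; the app name is kept from the snippet above):

val spark = SparkSession.builder()
  .master("yarn")                 // instead of "local" when submitting to a cluster
  .appName("ValidationFrameWork")
  .getOrCreate()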

The Spark documentation covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html


1 vote

Try this if you are using Spark 2.0+:
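A minimal sketch of such a read, with and without a custom delimiter (assuming a Spark 2.x session named spark; the paths are hypothetical):

val df1 = spark.read.csv("hdfs:///csvfile.csv")                             // comma-delimited (default)
val df2 = spark.read.option("delimiter", "|").csv("hdfs:///csvfile.csv")    // pipe-delimited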

Note: this works for any delimited file. Just use option("delimiter", ...) to change the value.

Hope this is helpful.
