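How do I load a file from the local file system into Spark using sc.textFile? Do I need to change any -env variables? Also, when I tried the same thing on Windows, where Hadoop is not installed, I got the same error.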
> val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(63280) called with curMem=0, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.8 KB, free 265.1 MB)
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(19750) called with curMem=63280, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.3 KB, free 265.1 MB)
/17 22:28:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:53659 (size: 19.3 KB, free: 265.1 MB)
/17 22:28:18 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
inputFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
> val words = input.flatMap(line => line.split(" "))
<console>:19: error: not found: value input
val words = input.flatMap(line => line.split(" "))
^
> val words = inputFile.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23
> val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/c:/spark-1.4.1-bin-hadoop2.6/bin/file/C:/Users/swaapnika/Desktop/to do list
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:289)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC.<init>(<console>:40)
at $iwC.<init>(<console>:42)
at <init>(<console>:44)
at .<init>(<console>:48)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
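I checked all the dependencies and environment variables again. The actual path "file:///home/..../.. .txt" reads the data from the local file system, since the hadoop env.sh file has its default file system set via fs.defaultFS. If spark-env.sh is left at its defaults with no changes, Spark uses the local file system when it encounters "file://..." and HDFS when the path is "hdfs://..". If you specifically need some other file system, export HADOOP_CONF_DIR in spark-env.sh and Spark will support any file system that Hadoop supports. This is just my observation. Corrections and suggestions are welcome. Thanks.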
Try changing
val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
to this:
val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")
I'm also quite new to Hadoop and Spark, but as far as I can tell, when Spark runs locally on Windows, the file:/// in the string passed to sc.textFile already refers to C:\.
The file path you defined is incorrect.
Try changing
sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
to
sc.textFile("file://C:/Users/swaapnika/Desktop/to do list")
or
sc.textFile("C:/Users/swaapnika/Desktop/to do list")
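For what it's worth, the exception in the question shows the input being resolved under c:/spark-1.4.1-bin-hadoop2.6/bin, which suggests the string was treated as a relative path rather than a file URI. Here is a minimal sketch of why that can happen, assuming a scheme is only recognised when a colon appears before the first slash. PathScheme and schemeOf are hypothetical names made up for illustration, not part of the Spark or Hadoop API, and the real parsing may differ in detail.

```scala
// Hypothetical helper, NOT Hadoop's or Spark's API: a simplified
// "colon before the first slash" rule for spotting a URI scheme.
object PathScheme {
  def schemeOf(path: String): Option[String] = {
    val colon = path.indexOf(':')
    val slash = path.indexOf('/')
    // A scheme is only present if a colon occurs before any slash.
    if (colon != -1 && (slash == -1 || colon < slash)) Some(path.substring(0, colon))
    else None
  }

  def main(args: Array[String]): Unit = {
    val broken = "file///C:/Users/swaapnika/Desktop/to do list" // colon after "file" is missing
    val fixed  = "file:///C:/Users/swaapnika/Desktop/to do list"
    println(schemeOf(broken)) // None: first slash comes before the first colon
    println(schemeOf(fixed))  // Some(file)
  }
}
```

With no scheme detected, "file" is just the first directory of a relative path, which matches the ".../bin/file/C:/Users/..." path in the error message.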
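This error occurs when Spark runs on a cluster. When you submit a job to a Spark cluster, the cluster manager (YARN, Mesos, or whatever you use) hands it to the worker nodes. When a worker node tries to find the path of the file to be loaded into Spark, it fails because the worker has no such file. So try running spark-shell in local mode and try again: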
\bin\spark-shell --master local
sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")
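If you want to rule out a missing file before involving Spark at all, a quick check like the sketch below might help. It only uses java.nio from the JDK. InputCheck and existsLocally are hypothetical names for illustration, and the check only covers the machine it runs on, so on a cluster it says nothing about the worker nodes.

```scala
import java.nio.file.{Files, Paths}

object InputCheck {
  // Hypothetical helper: true if a plain (non-URI) path exists
  // on the machine that runs this code.
  def existsLocally(plainPath: String): Boolean =
    Files.exists(Paths.get(plainPath))

  def main(args: Array[String]): Unit = {
    // Plain path from the question, without the file:/// prefix.
    val path = "C:/Users/swaapnika/Desktop/to do list"
    if (existsLocally(path)) println(s"Found: $path")
    else println(s"Not found on this machine: $path")
  }
}
```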
Let me know if this helps.