Writing and saving a DataFrame to a CSV file throws an error in PySpark


I started PySpark in local mode for learning purposes. Everything went almost smoothly until I tried to write and save a DataFrame to a CSV file with the following code:

out_path = "data/sparkify_log_small.csv"
user_log.write.save(out_path, format="csv", header=True)

It gives me the error below and creates an empty folder named after the CSV file, without writing the file itself. I have been searching for a solution but have no clue:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-960f808bce3b> in <module>
----> 1 user_log.write.save(out_path, format="csv", header=True)

C:\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
    737             self._jwrite.save()
    738         else:
--> 739             self._jwrite.save(path)
    740 
    741     @since(1.4)

C:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

C:\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o195.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 11, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 32 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Tags: dataframe, csv, pyspark, file-io
4 Answers
2 votes

I hit the same error. However, I found a thread that solved my problem. In my case, I downloaded the matching version of winutils from https://github.com/cdarlint/winutils, took the hadoop.dll file from the downloaded folder, and placed it in the same path as winutils.exe, e.g. "C:\Spark\spark-3.2.1-bin-hadoop3.2\bin".
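
For reference, here is a minimal sketch of that setup, assuming the Spark 2.4.7 install path from the traceback above and assuming winutils.exe and hadoop.dll are already copied into its bin folder; the environment variables must be set before the first SparkSession is created (or restart the notebook kernel first):

import os
from pyspark.sql import SparkSession

# Assumed layout: winutils.exe and hadoop.dll both sit in %HADOOP_HOME%\bin.
hadoop_home = r"C:\spark\spark-2.4.7-bin-hadoop2.7"   # illustrative path, match your install
os.environ["HADOOP_HOME"] = hadoop_home
os.environ["PATH"] = os.path.join(hadoop_home, "bin") + os.pathsep + os.environ["PATH"]

# The JVM launched by this session inherits the environment set above.
spark = SparkSession.builder.master("local[*]").getOrCreate()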


0 votes

In Spark you cannot name the output file, so if you want a single CSV file you can do the following and then rename it as needed.

out_path = "data/"
user_log.repartition(1).write.option("header", "true").csv(out_path, mode = 'append')
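
Since Spark still writes a directory containing a single part-*.csv file (plus a _SUCCESS marker), a small follow-up sketch can locate that part file and rename it; the target file name here is just an example:

import glob
import shutil

out_path = "data/"
# The repartition(1) write above leaves exactly one part-*.csv under out_path.
part_file = glob.glob(out_path + "part-*.csv")[0]
shutil.move(part_file, "data/sparkify_log_small_out.csv")  # illustrative target name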

0 votes

The correct solution combines the answers given on this page.

  1. You need hadoop.dll from the same site where winutils is downloaded.

  2. The save function accepts a path, not a .csv file, as many people assume.

    The real story is that it writes its output the way HDFS does, so it needs a directory path.

    If we dig into the code, we can infer the save method signature as df.write.save(dir_path, "csv", "append"); see the sketch after this list.

    Here dir_path can be any Windows path.

    There are of course many more options, but this works with spark-3.1.2-bin-hadoop3.2, running on an old Windows 7 laptop.

    The resulting output file looks like: part-00000-4e8efdcb-60e3-483a-b165-9d1a67394a0c-c000.csv
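
Putting both points together, a minimal sketch of such a call, assuming hadoop.dll and winutils.exe are already in place as described in point 1 and that user_log is the DataFrame from the question (the output directory is illustrative):

# Pass a directory, not a file name; Spark writes part-*.csv files inside it.
out_dir = r"C:\tmp\sparkify_out"   # any Windows directory path
user_log.write.save(out_dir, format="csv", mode="append", header=True)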


0 votes

Same problem, still not solved. I am working on Windows. I created a Hadoop folder with a bin folder inside it; the bin folder contains hadoop.dll and the winutils file. I also set the environment variable path. I still cannot create the CSV file: I get the folder but no CSV, with the same error shown in the question above. Can anyone help?
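
Not a fix, but a small diagnostic sketch that can be run in the same notebook to check whether the Python process (and therefore the JVM it launches) actually sees that bin folder; the folder name C:\hadoop is an assumption, substitute your own:

import os

hadoop_home = os.environ.get("HADOOP_HOME", r"C:\hadoop")   # assumed location
bin_dir = os.path.join(hadoop_home, "bin")
print("HADOOP_HOME:", os.environ.get("HADOOP_HOME", "<not set>"))
print("hadoop.dll present:", os.path.exists(os.path.join(bin_dir, "hadoop.dll")))
print("winutils.exe present:", os.path.exists(os.path.join(bin_dir, "winutils.exe")))
print("bin on PATH:", bin_dir.lower() in os.environ.get("PATH", "").lower())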
