Spark: non-JSON to JSON and DataFrame error

Question (votes: 1, answers: 1)

I have a file that looks like JSON (it is not strictly valid JSON). I convert it to JSON and read it with Spark's read.json (we are on Spark 1.6.0, so I cannot use the multiline feature from Spark 2 yet). It shows the result, but at the same time it throws an error. Any help is much appreciated.

My documents look like this (showing just one record here, but the file holds an array of them):

$result = [
            {
              'name' => 'R-2018:1583',
              'issue_date' => '2018-05-17 02:51:06',
              'type' => 'Product Enhancement Advisory', 
              'last_modified_date' => '2018-05-17 03:51:00',
              'id' => 273,
              'update_date' => '2018-05-17 02:51:06',
              'synopsis' => ' enhancement  update',
              'advory' => 'R:1583'
            }
                ]

I used this:

jsonRDD = sc.wholeTextFiles("/user/xxxx/aa.json") \
    .map(lambda x: x[1]) \
    .map(lambda x: x.replace('$result =', '')) \
    .map(lambda x: x.replace("'", '"')) \
    .map(lambda x: x.replace("\n", "")) \
    .map(lambda x: x.replace("=>", ":")) \
    .map(lambda x: x.replace("  ", ""))
sqlContext.read.json(jsonRDD).show()
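To sanity-check the replacement chain outside of Spark, the same replaces can be run on the sample record with plain Python and the result fed to json.loads (a minimal sketch; the sample text and the replacement chain are copied from above):

import json

# The sample record from above, in its original non-JSON syntax.
raw = """$result = [
            {
              'name' => 'R-2018:1583',
              'issue_date' => '2018-05-17 02:51:06',
              'type' => 'Product Enhancement Advisory',
              'last_modified_date' => '2018-05-17 03:51:00',
              'id' => 273,
              'update_date' => '2018-05-17 02:51:06',
              'synopsis' => ' enhancement  update',
              'advory' => 'R:1583'
            }
                ]"""

# The same replacement chain as in the Spark job above.
cleaned = (raw.replace('$result =', '')
              .replace("'", '"')
              .replace("\n", "")
              .replace("=>", ":")
              .replace("  ", ""))

records = json.loads(cleaned)  # raises ValueError if the text is still not valid JSON
print(records[0]['name'])      # R-2018:1583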

It shows the result, but I also get the error below. Please help.

18/08/31 11:19:30 WARN util.ExecutionListenerManager: Error executing query execution listener
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.spark.sql.query.analysis.QueryAnalysis$$anonfun$getInputMetadata$2.apply(QueryAnalysis.scala:121)
    at org.apache.spark.sql.query.analysis.QueryAnalysis$$anonfun$getInputMetadata$2.apply(QueryAnalysis.scala:108)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.query.analysis.QueryAnalysis$.getInputMetadata(QueryAnalysis.scala:108)
    at com.cloudera.spark.lineage.ClouderaNavigatorListener.writeQueryMetadata(ClouderaNavigatorListener.scala:74)
    at com.cloudera.spark.lineage.ClouderaNavigatorListener.onSuccess(ClouderaNavigatorListener.scala:54)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:100)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:99)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:121)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:119)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:99)
    at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:99)
    at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:132)
    at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:98)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2116)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1389)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1471)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:184)
    at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

json apache-spark pyspark
1 Answer
0 votes

The json function takes the path to a JSON file as its parameter, so you need to save your JSON somewhere first and then read that file.

Something like this should work:

# Clean the raw text and write the result out as plain text
sc.wholeTextFiles("/user/xxxx/aa.json") \
    .map(lambda x: x[1]) \
    .map(lambda x: x.replace('$result =', '')) \
    .map(lambda x: x.replace("'", '"')) \
    .map(lambda x: x.replace("\n", "")) \
    .map(lambda x: x.replace("=>", ":")) \
    .map(lambda x: x.replace("  ", "")) \
    .saveAsTextFile("/user/xxxx/aa_transformed.json")

# saveAsTextFile returns None, so read the cleaned data back from its path
sqlContext.read.json("/user/xxxx/aa_transformed.json").show()
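One thing worth knowing here: read.json in Spark 1.6 expects each line of the file to contain one complete JSON document, which is why the chain strips newlines before saving; this per-line requirement is exactly what the multiline option in later Spark versions relaxes. After the save, the cleaned output can be eyeballed before schema inference (a quick check, assuming the file layout produced by saveAsTextFile above):

# Peek at the first line of the cleaned output; each line must be a full JSON document
print(sc.textFile("/user/xxxx/aa_transformed.json").first())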