因为我想从binaryFiles中提取数据我使用val dataRDD = sc.binaryRecord("Path")
读取文件我得到的结果为org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]
我想以PortableDataStream
的形式提取我的文件内容
为此,我尝试:val data = dataRDD.map(x => x._2.open()).collect()
但我得到以下错误:java.io.NotSerializableException:org.apache.hadoop.hdfs.client.HdfsDataInputStream
如果您有任何想法我如何解决我的问题,请帮助!
提前谢谢了。
实际上,PortableDataStream
是Serializable。这就是它的意思。然而,open()
返回一个简单的DataInputStream
(在你的情况下为HdfsDataInputStream
,因为你的文件在HDFS上),这是不可序列化的,因此你得到的错误。
事实上,当你打开PortableDataStream
时,你只需要立即读取数据。在scala中,您可以使用scala.io.Source.fromInputStream
:
val data : RDD[Array[String]] = sc
.binaryFiles("path/.../")
.map{ case (fileName, pds) => {
scala.io.Source.fromInputStream(pds.open())
.getLines().toArray
}}
此代码假定数据是文本的。如果不是,您可以调整它以读取任何类型的二进制数据。下面是一个创建字节序列的示例,您可以按照自己的方式进行处理。
val rdd : RDD[Seq[Byte]] = sc.binaryFiles("...")
.map{ case (file, pds) => {
val dis = pds.open()
val bytes = Array.ofDim[Byte](1024)
val all = scala.collection.mutable.ArrayBuffer[Byte]()
while( dis.read(bytes) != -1) {
all ++= bytes
}
all.toSeq
}}
有关更多可能性,请参阅javadoc of DataInputStream
。例如,它拥有readLong
,readDouble
(等等)方法。