Scala:如何从RDD获取PortableDataStream实例的内容

问题描述 投票:0回答:1

因为我想从binaryFiles中提取数据我使用val dataRDD = sc.binaryRecord("Path")读取文件我得到的结果为org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]

我想以PortableDataStream的形式提取我的文件内容

为此,我尝试:val data = dataRDD.map(x => x._2.open()).collect()但我得到以下错误:java.io.NotSerializableException:org.apache.hadoop.hdfs.client.HdfsDataInputStream

如果您有任何想法我如何解决我的问题,请帮助!

提前谢谢了。

scala apache-spark rdd
1个回答
0
投票

实际上,PortableDataStream是Serializable。这就是它的意思。然而,open()返回一个简单的DataInputStream(在你的情况下为HdfsDataInputStream,因为你的文件在HDFS上),这是不可序列化的,因此你得到的错误。

事实上,当你打开PortableDataStream时,你只需要立即读取数据。在scala中,您可以使用scala.io.Source.fromInputStream

val data : RDD[Array[String]] = sc
    .binaryFiles("path/.../")
    .map{ case (fileName, pds) => {
        scala.io.Source.fromInputStream(pds.open())
            .getLines().toArray
    }}

此代码假定数据是文本的。如果不是,您可以调整它以读取任何类型的二进制数据。下面是一个创建字节序列的示例,您可以按照自己的方式进行处理。

val rdd : RDD[Seq[Byte]] = sc.binaryFiles("...")
    .map{ case (file, pds) => {
        val dis = pds.open()
        val bytes = Array.ofDim[Byte](1024)
        val all = scala.collection.mutable.ArrayBuffer[Byte]()
        while( dis.read(bytes) != -1) {
            all ++= bytes
        }
        all.toSeq
    }}

有关更多可能性,请参阅javadoc of DataInputStream。例如,它拥有readLongreadDouble(等等)方法。

© www.soinside.com 2019 - 2024. All rights reserved.