我已经在这几天打破了这个问题。感觉它应该直观简单......真的希望有人可以提供帮助!
我从一些半结构化数据构建了一个org.nd4j.linalg.api.ndarray.INDArray
的单词出现,如下所示:
import org.nd4j.linalg.factory.Nd4j
import org.nd4s.Implicits._
val docMap = collection.mutable.Map[Int,Map[Int,Int]] //of the form Map(phrase -> Map(phrasePosition -> word)
val words = ArrayBuffer("word_1","word_2","word_3",..."word_n")
val windows = ArrayBuffer("$phrase,$phrasePosition_1","$phrase,$phrasePosition_2",..."$phrase,$phrasePosition_n")
var matrix = Nd4j.create(windows.length*words.length).reshape(windows.length,words.length)
for (row <- matrix.shape(0)){
for(column <- matrix.shape(1){
//+1 to (row,column) if word occurs at phrase, phrasePosition indicated by window_n.
}
}
val finalmatrix = matrix.T.dot(matrix) // to get co-occurrence matrix
到现在为止还挺好...
在这一点的下游,我需要将数据集成到Spark中的现有管道中,并使用pca等的实现,因此我需要创建一个DataFrame,或者至少需要一个RDD。如果我事先知道单词和/或窗口的数量,我可以做类似的事情:
case class Row(window : String, word_1 : Double, word_2 : Double, ...etc)
val dfSeq = ArrayBuffer[Row]()
for (row <- matrix.shape(0)){
dfSeq += Row(windows(row),matrix.get(NDArrayIndex.point(row), NDArrayIndex.all()))
}
sc.parallelize(dfSeq).toDF("window","word_1","word_2",...etc)
但是窗口和单词的数量是在运行时确定的。我正在寻找一个WindowsxWords org.apache.spark.sql.DataFrame
作为输出,输入是一个WindowsxWords org.nd4j.linalg.api.ndarray.INDArray
提前感谢您提供的任何帮助。
好的,所以经过几天的工作,看起来简单的答案是:没有一个。事实上,在这种情况下尝试使用Nd4j
似乎是一个坏主意,原因有以下几点:
INDArray
格式中,就很难(很难)获取数据。我也考虑过使用Breeze,这可能实际上提供了一个可行的解决方案但在分布式数据结构上带有一些相同的问题和can't be used。
不幸的是,使用原生Spark / Scala数据类型,虽然一旦你知道如何更容易,但是 - 至于像我这样的人来自Python + numpy + pandas天堂至少 - 令人痛苦的复杂和丑陋。
不过,我确实成功实现了这个解决方案:
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
//first make a pseudo-matrix from Scala Array[Double]:
var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))
//iterate through 'rows' and 'columns' to fill it:
for (row 0 until windows.length){
for (column 0 until words.length){
// rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
}
}
//create Spark DenseMatrix
val rows : Array[Double] = rowSeq.transpose.flatten.toArray
val matrix = new DenseMatrix(windows.length,words.length,rows)
我需要Nd4J的主要操作之一是matrix.T.dot(matrix)
,但事实证明你不能将org.apache.spark.mllib.linalg.DenseMatrix
类型的2个矩阵相乘,其中一个(A)必须是org.apache.spark.mllib.linalg.distributed.RowMatrix
而且 - 你猜对了 - 你不能在matrix.transpose()
上打电话给RowMatrix
,只在DenseMatrix
上!因为它与这个问题并不真正相关,所以我将把这部分留下来,除了解释那个步骤的结果是RowMatrix
。信用也归功于here和here的解决方案的最后部分:
val rowMatrix : [RowMatrix] = transposeAndDotDenseMatrix(matrix)
// get DataFrame from RowMatrix via DenseMatrix
val newdense = new DenseMatrix(rowMatrix.numRows().toInt,rowMatrix.numCols().toInt,rowMatrix.rows.collect.flatMap(x => x.toArray)) // the call to collect() here is undesirable...
val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")
// then separate columns:
val df2 = (0 until words.length).foldLeft(df)((df, num) =>
df.withColumn(words(num), $"Rows".getItem(num)))
.drop("Rows")
很想听听有关这方面的改进和建议,谢谢。