我正在尝试将MongoDB中的一些数据加载到Spark中。我已经定义了一个ReadConfig来指定数据库和集合。我还想应用一个过滤器,以避免丢失所有的集合。我跟随https://docs.mongodb.com/spark-connector/master/scala/aggregation/的例子如下:
val rc = ReadConfig(Map(“database” - >“myDB”,“collection” - >“myCol”),Some(ReadConfig(spark)))
val rdd = MongoSpark.load(spark,rc)
但是rdd没有任何名为withPipeline的函数(似乎它生成了一个regualr DataFrame而不是MongoRDD)我是否错过了导入的东西?我已经进口了
import com.mongodb.spark._
import spark.implicits._
我猜你在使用Spark.sparkContext
时使用spark 2.0使用MongoSpark.load
val collectionDf = MongoSpark.load(spark.sparkContext, readConfig)
val aggregatedRdd = collectionDf.withPipeline(Seq(Document.parse("{ $match: { _id: 'value' } }")))