MongoDB Scala - 查询特定字段值的文档

问题描述 投票:1回答:2

所以我知道在Mongo Shell中,你使用点符号来获取任何文档中所需的字段。

如何在MongoDB Scala中实现点符号。我对它是如何工作感到困惑。以下是从集合中提取文档的代码:

val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)

编辑:

我正在努力研究一种机制,在消费者关闭的时候基本上重新消费Kafka记录。为此,我将我的kafka记录存储在外部数据库中,然后尝试从那里获取最近的偏移并从该点开始消耗。这是我应该这样做的Scala方法:

def getLatestCommitOffsetFromDB(collectionName: String): Long = {

import com.mongodb.Block
import org.bson.Document

val printBlock = new Block[Document]() {
  override def apply(document: Document): Unit = {
    println(document.toJson)
  }
}

import com.mongodb.async.SingleResultCallback
val callbackWhenFinished = new SingleResultCallback[Void]() {
  override def onResult(result: Void, t: Throwable): Unit = {
    System.out.println("Latest offset fetched from database.")
  }
}

var obj: String = " "

try {

  val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)
  //TODO FIND A WAY TO GET THE VALUE AND STORE IT IN A VARIABLE

} catch {
  case e: RuntimeException =>
    logger.error(s"MongoDB Server Error : Unable to fetch data from collection : $collection")
    logger.error(e.printStackTrace().toString())
}

obj.toLong

}

问题不在于我可以从Mongo获取文档,而是我试图访问Mongo中的特定字段。文档中有四个字段:主题,分区,消息和偏移量。我想获取“offset”字段并将其存储在变量中,因此我可以将其用作重新启动点来重新使用Kafka记录。

我从哪里去?

POM.hml

<?xml version="1.0" encoding="UTF-8"?>

HTTP://maven.Apache.org/下水道/maven-4.0.0.下水道"> 4.0.0

<groupId>OffsetManagementPoC</groupId>
<artifactId>OffsetManagementPoC</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah_2.12</artifactId>
        <version>3.1.1</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>3.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-bson_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

mongodb scala mongo-java-driver
2个回答
1
投票

您可以这样修改查询:

import com.mongodb.MongoClient
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Projections

def getLatestCommitOffsetFromDB(
  databaseName: String,
  collectionName: String
): Long = {

  val mongoClient = new MongoClient("localhost", 27017);

  val collection =
    mongoClient.getDatabase(databaseName).getCollection(collectionName)

  val record = collection
    .find()
    .projection(
      Projections
        .fields(Projections.include("offset"), Projections.excludeId()))
    .first

  record.get("offset").asInstanceOf[Double].toLong
}

我想你错过了com.mongodb.client.model.Projections的进口,以便使用fieldsincludeexcludeId

我使用first而不是limit(1)来更容易提取结果。

first返回一个Document对象,您可以在其上调用get来检索所请求字段的值。

但事实上,既然你只想要一个记录和一个字段,你可以删除投影!:

val record = collection.find().first

0
投票

根据文件,collection.find()接受com.mongodb.DBObject

您可以使用的界面的一个实现是BasicDBObject,它基本上像mutable.Map[String, Object]。您可以使用接受地图的构造函数:

val query = new com.mongodb.BasicDBObject(Map(
  "foo.bar" -> "value1"
  "bar.foo" -> "value2"
))
val record = collection.find(query)....
© www.soinside.com 2019 - 2024. All rights reserved.