查找集合中的行数

问题描述 投票:0回答:1

我使用 upsert 函数多次插入具有唯一主键值的大约 7k 行,以了解 upsert 的工作原理。插入后,我尝试使用以下方法查找集合中的行数:

collection.num_entities

num_entities = client.get_collection_stats(collection_name)["row_count"]

print(f"Number of entities in the collection: {num_entities}")

collection.query(output_fields=["count(*)"])

他们每个人都返回计数为 46k,其中包括应该删除的行。虽然这恰好返回了 7k 个唯一 ID :

query_result = collection.query(
    expr="id != ''",
    output_fields=["id"]
)

在进行搜索或查询时我也看不到重复的行。造成这种行为的原因是什么?如果我想获取集合中的实际行数(不包括已删除的行)如何获取?

python database vector-database milvus
1个回答
0
投票
  • collection.num_entities
    内部调用
    client.get_collection_stats()
    我们知道,milvus 按段管理数据,每个密封段的元数据(包括初始行数、s3 路径)都记录在 Etcd 中。删除操作不会直接影响密封段,所有删除的 id 都存储在 S3 中的“delta_log”中,而 Etcd 中的初始行数不会更改。
    client.get_collection_stats()
    快速迭代 etcd 中的段,将每个段的初始行数相加,并返回总和值。因此,它不计算已删除的项目。当用户不想加载集合并快速获取原始数字时,此界面有时很有用。
  • collection.query(output_fields=["count(*)"])
    这是一个真正的查询操作,需要加载集合。它扫描所有的段,包括密封段和增长段,删除的项目被计数。 通常,如果需要精确的数字,则需要将consistency_level设置为“Strong”,以确保Pulsar中的所有数据都被查询节点消耗,从而使数据可查询。
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")

query_result = collection.query(expr="id != ''", output_fields=["id"])
也是一个真正的查询操作,返回集合中所有唯一的id。

为了清楚地理解它,请阅读以下示例:

import random
import time

from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType)

connections.connect(host="localhost", port=19530)

dim = 128
metric_type = "COSINE"

collection_name = "test"

schema = CollectionSchema(
    fields=[
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="vector", dtype = DataType.FLOAT_VECTOR, dim=dim),
    ])
utility.drop_collection(collection_name)
collection = Collection(collection_name, schema)
print(collection_name, "created")

index_params = {
    'metric_type': metric_type,
    'index_type': "FLAT",
    'params': {},
}
collection.create_index(field_name="vector", index_params=index_params)
collection.load()

# insert 7k items with unique ids
batch = 7000
data = [
    [k for k in range(batch)],
    [[random.random() for _ in range(dim)] for _ in range(batch)],
]
collection.insert(data=data)
print(collection_name, "data inserted")

# call upsert to repeat update the 7k items
for i in range(6):
    collection.upsert(data=data)
print(collection_name, "data upserted")

collection.flush() # call flush() here to force the buffer to be flushed to sealed segments. In practice, we don't recommend manually call this method
print("num_entities =", collection.num_entities) # expect 49k

# consistency_level = Stong to ensure all the data is consumed by querynode and queryable
# see https://milvus.io/docs/consistency.md#Consistency-levels
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")
print("query((count(*)) =", results) # expect 7k

query_result = collection.query(expr="id >= 0", output_fields=["id"], consistency_level="Strong")
print("query(id >= 0) =", len(query_result)) # expect 7k

该脚本的结果:

test created
test data inserted
test data upserted
num_entities = 49000
query((count(*)) = data: ["{'count(*)': 7000}"] 
query(id >= 0) = 7000
© www.soinside.com 2019 - 2024. All rights reserved.