Milvus 数据插入过程中多线程 Rendezvous 错误 - 如何修复?

问题描述 投票:0回答:1

我目前正在运行一个 Python 脚本,将大约 1000 万个向量的数据集插入到 Milvus 集合中。虽然插入过程对于前 40 批 10,000 条记录运行良好,但此后我始终遇到以下错误:

[describe_collection] retry: X, cost: Y, reason: <_MultiThreadedRendezvous: StatusCode.UNAVAILABLE, failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:19530: connection attempt timed out before receiving SETTINGS frame>

这是我用于插入的脚本:

import numpy as np
from dtx_data_tools.iterate import batched, map_threaded

times = []
files = [f'agg_dataset/{fp}' for fp in os.listdir('agg_dataset') if 'parquet' in fp]

db_ids_set = set()
counter = 0

for batch_file in files:
    prep_start = time.time()
    df = pd.read_parquet(batch_file).drop_duplicates(subset='scrape_uuid', keep="last")
    
    insert_ids = set(df['scrape_uuid'].tolist())

    new_uuids = insert_ids - db_ids_set
    df = df[df['scrape_uuid'].isin(new_uuids)]
    
    db_ids_set.update(new_uuids)
    
    prep_end  = time.time() - prep_start
    print(f"prep time took {prep_end} seconds")
    
    start = time.time()
    transform_time = time.time()
    df['content_vector'] = df['encoding'].apply(lambda x: np.frombuffer(x, dtype=np.float32))
    df = df[['scrape_uuid', 'content_vector']]
    print(f"Transform time: {time.time() - transform_time} seconds")
    
    try:
        client.insert(
            collection_name="milvus_orb_benchmark",
            data=df.to_dict('records')
        )
    
    counter += 10000
    print(f"Imported {counter} articles..., batch {batch_file} uploaded")
    end = time.time() - start
    print(f"batch insert_time took {end} seconds")
    times.append(end)

我怀疑这个问题与 gRPC 服务器连接超时有关。我还附上了我的 Milvus Operator 配置以供参考:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: milvus
  annotations:
    eks.amazonaws.com/role-arn: xxxxxx
apiVersion: milvus.io/v1beta1
kind: Milvus
metadata:
  name: milvus
  labels:
    app: milvus
spec:
  components:
    serviceAccountName: milvus
  config:
    minio:
      bucketName: xxxxxx
      # enable AssumeRole
      useIAM: true
      useSSL: true
  dependencies:
    storage:
      external: true
      type: S3
      endpoint:xxxxxxxx
      secretRef: ""

我的问题是:

  1. 如何解决
    MultiThreadedRendezvous: StatusCode.UNAVAILABLE
    错误?
  2. 我应该在 Milvus 配置或 gRPC 服务器中修改哪些特定设置,以防止在大数据插入过程中连接超时?
  3. 是否有更好的方法来处理将如此大的数据集插入 Milvus 而不会遇到此问题?
python vector-database milvus
1个回答
0
投票

您可以从增加 Milvus 配置文件中的 gRPC 服务器超时和保持活动设置开始。调整 grpc.server_keepalive_time_ms 和 grpc.server_keepalive_timeout_ms 等参数可确保连接在长时间插入操作期间保持活动状态。此外,减少数据插入的批量大小有助于减轻服务器过载;虽然 10,000 条记录最初可能会起作用,但将其降低到较小的大小(例如 5,000 条)可以防止随着时间的推移出现瓶颈。通过并行化优化插入过程可以进一步提高效率,但要确保每个线程或进程使用独立的连接,以避免服务器不堪重负。监控 Milvus 服务器的资源利用率至关重要; CPU、RAM 或网络带宽不足可能会导致超时,因此可能需要扩展硬件资源或在分布式设置中部署 Milvus。实施强大的错误处理,以使用指数退避策略捕获和重试失败的插入,从而防止立即重试导致服务器不堪重负。查看 Milvus 服务器日志和监控系统指标将有助于识别导致问题的特定约束或错误配置,从而实现有针对性的解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.