I am currently running a Python script that inserts a dataset of roughly 10 million vectors into a Milvus collection. The insert works fine for the first 40 batches of 10,000 records each, but after that I consistently hit the following error:
[describe_collection] retry: X, cost: Y, reason: <_MultiThreadedRendezvous: StatusCode.UNAVAILABLE, failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:19530: connection attempt timed out before receiving SETTINGS frame>
Here is the script I am using for the insert:
import os
import time

import numpy as np
import pandas as pd
from dtx_data_tools.iterate import batched, map_threaded

# `client` is a pymilvus MilvusClient created elsewhere,
# connected to the Milvus instance at 127.0.0.1:19530

times = []
files = [f'agg_dataset/{fp}' for fp in os.listdir('agg_dataset') if 'parquet' in fp]
db_ids_set = set()
counter = 0

for batch_file in files:
    # Deduplicate within the file and against previously inserted UUIDs
    prep_start = time.time()
    df = pd.read_parquet(batch_file).drop_duplicates(subset='scrape_uuid', keep="last")
    insert_ids = set(df['scrape_uuid'].tolist())
    new_uuids = insert_ids - db_ids_set
    df = df[df['scrape_uuid'].isin(new_uuids)]
    db_ids_set.update(new_uuids)
    prep_end = time.time() - prep_start
    print(f"prep time took {prep_end} seconds")

    # Decode the raw bytes column into float32 vectors
    start = time.time()
    transform_time = time.time()
    df['content_vector'] = df['encoding'].apply(lambda x: np.frombuffer(x, dtype=np.float32))
    df = df[['scrape_uuid', 'content_vector']]
    print(f"Transform time: {time.time() - transform_time} seconds")

    try:
        client.insert(
            collection_name="milvus_orb_benchmark",
            data=df.to_dict('records')
        )
        counter += 10000
        print(f"Imported {counter} articles..., batch {batch_file} uploaded")
        end = time.time() - start
        print(f"batch insert_time took {end} seconds")
        times.append(end)
    except Exception as e:
        print(f"insert failed for batch {batch_file}: {e}")
I suspect the issue is related to a gRPC server connection timeout. I have also attached my Milvus Operator configuration for reference:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: milvus
  annotations:
    eks.amazonaws.com/role-arn: xxxxxx
---
apiVersion: milvus.io/v1beta1
kind: Milvus
metadata:
  name: milvus
  labels:
    app: milvus
spec:
  components:
    serviceAccountName: milvus
  config:
    minio:
      bucketName: xxxxxx
      # enable AssumeRole
      useIAM: true
      useSSL: true
  dependencies:
    storage:
      external: true
      type: S3
      endpoint: xxxxxxxx
      secretRef: ""
My question is: how can I resolve this MultiThreadedRendezvous: StatusCode.UNAVAILABLE error?

You can start by increasing the gRPC server timeout and keepalive settings in the Milvus configuration file. Tuning parameters such as grpc.server_keepalive_time_ms and grpc.server_keepalive_timeout_ms helps keep connections alive during long-running insert operations.

Reducing the insert batch size can also ease the load on the server; while 10,000 records may work at first, dropping to a smaller size (e.g. 5,000) can prevent bottlenecks from building up over time. Parallelizing the insert process can further improve throughput, but make sure each thread or process uses its own connection so the server is not overwhelmed.

Monitoring the Milvus server's resource utilization is essential; insufficient CPU, RAM, or network bandwidth can cause timeouts, so you may need to scale up the hardware or deploy Milvus in a distributed setup. Implement robust error handling that catches failed inserts and retries them with exponential backoff, so that immediate retries do not overwhelm the server. Reviewing the Milvus server logs and monitoring system metrics will help identify the specific constraint or misconfiguration causing the problem, enabling a targeted fix.
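To make the exponential-backoff suggestion concrete, here is a minimal sketch of a retry wrapper. The function name, retry counts, and delays are illustrative assumptions, not part of your script; `do_insert` would be any zero-argument callable wrapping your `client.insert(...)` call.

```python
import random
import time


def insert_with_backoff(do_insert, max_retries=5, base_delay=1.0):
    """Retry a flaky insert call with exponential backoff plus jitter.

    do_insert: zero-argument callable, e.g. lambda wrapping client.insert.
    max_retries and base_delay are illustrative defaults.
    """
    for attempt in range(max_retries):
        try:
            return do_insert()
        except Exception as exc:  # in practice, catch the pymilvus exception type
            if attempt == max_retries - 1:
                raise  # give up after the final attempt
            # Exponential backoff: 1s, 2s, 4s, ... plus random jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            print(f"insert failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)
```

In your loop you would then call something like `insert_with_backoff(lambda: client.insert(collection_name="milvus_orb_benchmark", data=df.to_dict('records')))` instead of calling `client.insert` directly. The jitter spreads retries out so that parallel workers do not all reconnect at the same instant.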