使用 OpenSearch Python 批量 api 将数据插入到多个索引

问题描述 投票:0回答:2

本文档展示了如何使用curl中的POST请求插入具有多个索引的批量数据:https://opensearch.org/docs/latest/opensearch/index-data/

如果我有这种格式的数据,

[
{ "index": { "_index": "index-2022-06-08", "_id": "<id>" } }
{ "A JSON": "document" }
{ "index": { "_index": "index-2022-06-09", "_id": "<id>" } }
{ "A JSON": "document" }
{ "index": { "_index": "index-2022-06-10", "_id": "<id>" } }
{ "A JSON": "document" }
]

批量请求应从

"_index": "index-2022-06-08"

获取索引名称

我试图使用 OpenSearch-py 库来做同样的事情,但我找不到任何示例片段可以做到这一点。我正在使用这种格式从 AWS Lambda 发送请求。

client = OpenSearch(
            hosts = [{'host': host, 'port': 443}],
            http_auth = awsauth,
            use_ssl = True,
            verify_certs = True,
            connection_class = RequestsHttpConnection
            )
        
        resp = helpers.bulk(client, logs, index= index_name, max_retries = 3)

在这里,我必须提到index_name作为批量请求中的参数,因此它不会从数据本身获取index_name。如果我在参数中没有提及index_name,则会收到错误4xx index_name丢失。

我也在研究批量API源代码:https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/helpers/actions.py#L373

看起来index_name不是强制参数。

任何人都可以帮我解决我缺少的东西吗?

elasticsearch opensearch
2个回答
6
投票

我遇到了同样的问题,并在 elasticsearch.pybulk-helpers 文档中找到了解决方案。当搜索端点返回的

_source-structure
中提供文档时,它就可以工作了。

批量方法的调用:

resp = helpers.bulk(
    self.opensearch,
    actions,
    max_retries=3,
)

其中

actions
是这样的字典列表:

[{
    '_op_type': 'update',
    '_index': 'index-name',
    '_id': 42,
    '_source': {
        "title": "Hello World!",
        "body": "..."
    }
}]

_op_type
可以用作附加字段来定义应为文档调用的操作(
index
update
delete
...)。

希望这可以帮助任何遇到同样问题的人!


1
投票

使用下面的代码希望您可以使用批量方法进行索引,在opensearch中有两种方法对文档进行索引,其中一种方法是批量方法

from opensearchpy import OpenSearch, helpers
from opensearchpy.helpers import bulk

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

inputtobeindexed = [
    {"index": {"_index": "index-2022-06-08", "_id": "<id>"}},
    {"A JSON": "document"},
    {"index": {"_index": "index-2022-06-09", "_id": "<id>"}},
    {"A JSON": "document"},
    {"index": {"_index": "index-2022-06-10", "_id": "<id>"}},
    {"A JSON": "document"},
]

search_index_name = "yourindexname"
bulk_data = [
    {"_index": search_index_name, "_id": i, "_source": doc} for i, doc in enumerate()
]

bulk(client, bulk_data)

© www.soinside.com 2019 - 2024. All rights reserved.