本文档展示了如何使用curl中的POST请求插入具有多个索引的批量数据:https://opensearch.org/docs/latest/opensearch/index-data/
如果我有这种格式的数据,
[
{ "index": { "_index": "index-2022-06-08", "_id": "<id>" } }
{ "A JSON": "document" }
{ "index": { "_index": "index-2022-06-09", "_id": "<id>" } }
{ "A JSON": "document" }
{ "index": { "_index": "index-2022-06-10", "_id": "<id>" } }
{ "A JSON": "document" }
]
批量请求应从
"_index": "index-2022-06-08"
获取索引名称
我试图使用 OpenSearch-py 库来做同样的事情,但我找不到任何示例片段可以做到这一点。我正在使用这种格式从 AWS Lambda 发送请求。
client = OpenSearch(
hosts = [{'host': host, 'port': 443}],
http_auth = awsauth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)
resp = helpers.bulk(client, logs, index= index_name, max_retries = 3)
在这里,我必须提到index_name作为批量请求中的参数,因此它不会从数据本身获取index_name。如果我在参数中没有提及index_name,则会收到错误4xx index_name丢失。
我也在研究批量API源代码:https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/helpers/actions.py#L373
看起来index_name不是强制参数。
任何人都可以帮我解决我缺少的东西吗?
我遇到了同样的问题,并在 elasticsearch.pybulk-helpers 文档中找到了解决方案。当搜索端点返回的
_source-structure
中提供文档时,它就可以工作了。
批量方法的调用:
resp = helpers.bulk(
self.opensearch,
actions,
max_retries=3,
)
其中
actions
是这样的字典列表:
[{
'_op_type': 'update',
'_index': 'index-name',
'_id': 42,
'_source': {
"title": "Hello World!",
"body": "..."
}
}]
_op_type
可以用作附加字段来定义应为文档调用的操作(index
、update
、delete
...)。
希望这可以帮助任何遇到同样问题的人!
使用下面的代码希望您可以使用批量方法进行索引,在opensearch中有两种方法对文档进行索引,其中一种方法是批量方法
from opensearchpy import OpenSearch, helpers
from opensearchpy.helpers import bulk
client = OpenSearch(
hosts=[{"host": "localhost", "port": 9200}],
http_auth=("admin", "admin"),
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
inputtobeindexed = [
{"index": {"_index": "index-2022-06-08", "_id": "<id>"}},
{"A JSON": "document"},
{"index": {"_index": "index-2022-06-09", "_id": "<id>"}},
{"A JSON": "document"},
{"index": {"_index": "index-2022-06-10", "_id": "<id>"}},
{"A JSON": "document"},
]
search_index_name = "yourindexname"
bulk_data = [
{"_index": search_index_name, "_id": i, "_source": doc} for i, doc in enumerate()
]
bulk(client, bulk_data)