我正在使用 Spring Data Elasticsearch 4.2.5,我们有一项工作对特定的数据库表进行 ETL(提取、转换和加载数据)。我在作业运行时使用 Elasticsearch 为这些数据建立索引。数据将达到数百万条甚至更多。目前,我正在对每次迭代进行索引。我读到,在每次迭代中使用 elasticsearch 索引可能需要一些时间。我想使用像bulk-index这样的东西,但为此我需要将indexQuery对象添加到List中。添加数百万条记录到列表并进行批量索引可能会带来内存问题。
我需要应用类似的删除过程。当根据一些常见的ID删除记录时,我需要删除相关的弹性文档,这也将是数百万甚至更多。
有没有办法可以非常快地完成此要求的索引/删除?非常感谢任何帮助,如果我的理解不正确,请纠正我。
索引
for (Map.Entry<Integer, ObjectDetails> key : objectDetailsHashMap.entrySet()) {
indexDocument(elasticsearchOperations, key, oPath);
// other code to insert data in db table...
}
private void indexDocument(ElasticsearchOperations elasticsearchOperations,
Map.Entry<Integer, ObjectDetails> key, String oPath) {
String docId = "" + key.getValue().getCatalogId() + key.getValue().getObjectId();
byte[] nameBytes = key.getValue().getName();
byte[] physicalNameBytes = key.getValue().getPhysicalName();
byte[] definitionBytes = key.getValue().getDefinition();
byte[] commentBytes = key.getValue().getComment();
IndexQuery indexQuery = new IndexQueryBuilder()
.withId(docId)
.withObject(new MetadataSearch(
key.getValue().getObjectId(),
key.getValue().getCatalogId(),
key.getValue().getParentId(),
key.getValue().getTypeCode(),
key.getValue().getStartVersion(),
key.getValue().getEndVersion(),
nameBytes != null ? new String(nameBytes, StandardCharsets.UTF_8) : "-",
physicalNameBytes != null ? new String(physicalNameBytes, StandardCharsets.UTF_8) : "-",
definitionBytes != null ? new String(definitionBytes, StandardCharsets.UTF_8) : "-",
commentBytes != null ? new String(commentBytes, StandardCharsets.UTF_8) : "-",
oPath
))
.build();
elasticsearchOperations.index(indexQuery, IndexCoordinates.of("portal_idx"));
}
删除
private void deleteElasticDocuments(String catalogId) {
String queryText = martServerContext.getQueryCacheInstance().getQuery(QUERY_PORTAL_GET_OBJECTS_IN_PORTAL_BY_MODEL);
MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
mapSqlParameterSource.addValue("cId", Integer.parseInt(catalogId));
namedParameterJdbcTemplate.query(queryText, mapSqlParameterSource, (resultSet -> {
int objectId = resultSet.getInt(O_ID);
String docId = catalogId + objectId;
elasticsearchOperations.delete(docId, IndexCoordinates.of("portal_idx"));
}));
}
要添加文档,您可以使用批量索引,例如通过收集要在列表/数组或其他内容中索引的文档,当达到预定义的大小(例如 500 个条目)时,然后对这些文档进行批量插入。
对于删除,没有批量操作,但您可以收集要在列表或数组中删除的 id,并使用最大大小,然后使用
ElasticsearchOperations.idsQuery(List<String>)
为这些 id 创建查询并将其传递到 delete(query)
方法中。
编辑2021年9月29日:
idsQuery
刚刚在4.3分支中添加,它是这样简化的(https://github.com/spring-projects/spring-data-elasticsearch/blob/main/src/main/java/org/ springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java#L193-L200):
@Override
public Query idsQuery(List<String> ids) {
Assert.notNull(ids, "ids must not be null");
return new NativeSearchQueryBuilder().withQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[] {})))
.build();
}
使用
idsQuery
进行批量删除并不是性能方面的最佳选择。在底层,它只执行一个普通的“查询”来识别要删除的文档 ID,即使我们事先知道这些 ID。我们求助于(Kotlin 中的片段):
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.opensearch.core.BulkRequest
import org.opensearch.client.opensearch.core.bulk.BulkOperation
import org.opensearch.client.opensearch.core.bulk.DeleteOperation
...
val ids = listOf(1, 2, 3)
val index = "index"
val bulkOperation = BulkOperation.Builder().run {
ids.forEach {
delete(DeleteOperation.Builder().id(it).index(index).build())
}
build()
}
val bulkRequest = BulkRequest.Builder().operations(listOf(bulkOperation)).build()
openSearchClient.bulk(bulkRequest)