ElasticSearch NEST:批量索引操作未使用指定的文档ID

问题描述 投票:0回答:1

我目前正在使用ElasticSearch NEST 7.x库。

在托管我的ElasticSearch主节点的VM上,我正在运行一个Web服务器,该服务器通过REST接收JSON数据。然后将这些JSON数据保存在ElasticSearch中。

首先,将接收到的JSON数据传递到此方法中进行解析:

private static (bool Success, string ErrorMessage) TryReadRawJsonData(
    string rawJsonData, out IEnumerable<(string Index, ExpandoObject JsonContent)> jsonLines)
{
    var results = new List<(string Index, ExpandoObject JsonContent)>();

    foreach (string rawDataLine in HttpContext.Current.Server.UrlDecode(rawJsonData).Split('\n').Where(line => !string.IsNullOrWhiteSpace(line)))
    {
        dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(rawDataLine);

        if (!Dynamic.HasProperty(expandoObject, "IndexId"))
        {
            jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
            return (Success: false, ErrorMessage: $"No field named 'IndexId' found in {rawDataLine}.");
        }

        string indexId = (string)expandoObject.IndexId.ToLower();
        results.Add((indexId, JsonContent: expandoObject));
    }

    jsonLines = results;
    return (Success: true, ErrorMessage: null);
}

如果成功解析,则随后将返回值传递到此方法中以进行批量索引:

private static async Task<HttpResponseMessage> BulkIndexAsync(IEnumerable<(string Index, ExpandoObject JsonContent)> contents)
{
    foreach (var group in contents.GroupBy(line => line.Index))
    {
        BulkResponse bulkIndexResponse = 
            await ElasticClient.BulkAsync(bulk => bulk.Index(group.Key).IndexMany(group.Select(member => member.JsonContent)));

        if (bulkIndexResponse.Errors)
        {
            return new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                Content = new StringContent(bulkIndexResponse.ItemsWithErrors
                                                             .Select(itemWithError =>
                                                                 $"Index: {itemWithError.Index}; " +
                                                                 $"Document Id: {itemWithError.Id}; " +
                                                                 $"Error: {itemWithError.Error.Reason}.")
                                                             .ConcatenateIntoString(separator: "\n"))
            };
        }
    }
    return new HttpResponseMessage(HttpStatusCode.OK);
}

批量索引操作成功,但是很遗憾,文档ID并非我所期望的。这是一个例子:

{
    "_index": "dummyindex",
    "_type": "_doc",
    "_id": "U1W4Z20BcmiMRnw-blTi",
    "_score": 1.0,
    "_source": {
        "IndexId": "dummyindex",
        "Id": "0c2d48bd-6842-4f15-b7f2-57fa259b0642",
        "UserId": "dummy_user_1",
        "Country": "dummy_stan"
    }
}

如您所见,Id字段为0c2d48bd-6842-4f15-b7f2-57fa259b0642,根据documentation,应自动将其推断为文档ID。但是,_id字段设置为U1W4Z20BcmiMRnw-blTi,而不是0c2d48bd-6842-4f15-b7f2-57fa259b0642

我在做什么错?

c# elasticsearch nest
1个回答
0
投票

here中的答案:

Id上的ExpandoObject不是该类型的属性,而是支持IDictionary<string,object>的基础ExpandoObject中的键。

您可以通过使用ExpandoObject的属性来反思

dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(@"{
        ""IndexId"": ""dummyindex"",
        ""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
        ""UserId"": ""dummy_user_1"",
        ""Country"": ""dummy_stan""
    }
");

Type t = expandoObject.GetType();
PropertyInfo[] properties = t.GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
foreach (PropertyInfo property in properties)
{
    Console.WriteLine(property.ToString());
}

打印

System.Dynamic.ExpandoClass Class
System.Collections.Generic.ICollection`1[System.String] System.Collections.Generic.IDictionary<System.String,System.Object>.Keys
System.Collections.Generic.ICollection`1[System.Object] System.Collections.Generic.IDictionary<System.String,System.Object>.Values
System.Object System.Collections.Generic.IDictionary<System.String,System.Object>.Item [System.String]
Int32 System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.Count
Boolean System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.IsReadOnly

为了解决您的问题,您可以为每个文档指定Id,但是可以通过将第二个委托参数传递给.IndexMany()

dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(@"{
        ""IndexId"": ""dummyindex"",
        ""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
        ""UserId"": ""dummy_user_1"",
        ""Country"": ""dummy_stan""
    }
");

var bulkResponse = client.Bulk(bu => bu
    .IndexMany(new[] { expandoObject }, (b, d) => b.Id((Id)d.Id))
);

d.Id强制转换为Id(或可能是字符串,因为它是实际类型,但是强制转换为Id将使用从字符串到Id的隐式转换),因为d是动态类型,没有它,运行时将无法分派。

© www.soinside.com 2019 - 2024. All rights reserved.