我目前正在使用ElasticSearch NEST 7.x库。
在托管我的ElasticSearch主节点的VM上,我正在运行一个Web服务器,该服务器通过REST接收JSON数据。然后将这些JSON数据保存在ElasticSearch中。
首先,将接收到的JSON数据传递到此方法中进行解析:
private static (bool Success, string ErrorMessage) TryReadRawJsonData(
string rawJsonData, out IEnumerable<(string Index, ExpandoObject JsonContent)> jsonLines)
{
var results = new List<(string Index, ExpandoObject JsonContent)>();
foreach (string rawDataLine in HttpContext.Current.Server.UrlDecode(rawJsonData).Split('\n').Where(line => !string.IsNullOrWhiteSpace(line)))
{
dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(rawDataLine);
if (!Dynamic.HasProperty(expandoObject, "IndexId"))
{
jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
return (Success: false, ErrorMessage: $"No field named 'IndexId' found in {rawDataLine}.");
}
string indexId = (string)expandoObject.IndexId.ToLower();
results.Add((indexId, JsonContent: expandoObject));
}
jsonLines = results;
return (Success: true, ErrorMessage: null);
}
如果成功解析,则随后将返回值传递到此方法中以进行批量索引:
private static async Task<HttpResponseMessage> BulkIndexAsync(IEnumerable<(string Index, ExpandoObject JsonContent)> contents)
{
foreach (var group in contents.GroupBy(line => line.Index))
{
BulkResponse bulkIndexResponse =
await ElasticClient.BulkAsync(bulk => bulk.Index(group.Key).IndexMany(group.Select(member => member.JsonContent)));
if (bulkIndexResponse.Errors)
{
return new HttpResponseMessage(HttpStatusCode.BadRequest)
{
Content = new StringContent(bulkIndexResponse.ItemsWithErrors
.Select(itemWithError =>
$"Index: {itemWithError.Index}; " +
$"Document Id: {itemWithError.Id}; " +
$"Error: {itemWithError.Error.Reason}.")
.ConcatenateIntoString(separator: "\n"))
};
}
}
return new HttpResponseMessage(HttpStatusCode.OK);
}
批量索引操作成功,但是很遗憾,文档ID并非我所期望的。这是一个例子:
{
"_index": "dummyindex",
"_type": "_doc",
"_id": "U1W4Z20BcmiMRnw-blTi",
"_score": 1.0,
"_source": {
"IndexId": "dummyindex",
"Id": "0c2d48bd-6842-4f15-b7f2-57fa259b0642",
"UserId": "dummy_user_1",
"Country": "dummy_stan"
}
}
如您所见,Id
字段为0c2d48bd-6842-4f15-b7f2-57fa259b0642
,根据documentation,应自动将其推断为文档ID。但是,_id
字段设置为U1W4Z20BcmiMRnw-blTi
,而不是0c2d48bd-6842-4f15-b7f2-57fa259b0642
。
我在做什么错?
here中的答案:
Id
上的ExpandoObject
不是该类型的属性,而是支持IDictionary<string,object>
的基础ExpandoObject
中的键。
您可以通过使用ExpandoObject
的属性来反思
dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(@"{
""IndexId"": ""dummyindex"",
""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
""UserId"": ""dummy_user_1"",
""Country"": ""dummy_stan""
}
");
Type t = expandoObject.GetType();
PropertyInfo[] properties = t.GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
foreach (PropertyInfo property in properties)
{
Console.WriteLine(property.ToString());
}
打印
System.Dynamic.ExpandoClass Class
System.Collections.Generic.ICollection`1[System.String] System.Collections.Generic.IDictionary<System.String,System.Object>.Keys
System.Collections.Generic.ICollection`1[System.Object] System.Collections.Generic.IDictionary<System.String,System.Object>.Values
System.Object System.Collections.Generic.IDictionary<System.String,System.Object>.Item [System.String]
Int32 System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.Count
Boolean System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.IsReadOnly
为了解决您的问题,您可以为每个文档指定Id
,但是可以通过将第二个委托参数传递给.IndexMany()
:
dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(@"{
""IndexId"": ""dummyindex"",
""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
""UserId"": ""dummy_user_1"",
""Country"": ""dummy_stan""
}
");
var bulkResponse = client.Bulk(bu => bu
.IndexMany(new[] { expandoObject }, (b, d) => b.Id((Id)d.Id))
);
将d.Id
强制转换为Id
(或可能是字符串,因为它是实际类型,但是强制转换为Id
将使用从字符串到Id
的隐式转换),因为d
是动态类型,没有它,运行时将无法分派。