对于我们的业务场景,我们需要对 Cosmos DB 中存储的项目提供强大的“Read-Your-Write”保证。
我们的 cosmos DB 配置为会话一致性。对于 GET Single Item 调用,我们使用会话令牌来确保返回最近的写入。
我的问题是,查询多个项目如何工作?
Cosmos DB SDK 提供了
GetItemQueryIterator
方法来传递查询定义。 QueryRequestOptions
公开了接受单个会话令牌的会话令牌属性。
如果我从查询请求中检索 100 条记录,我应该将哪个会话令牌传递给 QueryRequestOptions 以确保响应中的所有 100 条记录都是最新版本?
我的假设是,查询请求将尝试从给定会话令牌映射到的实例中检索所有 100 条记录,无论这 100 条记录是否包含该实例上的最新版本。
理想情况下,我们希望始终收到最新版本的记录。我能想到的一种选择是使用 Bounded Staleness。有会话一致性的选项吗?
尝试使用以下代码,以确保在查询配置了会话一致性的 Azure Cosmos DB 中的多个项目时保证“读你写”。
QueryRequestOptions
接受单个会话令牌。如果涉及多个分区,单个会话令牌可能无法覆盖所有分区。
为了跨多个分区进行查询并维护“Read-Your-Write”保证,代码使用逗号分隔格式将所有捕获的会话令牌合并到单个令牌字符串中。您应该传递一个“合并会话令牌”,其中包括查询中涉及的所有分区的会话令牌。通过合并跨分区最新写入的会话令牌,Cosmos DB 可确保查询检索每个分区的最新一致数据。
class Program
{
private static string endpointUri = "uri";
private static string primaryKey = "primaryKey";
private static string databaseId = "db1";
private static string containerId = "cont1";
private static CosmosClient cosmosClient;
private static Container container;
private static Dictionary<string, string> partitionSessionTokens = new Dictionary<string, string>();
static async Task Main(string[] args)
{
try
{
cosmosClient = new CosmosClient(endpointUri, primaryKey);
container = cosmosClient.GetContainer(databaseId, containerId);
string partitionKey1 = "partition1";
string partitionKey2 = "partition2";
await WriteItemAsync(partitionKey1, new { id = "1", partitionKey = partitionKey1, value = "Item1" });
await WriteItemAsync(partitionKey2, new { id = "2", partitionKey = partitionKey2, value = "Item2" });
await QueryWithSessionTokensAsync();
}
catch (CosmosException ex)
{
Console.WriteLine($"Cosmos DB Error: {ex.StatusCode} - {ex.Message}");
}
catch (Exception ex)
{
Console.WriteLine($"Unexpected Error: {ex.Message}");
}
}
private static async Task WriteItemAsync(string partitionKey, object item)
{
try
{
ItemResponse<object> response = await container.CreateItemAsync(item, new PartitionKey(partitionKey));
string sessionToken = response.Headers.Session;
Console.WriteLine($"Write completed for PartitionKey {partitionKey}, SessionToken: {sessionToken}");
partitionSessionTokens[partitionKey] = sessionToken;
}
catch (CosmosException ex)
{
Console.WriteLine($"Write Error: {ex.StatusCode} - {ex.Message}");
}
}
private static async Task QueryWithSessionTokensAsync()
{
string mergedSessionToken = MergeSessionTokens(partitionSessionTokens);
Console.WriteLine($"Merged Session Token: {mergedSessionToken}");
QueryRequestOptions options = new QueryRequestOptions
{
ConsistencyLevel = ConsistencyLevel.Session,
SessionToken = mergedSessionToken
};
string query = "SELECT * FROM c";
try
{
FeedIterator<dynamic> queryIterator = container.GetItemQueryIterator<dynamic>(query, requestOptions: options);
while (queryIterator.HasMoreResults)
{
FeedResponse<dynamic> response = await queryIterator.ReadNextAsync();
foreach (var item in response)
{
Console.WriteLine($"Item: {item}");
}
}
}
catch (CosmosException ex)
{
Console.WriteLine($"Query Error: {ex.StatusCode} - {ex.Message}");
}
}
private static string MergeSessionTokens(Dictionary<string, string> sessionTokens)
{
return string.Join(",", sessionTokens.Values);
}
}
Write completed for PartitionKey partition1, SessionToken: 0:0#2#1=-1
Write completed for PartitionKey partition2, SessionToken: 0:0#3#1=-1
Merged Session Token: 0:0#2#1=-1,0:0#3#1=-1
Item: {
"id": "1",
"partitionKey": "partition1",
"value": "Item1"
}
Item: {
"id": "2",
"partitionKey": "partition2",
"value": "Item2"
}