我在使用 Polly 的 Bulkhead(舱壁)策略时遇到了大量超时异常。该策略用于限制发送到特定主机的并发调用数量,但 HttpClient 的超时时间似乎作用于整个委托,而不仅仅是请求本身。
我正在使用 IHttpClientFactory 通过以下代码进行配置:
// Register the HttpClient whose name is the empty string, then attach the
// bulkhead policy built for 100 concurrent requests via AddPolicyHandler.
var clientBuilder = services.AddHttpClient(string.Empty);
clientBuilder.AddPolicyHandler(GetBulkheadPolicy(100));
/// <summary>
/// Builds an asynchronous Polly bulkhead policy typed for <see cref="HttpResponseMessage"/> results.
/// </summary>
/// <param name="maxConcurrentRequests">
/// Forwarded as the first argument of <c>Policy.BulkheadAsync</c> (the parallelization limit).
/// </param>
/// <returns>The non-generic bulkhead converted with <c>AsAsyncPolicy&lt;HttpResponseMessage&gt;()</c>.</returns>
private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    // int.MaxValue is the second BulkheadAsync argument (the queue limit in Polly's API),
    // i.e. the number of executions allowed to wait for a free slot is effectively uncapped.
    var bulkhead = Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue);
    return bulkhead.AsAsyncPolicy<HttpResponseMessage>();
}
我的问题是,我希望超时仅影响请求本身,而不影响舱壁策略,因为我想要实现的行为如下:
我已经使用 Semaphore 而不是 Polly 的 Bulkhead 策略实现了该行为,但我希望能用策略来封装这段代码。
谢谢。
我整理了以下示例,用来演示对 HttpClient 请求进行限流的几种不同方案。必须强调的是,这些只是示例,距离生产代码还很远,因此请带着这一前提仔细审视它们。
以下示例代码展示了如何以“即发即弃”(fire-and-forget)的方式发出请求(即调用方不关心响应)。该解决方案假设请求数量多于可用吞吐量。换句话说,生产者比消费者更快,因此需要某种排队机制来处理这种不平衡。
/// <summary>
/// Fire-and-forget request throttling built on TPL Dataflow. Incoming requests are grouped by a
/// <see cref="BatchBlock{T}"/> into arrays of 100 and handed to an <see cref="ActionBlock{T}"/>
/// that works on up to 100 batches at the same time.
/// </summary>
/// <remarks>
/// Requests that belong to the same batch are sent one after another. Sample code only.
/// </remarks>
public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private readonly ActionBlock<HttpRequestMessage[]> consumer;

    /// <summary>Creates the consuming block and links the batching block to it.</summary>
    public ThrottlingWithBatchBlock()
    {
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not reported back.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>A task that completes once the batching block has handled the offer.</returns>
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    /// <summary>Sends every request of one batch sequentially.</summary>
    /// <param name="batch">The batch emitted by the batching block.</param>
    private async Task ConsumerAsync(HttpRequestMessage[] batch)
    {
        foreach (var request in batch)
        {
            try
            {
                // Dispose the response right away: nobody reads it in fire-and-forget mode.
                using var response = await client.SendAsync(request).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is HttpRequestException or OperationCanceledException)
            {
                // Deliberately ignored. An exception escaping this delegate would fault the
                // ActionBlock, which would then stop accepting any further batches; a single
                // failed or timed-out request must not shut the whole pipeline down.
            }
        }
    }
}
/// <summary>
/// Fire-and-forget request throttling built on a bounded <see cref="BufferBlock{T}"/>
/// (capacity 100) that is drained by a single consumer loop sending one request at a time.
/// </summary>
/// <remarks>Sample code only.</remarks>
public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    private readonly BufferBlock<HttpRequestMessage> requests = new(
        new DataflowBlockOptions { BoundedCapacity = 100 });

    /// <summary>Starts the background consumer loop; its task is intentionally not awaited.</summary>
    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not reported back.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>A task that completes once the buffer has handled the offer.</returns>
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    /// <summary>Receives queued requests and sends them one by one while output is available.</summary>
    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            try
            {
                // Dispose the response right away: nobody reads it in fire-and-forget mode.
                using var response = await client.SendAsync(request).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is HttpRequestException or OperationCanceledException)
            {
                // Deliberately ignored. Letting the exception escape would end this (unobserved)
                // loop; the bounded buffer would then fill up and IssueNewRequest would wait
                // forever. One failed or timed-out request must not stop the consumer.
            }
        }
    }
}
/// <summary>
/// Fire-and-forget request throttling built on a bounded <see cref="Channel{T}"/> (capacity 100)
/// that is drained by a single consumer loop sending one request at a time.
/// </summary>
/// <remarks>Sample code only.</remarks>
public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();

    // Exactly one reader loop is started by the constructor, while IssueNewRequest is public and
    // may be invoked by several callers at once, hence SingleReader = true / SingleWriter = false.
    private readonly Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
        new BoundedChannelOptions(100) { SingleWriter = false, SingleReader = true });

    /// <summary>Starts the background consumer loop; its task is intentionally not awaited.</summary>
    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not reported back.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>A task that completes once the request has been written to the channel.</returns>
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        // WriteAsync already waits asynchronously for free capacity on a bounded channel,
        // so a preceding WaitToWriteAsync call is not needed.
        await requests.Writer.WriteAsync(request);
    }

    /// <summary>Reads queued requests until the channel is completed and sends them one by one.</summary>
    async Task ConsumerAsync()
    {
        await foreach (var request in requests.Reader.ReadAllAsync())
        {
            try
            {
                // Dispose the response right away: nobody reads it in fire-and-forget mode.
                using var response = await client.SendAsync(request).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is HttpRequestException or OperationCanceledException)
            {
                // Deliberately ignored. Letting the exception escape would end this (unobserved)
                // loop; the bounded channel would then fill up and IssueNewRequest would wait
                // forever. One failed or timed-out request must not stop the consumer.
            }
        }
    }
}
/// <summary>
/// Fire-and-forget request throttling built on a <see cref="BlockingCollection{T}"/> queue.
/// A single dedicated dispatcher takes requests from the queue and starts them, while a
/// semaphore keeps the number of in-flight sends at 100 or fewer.
/// </summary>
/// <remarks>
/// The blocking dequeue must never run on the constructing thread: the queue is empty at
/// construction time, so a blocking take there would prevent the constructor from returning.
/// Sample code only.
/// </remarks>
public class ThrottlingWithBlockingCollection
{
    private const int MaxConcurrentRequests = 100;

    static readonly HttpClient client = new();
    private readonly BlockingCollection<HttpRequestMessage> requests = new();
    private readonly System.Threading.SemaphoreSlim slots =
        new(MaxConcurrentRequests, MaxConcurrentRequests);

    /// <summary>Starts the dispatcher as a long-running task; the task is intentionally not awaited.</summary>
    public ThrottlingWithBlockingCollection()
    {
        // LongRunning: the dispatcher spends its life blocked on the queue or the semaphore.
        _ = Task.Factory.StartNew(DispatchLoop, TaskCreationOptions.LongRunning);
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not reported back.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>An already completed task.</returns>
    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    /// <summary>Blocks for queued requests and starts each one as soon as a slot is free.</summary>
    private void DispatchLoop()
    {
        foreach (var request in requests.GetConsumingEnumerable())
        {
            // Blocks this dedicated dispatcher (not the caller) until fewer than
            // MaxConcurrentRequests sends are in flight.
            slots.Wait();
            _ = SendAndReleaseAsync(request);
        }
    }

    /// <summary>Sends one request, discards the response and frees its slot.</summary>
    /// <param name="request">The request to send.</param>
    private async Task SendAndReleaseAsync(HttpRequestMessage request)
    {
        try
        {
            // Dispose the response right away: nobody reads it in fire-and-forget mode.
            using var response = await client.SendAsync(request).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is HttpRequestException or OperationCanceledException)
        {
            // Deliberately ignored: a failed or timed-out request has nobody to report to.
        }
        finally
        {
            // Always give the slot back, otherwise failures would shrink the throughput.
            slots.Release();
        }
    }
}
/// <summary>
/// Fire-and-forget request throttling that feeds a <see cref="BlockingCollection{T}"/> queue
/// into the <c>ParallelAsyncForEach</c> extension with a degree of parallelism of 100.
/// </summary>
/// <remarks>Sample code only.</remarks>
public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private readonly BlockingCollection<HttpRequestMessage> requests = new();

    /// <summary>Starts the background processing; its task is intentionally not awaited.</summary>
    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable() removes items and blocks while the queue is empty, so
        // requests added later are picked up. Enumerating the collection directly would only
        // walk a snapshot, which is empty at construction time.
        // Task.Run keeps that blocking enumeration off the constructing thread.
        _ = Task.Run(() => requests
            .GetConsumingEnumerable()
            .ParallelAsyncForEach(request => SendAndForgetAsync(request), 100));
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not reported back.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>An already completed task.</returns>
    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    /// <summary>Sends one request and discards the response.</summary>
    /// <param name="request">The request to send.</param>
    private static async Task SendAndForgetAsync(HttpRequestMessage request)
    {
        try
        {
            // Dispose the response right away: nobody reads it in fire-and-forget mode.
            using var response = await client.SendAsync(request).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is HttpRequestException or OperationCanceledException)
        {
            // Deliberately ignored: a failed or timed-out request has nobody to report to.
        }
    }
}
//Based on https://codereview.stackexchange.com/a/203487
/// <summary>Extension methods for running asynchronous work over a sequence with bounded concurrency.</summary>
public static partial class ParallelForEach
{
    /// <summary>
    /// Invokes <paramref name="body"/> for every element of <paramref name="source"/> while keeping
    /// at most <paramref name="degreeOfParallelism"/> of the returned tasks in flight.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The elements to process; enumerated lazily, one element per started job.</param>
    /// <param name="body">Starts the asynchronous job for one element.</param>
    /// <param name="degreeOfParallelism">Maximum number of jobs in flight; must be at least 1.</param>
    /// <returns>
    /// A task that completes when the source is exhausted and every started job has finished.
    /// Argument errors fault this task. The outcome of the individual jobs is not inspected here,
    /// so a faulted job does not fault the returned task.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="degreeOfParallelism"/> is less than 1.</exception>
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (body is null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        if (degreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(degreeOfParallelism), degreeOfParallelism, "At least one job must be allowed to run.");
        }

        var inFlight = new HashSet<Task>();
        using var remaining = source.GetEnumerator();

        // Starts the job for the next element; returns false once the source is exhausted.
        bool TryStartNext()
        {
            if (!remaining.MoveNext())
            {
                return false;
            }

            inFlight.Add(body(remaining.Current));
            return true;
        }

        // Initial fill. Stop as soon as the source runs dry: a source shorter than
        // degreeOfParallelism can never reach the target count, so looping on the count
        // alone would never terminate.
        while (inFlight.Count < degreeOfParallelism)
        {
            if (!TryStartNext())
            {
                break;
            }
        }

        // Whenever a job finishes, replace it with the next element (if any).
        while (inFlight.Count > 0)
        {
            Task finished = await Task.WhenAny(inFlight).ConfigureAwait(false);
            inFlight.Remove(finished);
            TryStartNext();
        }
    }
}