HttpClient timeout and Polly Bulkhead policy issue

Problem description (votes: 0, answers: 1)

I am getting many timeout exceptions when using the Polly Bulkhead policy, which helps me limit the number of concurrent calls sent to a specific host. However, the HttpClient timeout seems to apply to the whole delegate, including the time a request spends queued in the bulkhead.

I am configuring it through IHttpClientFactory with the following code:

services.AddHttpClient(string.Empty)
    .AddPolicyHandler(GetBulkheadPolicy(100));

private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    // At most maxConcurrentRequests calls in flight; everything else queues (up to int.MaxValue).
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}

My problem is that I want the timeout to apply only to the request itself, not to the bulkhead policy, because the behavior I am trying to achieve is the following:

  • Limit the number of concurrent requests to a specific host
  • Wait indefinitely until there is capacity to send the request (Polly raises an exception when the queue is full)
  • Send the request to the host with a timeout applied, e.g. the default one.

I have already implemented this behavior using a Semaphore instead of the Polly Bulkhead policy, but I would like to encapsulate that code in a policy.
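
For illustration, here is a minimal, untested sketch of the composition I am after. It assumes Polly's Timeout policy is wrapped inside the Bulkhead (so the timeout clock only starts once a bulkhead slot is acquired) and that HttpClient's own timeout is disabled so it cannot span the queuing time:

services.AddHttpClient(string.Empty)
    // Disable HttpClient's built-in timeout so it does not cover the bulkhead wait.
    .ConfigureHttpClient(c => c.Timeout = Timeout.InfiniteTimeSpan)
    .AddPolicyHandler(Policy.WrapAsync(
        // Outer: at most 100 concurrent requests, effectively unbounded queue.
        Policy.BulkheadAsync<HttpResponseMessage>(100, int.MaxValue),
        // Inner: per-request timeout, mirroring HttpClient's 100-second default.
        Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(100))));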

Thanks.

c# dotnet-httpclient polly bulkhead
1 Answer

Score: 5

I put these examples together to demonstrate different options for throttling HttpClient requests. I must emphasize that these are just samples, far from production code, so please examine them through that lens.

The following samples show how to issue requests in a fire-and-forget fashion (that is, they do not care about the response). The solutions assume that the number of requests is greater than the available throughput; in other words, the producer is faster than the consumer, which is why there is some sort of queuing mechanism in place to handle the imbalance.

With BatchBlock and ActionBlock

// Requires: using System.Threading.Tasks.Dataflow;
public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();

    // Groups incoming requests into batches of 100 before they reach the consumer.
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private readonly ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        // Processes up to 100 batches concurrently.
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}
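
Note that a BatchBlock only propagates a batch once 100 requests have accumulated (or TriggerBatch is called), so under sparse traffic requests can sit in the buffer for a while.

All of the throttler classes here expose the same IssueNewRequest method, so a caller could drive any of them like this (a hypothetical sketch; the URL is a placeholder):

var throttler = new ThrottlingWithBatchBlock();
for (int i = 0; i < 1000; i++)
{
    await throttler.IssueNewRequest(
        new HttpRequestMessage(HttpMethod.Get, $"https://example.com/items/{i}"));
}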

With BufferBlock

// Requires: using System.Threading.Tasks.Dataflow;
public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();

    // BoundedCapacity makes SendAsync wait whenever 100 requests are already queued.
    private readonly BufferBlock<HttpRequestMessage> requests = new(
            new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        // Fire-and-forget consumer loop.
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
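
Here the bounded capacity of 100 is what provides the back-pressure: SendAsync simply waits while the buffer is full. Since the single consumer loop awaits each SendAsync, requests go out strictly one at a time; starting several ConsumerAsync loops would be one way to raise the throughput.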

With Channels

// Requires: using System.Threading.Channels;
public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();

    private readonly Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
            new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        // WriteAsync itself waits for free capacity (a bounded channel's default
        // FullMode is Wait), so no separate WaitToWriteAsync call is needed.
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
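
The bounded channel provides the same back-pressure as the BufferBlock variant, and since it is declared with SingleReader = false, several ConsumerAsync loops could be started in the constructor to send more than one request at a time.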

With BlockingCollection

// Requires: using System.Collections.Concurrent; using System.Linq;
public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private readonly BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        // Start 100 consumer loops on pool threads; Take() blocks synchronously,
        // so calling ConsumerAsync directly would block the constructor.
        _ = Enumerable.Range(1, 100)
            .Select(_ => Task.Run(ConsumerAsync)).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            // Blocks the current thread until a request is available.
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
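
Unlike the Dataflow and Channel variants, Take() parks a thread until a request arrives, so the 100 consumer loops can pin up to 100 thread-pool threads while idle; that is the price of avoiding any extra dataflow machinery.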

With Parallel ForEach

public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private readonly BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable blocks while waiting for new items (enumerating
        // the BlockingCollection directly would only iterate a snapshot), so the
        // loop is moved onto a pool thread to keep the constructor from blocking.
        _ = Task.Run(() => requests.GetConsumingEnumerable()
            .ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100));
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
// Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        var remainingJobsEnumerator = source.GetEnumerator();

        // Starts the next job, if any; returns false once the source is exhausted.
        bool AddNewJob()
        {
            if (remainingJobsEnumerator.MoveNext())
            {
                var readyToProcessJob = body(remainingJobsEnumerator.Current);
                toBeProcessedJobs.Add(readyToProcessJob);
                return true;
            }
            return false;
        }

        // Fill the window up to the requested degree of parallelism. (Checking
        // AddNewJob's result also prevents an infinite loop when the source has
        // fewer items than degreeOfParallelism.)
        while (toBeProcessedJobs.Count < degreeOfParallelism && AddNewJob())
        {
        }

        // Slide the window: whenever one job finishes, start the next.
        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }
    }
}
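
The WhenAny-based loop keeps up to degreeOfParallelism requests in flight and starts the next one as soon as any of them completes; combined with GetConsumingEnumerable, MoveNext blocks until a new request is added, so the window refills as traffic arrives.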