这涵盖了各种编程语言支持的异步编程模型,使用async和await关键字。
我在生产中遇到了错误,因为从同步方法调用了一个异步方法。这在应用程序中产生了奇怪的行为。终于明白了,但我还是担心……
我正在使用 System.Net.Http 中的 HTTPClient 对 API 发出请求。 API 限制为每秒 10 个请求。 我的代码大致是这样的: 列表任务=新列表 我正在使用 System.Net.Http 中的 HTTPClient 对 API 发出请求。 API 限制为每秒 10 个请求。 我的代码大致是这样的: List<Task> tasks = new List<Task>(); items..Select(i => tasks.Add(ProcessItem(i)); try { await Task.WhenAll(taskList.ToArray()); } catch (Exception ex) { } ProcessItem 方法执行一些操作,但始终使用以下方式调用 API: await SendRequestAsync(..blah)。看起来像: private async Task<Response> SendRequestAsync(HttpRequestMessage request, CancellationToken token) { token.ThrowIfCancellationRequested(); var response = await HttpClient .SendAsync(request: request, cancellationToken: token).ConfigureAwait(continueOnCapturedContext: false); token.ThrowIfCancellationRequested(); return await Response.BuildResponse(response); } 最初代码运行良好,但当我开始使用 Task.WhenAll 时,我开始从 API 收到“超出速率限制”消息。如何限制发出请求的速率? 值得注意的是,ProcessItem 可以根据项目进行 1-4 个 API 调用。 API 限制为每秒 10 个请求。 然后让您的代码执行一批 10 个请求,确保它们至少需要一秒钟: Items[] items = ...; int index = 0; while (index < items.Length) { var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); // ".2" to make sure var tasks = items.Skip(index).Take(10).Select(i => ProcessItemsAsync(i)); var tasksAndTimer = tasks.Concat(new[] { timer }); await Task.WhenAll(tasksAndTimer); index += 10; } 更新 我的 ProcessItems 方法根据项目进行 1-4 次 API 调用。 在这种情况下,批处理并不是合适的解决方案。您需要将异步方法限制为某个 number,这意味着 SemaphoreSlim。棘手的部分是您希望允许更多呼叫随着时间的推移。 我还没有尝试过这段代码,但我的总体想法是有一个周期函数来释放信号量最多10次。所以,像这样: private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10); private async Task<Response> ThrottledSendRequestAsync(HttpRequestMessage request, CancellationToken token) { await _semaphore.WaitAsync(token); return await SendRequestAsync(request, token); } private async Task PeriodicallyReleaseAsync(Task stop) { while (true) { var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); if (await Task.WhenAny(timer, stop) == stop) return; // Release the semaphore at most 10 times. for (int i = 0; i != 10; ++i) { try { _semaphore.Release(); } catch (SemaphoreFullException) { break; } } } } 用途: // Start the periodic task, with a signal that we can use to stop it. var stop = new TaskCompletionSource<object>(); var periodicTask = PeriodicallyReleaseAsync(stop.Task); // Wait for all item processing. await Task.WhenAll(taskList); // Stop the periodic task. stop.SetResult(null); await periodicTask; 答案与这个类似。 不要使用任务列表和 WhenAll,而是使用 Parallel.ForEach 并使用 ParallelOptions 将并发任务数限制为 10 个,并确保每个任务至少需要 1 秒: Parallel.ForEach( items, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { ProcessItems(item); await Task.Delay(1000); } ); 或者如果您想确保每个项目花费的时间尽可能接近 1 秒: Parallel.ForEach( searches, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { var watch = new Stopwatch(); watch.Start(); ProcessItems(item); watch.Stop(); if (watch.ElapsedMilliseconds < 1000) await Task.Delay((int)(1000 - watch.ElapsedMilliseconds)); } ); 或者: Parallel.ForEach( searches, new ParallelOptions { MaxDegreeOfParallelism = 10 }, async item => { await Task.WhenAll( Task.Delay(1000), Task.Run(() => { ProcessItems(item); }) ); } ); 更新答案 我的 ProcessItems 方法根据项目进行 1-4 次 API 调用。因此,批量大小为 10 时,我仍然超出了速率限制。 您需要在SendRequestAsync中实现滚动窗口。包含每个请求的时间戳的队列是合适的数据结构。您将时间戳早于 10 秒的条目出队。碰巧的是,有一个“实现”作为对 SO 上类似问题的答案。 原答案 可能对其他人仍然有用 处理此问题的一种简单方法是以 10 个为一组对请求进行批处理,同时运行这些请求,然后等待总共 10 秒过去(如果尚未过去)。如果这批请求可以在 10 秒内完成,这将使您达到速率限制,但如果这批请求需要更长时间,则效果不佳。 查看 MoreLinq 中的 .Batch() 扩展方法。代码看起来大约像 foreach (var taskList in tasks.Batch(10)) { Stopwatch sw = Stopwatch.StartNew(); // From System.Diagnostics await Task.WhenAll(taskList.ToArray()); if (sw.Elapsed.TotalSeconds < 10.0) { // Calculate how long you still have to wait and sleep that long // You might want to wait 10.5 or 11 seconds just in case the rate // limiting on the other side isn't perfectly implemented } } https://github.com/thomhurst/EnumerableAsyncProcessor 我编写了一个库来帮助实现这种逻辑。 用法是: var responses = await AsyncProcessorBuilder.WithItems(items) // Or Extension Method: items.ToAsyncProcessorBuilder() .SelectAsync(item => ProcessItem(item), CancellationToken.None) .ProcessInParallel(levelOfParallelism: 10, TimeSpan.FromSeconds(1)); 这是使用 BlockingCollection 的解决方案。您可以动态添加http请求。而且它有效地利用了时间。 public class RateLimitedHttpClient { private readonly HttpClient _httpClient; private readonly BlockingCollection<bool> _blockingCollection; private readonly TimeSpan _rateLimitPeriod; public RateLimitedHttpClient(HttpClient httpClient, int maxRequestsPerPeriod, TimeSpan rateLimitPeriod) { _httpClient = httpClient; _blockingCollection = new BlockingCollection<bool>(maxRequestsPerPeriod); _rateLimitPeriod = rateLimitPeriod; } public async Task<T> GetAsync<T>(string url) { _blockingCollection.Add(false); _ = Task.Delay(_rateLimitPeriod).ContinueWith((task) => { _blockingCollection.Take(); }); var response = await _httpClient.GetAsync(url); var content = await response.Content.ReadFromJsonAsync<T>(); return content; } } 用途: var httpClient = new RateLimitedHttpClient(new HttpClient(), 500, TimeSpan.FromSeconds(5)); var tasks = Enumerable.Range(0, 2000).Select(async i => { var res = await httpClient.GetAsync<int>($"https://localhost:5001/tests/ping?num={i}"); Console.WriteLine(res); }); await Task.WhenAll(tasks); 所有任务将在~22秒后完成。前 500 个请求速度很快,其他请求则需要更长的时间,因为它们面临速率限制。
TypeError:对象 AsyncMock 不能在“await”表达式中使用
我为一个函数创建了一个 pytest,但是当我尝试运行它时出现以下错误: > 返回等待 cntrl.get_function_by_topic_by_method(url, method, *args, query_params=query_params,
我正在尝试用特定的属性值填充对象数组,并在 Node.JS 环境中使用带有 async/await 的 Promises 和 Javascript。这是主要部分: getPublicNotes(connectHe...
为什么我的异步 ASP.NET Web API 控制器阻塞主线程?
我有一个 ASP.NET Web API 控制器,我原以为它会异步操作。控制器设计为在第一个请求时休眠 20 秒,但为任何后续请求提供服务
我有几个函数相互异步调用,我需要捕获调用最后一个函数时的情况。 wait 语句应该调用一个我应该等待的函数,但它不是......
为什么在处理长时间运行的 CPU 密集型任务时,Task.Run() 会阻止控制器操作?
我正在开发 .NET 6 Web API,并使用 Task.Run() 处理 CPU 密集型任务,将其卸载到后台线程。然而,我的控制器操作似乎仍然被阻止,并且请求...
我正在尝试退出表单。但在退出之前,我需要确保一些无限任务已停止。这是我所拥有的一个最小的例子。 公共部分类 Form1 : Form {
在 Enumerable.All 扩展中使用带有返回 bool 的异步 lambda 表达式
我有以下异步方法 私有异步任务 HasPolicy(AuthorizationFilterContext上下文,字符串策略) { var 授权=等待_authorization.AuthorizeAsync(context.
C# 在 Enumerable.All 扩展中使用带有返回 bool 的异步 lambda 表达式
我有以下异步方法 私有异步任务 HasPolicy(AuthorizationFilterContext上下文,字符串策略) { var 授权=等待_authorization.AuthorizeAsync(上下文。
如何使用 async/await 避免此 .NET Core Web API 中的线程池耗尽和死锁?
我正在开发 .NET Core Web API,在调用异步方法时遇到了性能问题和潜在的死锁。我怀疑我错误地使用了 async 和 wait 。下面是...
我在使用 adrf 库在 django Rest 框架中测试异步模式时遇到问题。 我创建了一个 django 项目,其视图如下: 从rest_framework.decorators导入api_view作为sync_api_vi...
RefreshIndicator 是否仅适用于 Scrollable 或 ListView 相关的小部件?我希望将它与脚手架内的容器一起使用,并使用手势识别器,从而在拉下它时重新刷新...
CancellationToken.ThrowIfCancellationRequested 后的故障与取消任务状态
通常我不会发布带有答案的问题,但这次我想引起一些注意,我认为这可能是一个晦涩但常见的问题。就是被这个问题触发的,从那时起我
使用 Firebase 实时数据库进行 Swift 异步/等待
我正在尝试编写一个简单的数据库查询来从实时数据库中获取一些数据,我想使用异步/等待,但我不知道是否可以使用observeSingleEvent 可以吗
我正在尝试将一些代码从 Objective-C 转换为 Swift,并尽可能多地使用 async/await。 此代码利用了一个 C 库,该库使用带有 void* 上下文的回调。有
我是用JavaScript连接Microsoft SignalR服务器,因为我在Flutter Web中没有找到任何支持signalR客户端的dart包。我包含了 https://cdnjs.cloudflare.com/ajax/libs/microsoft-s...
await Task.Run() 之后返回到 C# 控制台应用程序中的初始线程
在 C# (.Net Core) 控制台应用程序中等待 Task.Run() 后,永远不会返回主线程。在 Task.Run()/Task.Factory.StartNew() 等之后如何返回控制台应用程序中的主线程 ...
将 IEnumerable<Task<T>> 转换为 IAsyncEnumerable<T> 的最佳方法?
这是我能想到的最好的: 公共静态异步 IAsyncEnumerable AsAsyncEnumerable(此 IEnumerable> 任务) { foreach(任务任务中的任务...
异步运行代码,允许每个代码块在下一个代码块开始之前完成[关闭]
我正在使用 WPF C# 编写一个“对不起”游戏。 选择后 “7”自动播放器将有很多基于的选项 “动7”或“分动”。 尝试...