producer-consumer 相关问题

生产者 - 消费者问题(也称为有界缓冲问题)是多进程同步问题的经典示例。该问题描述了两个进程,生产者和使用者,他们共享一个用作队列的通用固定大小缓冲区。

KAFKAPYTHON消费者组会话定时出现

I使用Confluent-kafka v1.3.0,我在消费者组会话超时方面存在以下问题。 我的配置看起来像: c ['kafka'] = { 'bootstrap.servers':'host.docker.internal:9104',, '缺点...

回答 1 投票 0


使用有界通道<T>使用严格的内存分配要求

Channel<T> 在通道传播的TS是巨大的(每个1GB),这需要将任何给定时刻的数组总数限制为最小。因此,两个生产商在创建byte[]之前一直在等待,直到他们知道频道中有空的空间为止。这是第一个生产者: Channel<byte[]> channel = Channel.CreateBounded<byte[]>(2); 第二生产商是相同的。不幸的是,即使频道中只有一个空插槽,这都使两个生产商都可以开始创建一个新数组。因此,在某个时候,系统可能同时拥有4个巨大的阵列:1被消费者消耗,其中1个存储在频道中,而2个同时创建的是两个生产商(试图填充一个空的一个空插槽)。 我想将托管内存中的数组总数限制为3。我有什么办法可以驯服我的生产者,这样他们才能开始创建新的byte[]渠道?换句话说,在创建数组后,生产者应该能够在这样的频道中立即写入它:new byte[1_000_000_000] ...并且Task producer1 = Task.Run(async () => { while (true) { await channel.Writer.WaitToWriteAsync(); // The channel has space available. Let's create the array. byte[] array = new byte[1_000_000_000]; // Here initialize the array (mainly I/O bound, time consuming) // At this moment the channel might be full, // because the other producer filled the gap. await channel.Writer.WriteAsync(array); } }); 始终应该是byte[]。 这种情况是人为的。它的灵感来自。 clarification:字节阵列的构建和初始化是生产者的专有责任,应该保持这种状态。部分或全部分配其他地方的施工工作是不可取的。 一个选项,尽管它需要一些手动管理,但它将是一个限制大小的频道的简单包装器。 bool success = channel.Writer.TryWrite(array); 该“裸机”实现的最大问题是,您必须在发布该项目时手动管理。 作为替代方案,您可以返回一个“包装”实例:success有这些锁定的返回实例。 OFC然后您将分配跟踪对象,但这是围绕分配的组成和手动代码之间的权衡。 Edit每评论: true //This is for the sake of simplicity in example SingletonHolder.LockedGibiByteArrayPool = new LockingArrayPool<byte[]>(new [] { new byte[1_000_000_000], new byte[1_000_000_000] }); Task producer1 = Task.Run(async () => { while (true) { await channel.Writer.WaitToWriteAsync(); // Grab an array from our pool. byte[] array = await SingletonHolder.LockedGibiByteArrayPool.GetItemAsync(); // Here initialize the array (mainly I/O bound, time consuming) // The channel should not be full, // But the reader -must- make sure to release the array // when it is done. // alternatively, use the 'LockPooledItem` pattern suggested, // and then at least it's just a `Dispose()` call... await channel.Writer.WriteAsync(array); } }); buffered生产者/消费者系统(如通道和数据流)在最大缓冲尺寸周围有些“模糊”(我永远不记得数据流是否在输出缓冲区中计数项目是否计数)。正如您指出的那样,它们不计算生产者或消费者持有的任何物品。 ,为了随时限制total对象数量,您需要自己的分配器。 Disposable usage: public sealed class TokenAllocator { private readonly SemaphoreSlim _mutex; public TokenAllocator(int maxTokens) => _mutex = new(maxTokens); public async Task<IDisposable> AllocateAsync() { await _mutex.WaitAsync(); return Disposable.Create(() => _mutex.Release()); } } 注: 生产者示例假定例外将导致应用程序故障。如果必须从中恢复异常,则生产商中的var allocator = new TokenAllocator(3); var channel = Channel.CreateBounded<(IDisposable token, byte[] item)>(2); var consumer = Task.Run(async () => { await foreach (var (token, item) in channel.Reader.ReadAllAsync()) using (token) { ... // Do something with `item` } }); var producer1 = Task.Run(async () => { while (true) { var token = await allocator.AllocateAsync(); try { var item = new byte[1_000_000_000]; ... // Do something with `item` } catch { token.Dispose(); throw; } await channel.Writer.WriteAsync((token, item)); } }); 需要围绕它的public sealed class LimitedAllocator<T> { private readonly SemaphoreSlim _mutex; public LimitedAllocator(int maxItems) => _mutex = new(maxItems); public async Task<AllocatedItem> AllocateAsync(Func<T> create) { await _mutex.WaitAsync(); return new(this, create()); } private void Free() => _mutex.Release(); public sealed class AllocatedItem : IDisposable { public AllocatedItem(LimitedAllocator<T> allocator, T item) { Item = item; _disposer = Disposable.Create(() => allocator.Free()); } public T Item { get; } public void Dispose() => _disposer.Dispose(); private readonly IDisposable _disposer; } } /var allocator = new LimitedAllocator<byte[]>(3); var channel = Channel.CreateBounded<LimitedAllocator<byte[]>.AllocatedItem>(2); var consumer = Task.Run(async () => { await foreach (var allocatedItem in channel.Reader.ReadAllAsync()) using (allocatedItem) { ... // Do something with allocatedItem.Item } }); var producer1 = Task.Run(async () => { while (true) { var allocatedItem = await allocator.AllocateAsync(() => new byte[1_000_000_000]); ... // Do something with allocatedItem.Item await channel.Writer.WriteAsync(allocatedItem); } }); ,仅在例外情况下处理分配的项目。 在...中仅是内存(例如try)的情况下,您可以考虑使用catch而不是T。 LimitedAllocator<T>本质上是一种与内存结合的一次性。 制片人不再等待查看渠道中是否有空间;它只是在等待查看loscator是否有空间。如果分配器中有可用的空间,那么该生产商将成为将创建一个要发送到渠道的项目的空间。 如果您非常不喜欢byte[]with-intem的配对,那么使用连接的属性是可以使用的。但这倾向于更多的魔术和较低的可维护代码。 这里是另一种方法,它也使用了IMemoryOwnerStephenCleary和To11mtm的答案。不同之处在于,信号量的AllocatedItem和IMemoryOwner被初始化为通道的确切容量,并且信号量是在从通道中取出AllocatedItem后立即释放的,而不是在完全处理该通道时。 : IDisposable 本质上,SemaphoreSlim成为限制渠道能力的后卫。如果需要,您不妨使用无限频道。 这种方法可确保任何时间分配的最大数量为3,不包括符合垃圾收集的资格。

回答 2 投票 0




为什么对 cv wait 和 pop() 使用单独的 lock_guard 会导致分段错误?

我创建了一个Packets类来同步消费者和生产者之间的数据处理。我的设计对条件变量(cv wait)和弹出数据包的操作使用单独的锁...

回答 2 投票 0

为什么对 cv wait 和 pop() 使用单独的锁会导致分段错误?

我创建了一个Packets类来同步消费者和生产者之间的数据处理。我的设计对条件变量(cv wait)和弹出数据包的操作使用单独的锁...

回答 1 投票 0

3个线程使用的ConcurrentDictionary

我有一个使用 ConcurrentDictionary 的类。 在此类中,有三个函数对此 ConcurrentDictionnary 执行一些操作。 每个函数都由不同的线程调用。

回答 1 投票 0

生产者-消费者的两个信号量的行为

这里是人工智能生成的代码 - 看起来它“应该”工作,但没有: 1> 即使在生产者生成第一个 (1) 之前,消费者端也不应该开始打印输出 (a 0) ——因为它

回答 1 投票 0

生产者消费者队列最佳实践和性能

我正在用 C# 构建一个生产者消费者队列,并且我正在阅读寻找在鲁棒性和性能方面最好的方法。 多年来我一直使用 BlockingCollection 但我发现

回答 1 投票 0

Go:用于通用数据处理的接口{}的chan

我目前正在尝试用golang编写一些生产者和消费者代码,试图保持它的通用性。 它看起来像这样。 函数生产者 () { 共享数据通道 <- getData() //retu...

回答 1 投票 0

双重异常/大师冥想错误:esp32cam 的生产者消费者问题运行书籍示例内存损坏

我的第一个问题。请耐心等待。 我正在使用 PlatformIO Windows10 VM 运行 VS-Code,尝试运行书中的第一个项目:使用 ESP32 开发 IOT 项目,Vedat Ozan Oner...h...

回答 1 投票 0

我的生产者-消费者实现超出了最大大小

我已经实现了一个 Runnable Producer 和一个 Runnable Consumer 以及一个 Store 类。商店分配了最大尺寸的货架。每个生产者添加一个项目,每个消费者将删除一个...

回答 1 投票 0

如何使用.NET Framework和异步I/O高效处理数百万本地文件?

我需要处理一个大存储库(源存储库)并在另一个存储库(元数据存储库)中创建/更新(或保持完整)大约 2M 个文件。 虽然文件有~2M,但大部分都只是...

回答 1 投票 0

如何将.NET Framework中的Parallel.ForEach移植到异步IO?

我需要处理一个大存储库(源存储库)并在另一个存储库(元数据存储库)中创建/更新(或保持完整)大约 2M 个文件。 我当前的实现不使用异步 I/O,甚至

回答 1 投票 0

Windows 服务 .Net 8 上的 FileSystemWatcher [IHostedService]

我正在尝试使用 .Net 8 创建一个 Windows 服务应用程序作为托管服务,我的应用程序的想法是监视文件夹列表。问题是我正在丢失由...触发的事件 Created

回答 1 投票 0

当我的队列有东西要处理时如何触发后台线程

我正在使用 ConcurrentQueue 来存储来自多个线程的消息。 如何创建一些后台线程,当我的队列中有东西时会自动触发?

回答 2 投票 0

LabVIEW 正在覆盖队列中捕获的图像

我正在使用视觉例程从 300fps GigE 相机捕获帧(“抓取”)。 相机传输帧的速度比高清写入帧的速度要快一些,所以我在一个循环中捕捉并坚持......

回答 1 投票 0

向消费者发出生产者已停止生产的信号

我正在开发一个控制台 .NET Core 应用程序,它收集电子邮件数据,然后使用通道异步发送这些电子邮件。 我目前正在将消费者添加为托管服务...

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.