如何在parallel.for中强制执行有序执行序列?

问题描述 投票:0回答:2

我有一个简单的并行循环来做一些事情,然后我将结果保存到文件中。

object[] items; // array with all items
object[] resultArray = new object[numItems];
Parallel.For(0, numItems, (i) => 
{ 
    object res = doStuff(items[i], i);
    resultArray[i] = res;
});

foreach (object res in resultArray)
{
    sequentiallySaveResult(res);
}

为了保存,我需要按正确的顺序写入结果。通过将结果放入

resultArray
,结果的顺序再次正确。

但是,由于结果很大并且占用大量内存。 我想按顺序处理这些项目,例如四个线程启动并处理项目 1-4,下一个空闲线程处理项目 5,依此类推。

这样,我可以启动另一个线程,监视数组中接下来需要写入的项目(或者每个线程可以在项目完成时发出一个事件),这样我就可以开始写入第一个结果,而后面的项目仍在处理中,然后释放内存。

Parallel.For 是否可以按照给定的顺序处理项目?我当然可以使用

concurentQueue
,将所有索引按正确的顺序放在那里并手动启动线程。

但如果可能的话,我想保留“Parallel.For”实现中关于要使用多少线程等的所有自动化。

免责声明:我无法切换到

ForEach
,我需要
i

编辑#1:
目前,执行顺序是完全随机的,举个例子:

Processing item 1/255
Processing item 63/255
Processing item 32/255
Processing item 125/255
Processing item 94/255
Processing item 156/255
Processing item 187/255
Processing item 249/255
...

编辑#2:
已完成工作的更多详细信息:

我处理灰度图像,需要提取每个“层”的信息(上例中的项目),因此我从 0 到 255(对于 8 位)并对图像执行任务。

我有一个类可以同时访问像素值:

 unsafe class UnsafeBitmap : IDisposable
    {

        private BitmapData bitmapData;
        private Bitmap gray;
        private int bytesPerPixel;
        private int heightInPixels;
        private int widthInBytes;
        private byte* ptrFirstPixel;

        public void PrepareGrayscaleBitmap(Bitmap bitmap, bool invert)
        {
            gray = MakeGrayscale(bitmap, invert);

            bitmapData = gray.LockBits(new Rectangle(0, 0, gray.Width, gray.Height), ImageLockMode.ReadOnly, gray.PixelFormat);
            bytesPerPixel = System.Drawing.Bitmap.GetPixelFormatSize(gray.PixelFormat) / 8;
            heightInPixels = bitmapData.Height;
            widthInBytes = bitmapData.Width * bytesPerPixel;
            ptrFirstPixel = (byte*)bitmapData.Scan0;
        }

        public byte GetPixelValue(int x, int y)
        {
            return (ptrFirstPixel + ((heightInPixels - y - 1) * bitmapData.Stride))[x * bytesPerPixel];
        }

        public void Dispose()
        {
            gray.UnlockBits(bitmapData);
        }
    }

循环是

UnsafeBitmap ubmp; // initialized, has the correct bitmap
int numLayers = 255;
int bitmapWidthPx = 10000;
int bitmapHeightPx = 10000;
object[] resultArray = new object[numLayer];
Parallel.For(0, numLayers, (i) => 
{ 
        for (int x = 0; x < bitmapWidthPx ; x++)
    {
        inLine = false;
        for (int y = 0; y < bitmapHeightPx ; y++)
        {
            byte pixel_value = ubmp.GetPixelValue(x, y);
            
            if (i <= pixel_value && !inLine)
            {
                result.AddStart(x,y);
                inLine = true;
            }
            else if ((i > pixel_value || y == Height - 1) && inLine)
            {
                result.AddEnd(x, y-1);
                inLine = false;
            }
        }
    }
    result_array[i] = result;
});

foreach (object res in resultArray)
{
    sequentiallySaveResult(res);
}

我还想启动一个线程进行保存,检查接下来需要写入的项目是否可用,写入它,从内存中丢弃。为此,如果处理按顺序开始,以便结果“大致”按顺序到达,那就太好了。如果第 5 层的结果到达倒数第二个,我必须等待写入第 5 层(以及后面的所有内容)直到最后。 如果有4个线程启动,则开始处理第1-4层,当一个线程完成后,开始处理第5层,下一个第6层,依此类推,结果将或多或少以相同的顺序出现,我可以开始写入结果到文件并从内存中丢弃它们。

c# multithreading .net-core seq concurrent-processing
2个回答
4
投票
Parallel

类知道如何并行化工作负载,但不知道如何合并处理后的结果。所以我建议改用

PLINQ
。您要求以原始顺序保存结果并与处理同时进行,这使得它比平时有点棘手,但它仍然是完全可行的: IEnumerable<object> results = Partitioner .Create(items, EnumerablePartitionerOptions.NoBuffering) .AsParallel() .AsOrdered() .WithMergeOptions(ParallelMergeOptions.NotBuffered) .Select((item, index) => DoStuff(item, index)) .AsEnumerable(); foreach (object result in results) { SequentiallySaveResult(result); }

说明:

需要使用
  1. AsOrdered
     运算符才能按原始顺序检索结果。
    需要使用
  2. WithMergeOptions
     运算符来防止结果缓冲,以便在结果可用时立即保存它们。
  3. Partitioner.Create
     是必需的,因为数据源是数组,并且 PLINQ 默认情况下对数组进行
    静态
    分区。这意味着数组被分成多个范围,并分配一个线程来处理每个范围。一般来说,这是一个很好的性能优化,但在这种情况下,它违背了及时有序检索结果的目的。因此需要一个动态分区器,从头到尾按顺序枚举源。
  4. EnumerablePartitionerOptions.NoBuffering
    配置可防止 PLINQ 使用的工作线程一次抓取多个项目(这是默认的 PLINQ 分区技巧,称为“块分区”)。
  5. AsEnumerable
  6. 并不是真正需要的。它的存在只是为了表示并行处理的结束。无论如何,接下来的
    foreach
    ParallelQuery<object>
    视为
    IEnumerable<object>
    
        

0
投票
SemaphoreSlim

SemaphoreSlim.WaitAsync
。再加上计数器检查将为您带来所需的结果。
但是我不相信它是必要的,因为如果我理解正确并且您只想按顺序保存它们以避免将它们存储在内存中,您可以使用内存映射文件来:

    如果结果大小相同,只需将缓冲区写入位置
  1. index * size

    
    

  2. 如果结果大小不同,请在获得结果时写入临时映射文件,并让另一个线程在正确的顺序输出文件出现时复制它们。这是 IO 绑定操作,因此不要使用任务池。
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.