并联订购的消耗品

问题描述 投票:2回答:1

我想并行处理一些项目。此处理是独立的(顺序无关紧要)并返回输出。然后应尽快将这些输出按顺序中继。

也就是说,该方法应该与此相同(除了并行调用Process):

IEnumerable<T> OrderedParallelImmediateSelect<T> (IEnumerable<object> source)
{
    foreach (var input in source) {
        var result = Process (input);
        yield return result;
    }
}

因此,它需要尝试按顺序处理项目。由于这(当然)不保证按顺序完成,因此结果收集器必须确保等待延迟结果。

一旦订单中的下一个结果出现,就必须立即返回。在排序结果之前,我们不能等待处理整个输入。

这是一个如何看起来像这样的例子:

begin 0
begin 1     <-- we start processing in increasing order
begin 2
complete 1  <-- 1 is complete but we are still waiting for 0
begin 3
complete 0  <-- 0 is complete, so we can return it and 1, too
return 0
return 1
begin 4
begin 5
complete 4  <-- 2 and 3 are missing before we may return this
complete 2  <-- 2 is done, 4 must keep waiting
return 2
begin 6
complete 3  <-- 3 and 4 can now be returned
return 3
return 4

如果可能的话,我想在常规线程池上执行处理。

这种情况是.NET提供的解决方案吗?我已经构建了一个自定义解决方案,但更喜欢使用更简单的东西。

我知道很多类似的问题,但似乎它们都允许等待所有项目完成处理或不保证有序的结果。

这是一个可悲的似乎不起作用的尝试。用IEnumerable替换ParallelQuery没有效果。

int Process (int item)
{
    Console.WriteLine ($"+ {item}");
    Thread.Sleep (new Random (item).Next (100, 1000));
    Console.WriteLine ($"- {item}");
    return item;
}
void Output (IEnumerable<int> items)
{
    foreach (var it in items) {
        Console.WriteLine ($"=> {it}");
    }
}

IEnumerable<int> OrderedParallelImmediateSelect (IEnumerable<int> source)
{
    // This processes in parallel but does not return the results immediately
    return source.AsParallel ().AsOrdered ().Select (Process);
}

var input = Enumerable.Range (0, 20);
Output (OrderedParallelImmediateSelect (input));

输出:

+0 +1 +3 +2 +4 +5 +6 +7 +9 +10 +11 +8 -1 +12 -3 +13 -5 +14 -7 +15 -9 +16 -11 +17 -14 +18 -16 +19 -0 -18 -2 -4 -6 -8 -13 -10 -15 -17 -12 -19 =>0 =>1 =>2 =>3 =>4 =>5 =>6 =>7 =>8 =>9 =>10 =>11 =>12 =>13 =>14 =>15 =>16 =>17 =>18 =>19

c# .net parallel-processing
1个回答
3
投票

我创建了这个程序,作为控制台应用程序:

using System;
using System.Linq;
using System.Threading;

namespace PlayAreaCSCon
{
    class Program
    {
        static void Main(string[] args)
        {
            var items = Enumerable.Range(0, 1000);
            int prodCount = 0;

            foreach(var item in items.AsParallel()
            .AsOrdered()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select((i) =>
            {
                Thread.Sleep(i % 100);
                Interlocked.Increment(ref prodCount);
                return i;
            }))
            {
                Console.WriteLine(item);
            }
            Console.ReadLine();
        }
    }
}

然后我最初在Console.WriteLine(item);上设置了一个断点。运行程序,当我第一次点击该断点时,prodCount为5 - 我们肯定在所有处理完成之前消耗结果。删除断点后,所有结果似乎都按原始顺序生成。

© www.soinside.com 2019 - 2024. All rights reserved.