当我将以下代码作为应用程序的一部分运行时,将行
var capacityScenarios = debouncer.Result;
替换为以下行:var capacityScenarios = await debouncer;
会使性能大幅提升。两种写法的执行时间相差约 3 分钟,并且随着天数/线程数的增加,差距会更大。
我注意到这一点,是因为我们最近才把一些异步功能加入到若干已有的类中。因此,我没有调整调用链上的所有类,使其在相应位置返回
Task<T>
,而是只是在任务上执行 .Wait
或 .Result
。
我知道
.Wait
或 .Result
会占用当前线程,并创建一个新线程来运行任务,但我不明白这为什么会造成如此悬殊的执行时间差异。是不是我的逻辑有什么问题,才导致了这种情况?
编辑:我的
debouncer
类只是把调用暂缓下来(该调用最好以可枚举集合作为输入),这样一批线程可以各自单独调用它;在超时一段时间后,再用适当聚合后的可枚举集合执行一次原始调用(而不是逐个地单次调用)。 debouncer
使每个调用线程等待单个任务,因此一旦 debouncer
完成,每个线程将同时完成。我将其包含在这里,以便任何人都可以自己演示缓慢的情况,并查看底层设计,以防影响任何设计改进建议。
/// <summary>
/// Demonstration harness: queues 500 work items with <c>Task.Run</c>, each submitting one date to the
/// "Simulation" debounce key with a 3000 ms window, then blocks until every work item has finished.
/// </summary>
public void SlowOrFastFunction()
{
    var tasks = new List<Task>();
    for (int i = 0; i < 500; i++)
    {
        var date = DateTime.Now.AddDays(i);
        // Note: async is only here so the faster await call below can be commented in.
        // Remove async when using the .Result call, as it is not needed.
        var task = Task.Run(async () =>
        {
            var debouncer = Debouncer.Debounce(
                "Simulation",
                3000,
                date,
                (input) => input,
                // Lambda parameter renamed so it no longer shadows the loop counter `i`.
                (inputs) => inputs.Select(scenarioDate => scenarioDate).ToArray());

            // Fast variant: yields the worker thread while the debounced task is pending.
            //var capacityScenarios = await debouncer;

            // Slow variant: .Result blocks this worker thread until the debounced task completes.
            var capacityScenarios = debouncer.Result;
        });
        tasks.Add(task);
    }

    // The method is synchronous, so it waits here for all queued work items.
    Task.WaitAll(tasks.ToArray());
}
/// <summary>
/// Coalesces calls that share a key: each call contributes one input, and once the key's deadline
/// passes, the collected inputs are aggregated and the action is executed once. Every caller that
/// contributed to that batch awaits the same task and therefore receives the same result.
/// </summary>
public static class Debouncer
{
    // Delay between checks of whether a key's deadline has passed.
    private const int PollIntervalMilliseconds = 25;

    // Process-lock entries that have not been touched for this long are pruned whenever an action fires.
    private static readonly TimeSpan ProcessLockMaxIdle = TimeSpan.FromHours(2);

    // Keyed by "<uniqueKey>_Task": (task shared by all callers, deadline in UTC, inputs collected so far).
    private static readonly ConcurrentDictionary<string, Tuple<Task?, DateTime, List<object?>>> _taskTimeInputDictionary = new();

    // Keyed by the process-lock key: (monitor object guarding the task record, last access time in UTC).
    private static readonly ConcurrentDictionary<string, Tuple<object, DateTime>> _processLockDictionary = new();

    /// <summary>
    /// Adds <paramref name="input"/> to the pending batch for <paramref name="uniqueKey"/>, pushes the
    /// batch deadline to now + <paramref name="milliseconds"/>, and returns the result of the batch.
    /// </summary>
    /// <typeparam name="inputT">Type of a single input.</typeparam>
    /// <typeparam name="aggrigateOutput">Type produced by <paramref name="inputAggregator"/>.</typeparam>
    /// <typeparam name="T">Result type of <paramref name="action"/>.</typeparam>
    /// <param name="uniqueKey">Identifies the batch; calls with the same key are coalesced.</param>
    /// <param name="milliseconds">Quiet period after the most recent call before the action runs.</param>
    /// <param name="input">This caller's contribution to the batch.</param>
    /// <param name="inputAggregator">Combines all collected inputs; only the delegate supplied by the call that starts a batch is used.</param>
    /// <param name="action">Runs once on the aggregated inputs; only the delegate supplied by the call that starts a batch is used.</param>
    /// <param name="singleThreaded">
    /// Selects a separate process-lock entry ("_ST" suffix). NOTE(review): the action itself runs
    /// outside the lock regardless of this flag, exactly as before; confirm whether more was intended.
    /// </param>
    /// <param name="logger">Optional logger for diagnostics.</param>
    /// <returns>The action's result, or <c>default</c> when aggregation or the action throws.</returns>
    /// <exception cref="InvalidOperationException">
    /// The key already has a pending batch whose result type is not <typeparamref name="T"/>.
    /// </exception>
    public async static Task<T> Debounce<inputT, aggrigateOutput, T>(string uniqueKey, int milliseconds, inputT input, Func<List<inputT>, aggrigateOutput> inputAggregator, Func<aggrigateOutput, T> action, bool singleThreaded = false, ILogger? logger = null)
    {
        string lockKey = uniqueKey + $"_ProcessLock {(singleThreaded ? "_ST" : "")}";
        string taskKey = uniqueKey + "_Task";

        var lockEntry = _processLockDictionary.GetOrAdd(lockKey, new Tuple<object, DateTime>(new object(), DateTime.UtcNow));
        object processLock = lockEntry.Item1;
        // Refresh the last-access stamp so the idle cleanup does not prune a lock that is still in use.
        _processLockDictionary.TryUpdate(lockKey, new Tuple<object, DateTime>(processLock, DateTime.UtcNow), lockEntry);

        Task<T> task;
        lock (processLock)
        {
            // UTC is used for deadlines so a local-time (DST) shift cannot stretch or shrink the window.
            var record = _taskTimeInputDictionary.AddOrUpdate(
                taskKey,
                _ =>
                {
                    logger?.LogInformation("Adding dictionary record for " + uniqueKey);
                    return new Tuple<Task?, DateTime, List<object?>>(null, DateTime.UtcNow.AddMilliseconds(milliseconds), new List<object?> { input });
                },
                (_, existing) =>
                {
                    // The input list is shared between record versions; it is only touched under processLock.
                    existing.Item3.Add(input);
                    return new Tuple<Task?, DateTime, List<object?>>(existing.Item1, DateTime.UtcNow.AddMilliseconds(milliseconds), existing.Item3);
                });

            if (record.Item1 is null)
            {
                // First caller of this batch: start the poller and publish its task for later callers.
                // If the batch already completed synchronously, the record is gone and TryUpdate is a no-op.
                task = BuildTask(uniqueKey, inputAggregator, action, logger, processLock);
                _taskTimeInputDictionary.TryUpdate(taskKey, new Tuple<Task?, DateTime, List<object?>>(task, record.Item2, record.Item3), record);
            }
            else
            {
                // Fail with a clear message instead of awaiting null when the key is reused with another T.
                task = record.Item1 as Task<T>
                    ?? throw new InvalidOperationException("Debounce key '" + uniqueKey + "' already has a pending batch with a different result type.");
            }
        }

        return await task;
    }

    /// <summary>
    /// Runs the poller and converts any failure outside the action itself (missing record,
    /// aggregator exception) into a logged error and a <c>default</c> result.
    /// </summary>
    private static async Task<T> BuildTask<inputT, aggrigateOutput, T>(string uniqueKey, Func<List<inputT>, aggrigateOutput> inputAggregatorFunc, Func<aggrigateOutput, T> processFunc, ILogger? logger, object processLock)
    {
        try
        {
            return await TimedExecutor(uniqueKey, inputAggregatorFunc, processFunc, processLock, logger).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            logger?.LogError("Error triggering timed executor. Returning Null: " + ex);
            return default!;
        }
    }

    /// <summary>
    /// Polls the key's deadline every <see cref="PollIntervalMilliseconds"/> ms in a single loop and,
    /// once the deadline has passed, aggregates the collected inputs and runs the action on them.
    /// The first check runs synchronously on the caller's thread; later checks resume after the delay.
    /// </summary>
    private static async Task<T> TimedExecutor<inputT, aggregatorOutput, T>(string uniqueKey, Func<List<inputT>, aggregatorOutput> inputAggregatorFunc, Func<aggregatorOutput, T> processFunc, object processLock, ILogger? logger)
    {
        string taskKey = uniqueKey + "_Task";
        while (true)
        {
            // Cheap unlocked peek first; the indexer throws if the record vanished, which BuildTask reports.
            bool looksDue = DateTime.UtcNow > _taskTimeInputDictionary[taskKey].Item2;
            if (looksDue && TryTakeDueInputs(uniqueKey, processLock, inputAggregatorFunc, logger, out aggregatorOutput aggregated))
            {
                try
                {
                    // The action runs outside the lock so new batches for this key are not blocked by it.
                    return processFunc(aggregated);
                }
                catch (Exception ex)
                {
                    logger?.LogError("Error executing debouncer action. Returning null from action: " + ex);
                    return default!;
                }
            }

            // ConfigureAwait(false): polling must not resume on a caller's synchronization context.
            await Task.Delay(PollIntervalMilliseconds).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Under the process lock, removes the key's record if its deadline has really passed, aggregates
    /// its inputs, and prunes idle process-lock entries. Returns false when the deadline was extended
    /// between the unlocked peek and acquiring the lock.
    /// </summary>
    private static bool TryTakeDueInputs<inputT, aggregatorOutput>(string uniqueKey, object processLock, Func<List<inputT>, aggregatorOutput> inputAggregatorFunc, ILogger? logger, out aggregatorOutput aggregated)
    {
        string taskKey = uniqueKey + "_Task";
        lock (processLock)
        {
            // Re-check under the lock: a caller may have pushed the deadline back in the meantime.
            if (DateTime.UtcNow <= _taskTimeInputDictionary[taskKey].Item2)
            {
                aggregated = default!;
                return false;
            }

            if (!_taskTimeInputDictionary.TryRemove(taskKey, out var due))
            {
                throw new InvalidOperationException("Debounce record for '" + uniqueKey + "' was removed concurrently.");
            }

            aggregated = inputAggregatorFunc(due.Item3.Select(item => (inputT)item!).ToList());
            logger?.LogInformation("Executing debouncer action for " + uniqueKey + ". Removing dictionary record.");

            // Clean up process-lock entries that have been idle for too long.
            DateTime now = DateTime.UtcNow;
            foreach (var entry in _processLockDictionary)
            {
                if (now - entry.Value.Item2 > ProcessLockMaxIdle)
                {
                    _processLockDictionary.TryRemove(entry.Key, out _);
                }
            }

            return true;
        }
    }
}
关于为什么会出现这种情况,我还没有找到令我满意的答案。我认为,如果传入的操作不是返回 Task 的函数,
Task.Run()
无法尽可能高效地运行。
在我的例子中,我使用可枚举多次调用
Task.Run()
,并且在操作中,我调用了非任务结果函数。该函数调用了另一个非任务结果函数,该函数最终创建了一个执行 .Result
的 debouncer 任务。这样做时,感觉像是发生了某种阻塞:要么发生在线程层面,要么是代码实际上在同步执行、只是间歇性地异步执行。
我在代码中解决了这个速度问题,方法是把被调用函数的返回类型从对象改为该对象的 Task,并沿调用栈一路向上修改,直到最初的
Task.Run()
调用。通过简单调整任务功能,速度大幅提升95%
这个办法确实解决了问题,但我认为它算不上好的解决方案,因为它依赖于我能够控制中间的函数。如果我无法把中间函数改成返回 Task,那我就束手无策了。