我正在尝试使用信号量来控制可以同时运行的作业实例的数量。虽然这对于等待方法来说相当简单,但我还希望该值可以在运行时配置,以便我可以向上或向下增加计数。
我意识到倒计时可能会出现问题,但有没有办法真正做到这一点?这是使用信号量的正确方法吗?
根据ReleaseSemaphore的文档,对于
lReleaseCount
参数:
信号量对象当前计数要增加的量。该值必须大于零。 如果指定的数量会导致信号量的计数超过创建信号量时指定的最大计数,则计数不会更改并且函数返回 FALSE。
本文档和其他文档表明信号量不是您的限制的正确选择。信号量一旦创建,就会有一个硬性最大值,如果不重新创建信号量就无法更改该最大值。换句话说,它不是一个可以更改的动态值。
在这种情况下,您需要找到另一种方法来管理您的限制。
您可以使用信号量的一种方法是分配一个“足够大”的信号量来满足所有未来的需求,然后获取足够的“实例”,以将可用数量减少到您需要的数量。片刻。当您想增加可用实例的数量时,只需释放您一开始抓住的一些实例即可。 但是,我质疑你为什么要这样做。真正决定你可以同时执行多少个作业的限制因素是什么?信号量很可能不是这个问题的正确答案。
我不建议为此使用信号量。
如果您使用 .NET 4,我建议的方法是创建一个自定义
TaskScheduler,它允许您在运行时更改并发级别。然后,您可以使用单个 Parallel.For
/
ForEach
调用在选项中传递此 TaskScheduler 来运行整个操作,并在运行时更改并发级别。这将使处理向上或向下移动级别变得相当容易。当级别上升时,您只需根据需要添加新线程即可。当它们出现故障时,只需从内部集合中删除该线程(但不要停止它),并让它完成当前的工作。这将允许您根据需要进行扩展。
的建议,我在 kotlin 中实现了动态信号量。我不知道如何在 C# 中做到这一点,但应该相当相似。只需将其视为伪代码答案即可。 该代码具有以下行为:
const val maximalSize = 100
private val DynamicSemaphoreSingleton = DynamicSemaphore(maximalSize)
fun getDynamicSemaphoreSingleton(permits: Int): DynamicSemaphore {
return DynamicSemaphoreSingleton.also { it.updatePermits(permits) }
}
class DynamicSemaphore(private var permits: Int) : Semaphore {
private val backingSemaphore = Semaphore(maximalSize, maximalSize - permits)
private var extraAcquires = 0
fun updatePermits(newPermits: Int) {
val difference = newPermits - permits
if (difference < 0) {
val numberOfFailedAcquires =
List(-difference) { backingSemaphore.tryAcquire() }.count { it.not() }
synchronized(this) { extraAcquires += numberOfFailedAcquires }
} else {
repeat(difference) { backingSemaphore.release() }
}
permits = newPermits
}
override val availablePermits: Int
get() = backingSemaphore.availablePermits
override suspend fun acquire() {
backingSemaphore.acquire()
}
override fun release() {
synchronized(this) {
if (extraAcquires > 0) {
extraAcquires--
} else {
backingSemaphore.release()
}
}
}
override fun tryAcquire(): Boolean {
return backingSemaphore.tryAcquire()
}
}