我编写了一个使用
DispatchQueue.concurrentPerform(iterations:execute:)
的例程并将其用于多线程编程。
令我惊讶的是,当我错误地将
queue.sync
的无意义迭代放入其他函数中时,性能会更好。
这次迭代让concurrentPerform
在A12X Bionic中使用更多核心。
当然苹果的文件说,
许多因素影响并发队列执行的任务数量,包括可用核心数量、其他进程正在完成的工作量以及其他串行调度队列中的任务数量和优先级。
我想通过合理的方式取得更好的表现。 如何控制
concurrentPerform
的并行度?
将队列的 QoS 更改为
.userInitiated
没有效果。
我们来试试吧!下一个示例以两种不同的方式完成简单的工作,首先在 .default .concurrent 队列上异步分派所有作业,然后使用 DispatchQueue.concurrentPerform。
DispatchQueue.concurrentPerform 是非常好的且易于使用的构造。
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let q = DispatchQueue(label: "internal", qos: .utility, attributes: .concurrent)
func job()->String {
var sum = 0
for i in 1...1000 {
let r = Int.random(in: 0..<i)
sum += r
}
let res = sum.description
return res
}
func asyncFoo(on: DispatchQueue, id: Int, completition: @escaping (_ id: Int, _ result: String)->()) {
on.async {
let res = job()
completition(id, res)
}
}
let group = DispatchGroup()
var start = Date()
for i in 0..<10 {
group.enter() // enter the group before the task starts
asyncFoo(on: q, id: i) { (id, result) in
print("id:", id, i, result)
group.leave() // leave the group when task finished
}
}
group.notify(queue: .main) {
var stop = Date()
print("all asynchronously dispatched done in", stop.timeIntervalSince(start), "seconds")
let task: (Int)->() = { i in
let res = job()
print("id:", i, res)
}
print("continue again ...")
start = Date()
DispatchQueue.concurrentPerform(iterations: 10, execute: task)
stop = Date()
print("all .concurrentPerform done in", stop.timeIntervalSince(start), "seconds")
PlaygroundPage.current.finishExecution()
}
print("continue execution ...")
结果呢?
continue execution ...
id: 7 7 251189
id: 2 2 252628
id: 8 8 248525
id: 5 5 248212
id: 0 0 254412
id: 3 3 255094
id: 6 6 260566
id: 1 1 242460
id: 9 9 247018
id: 4 4 246296
all asynchronously dispatched done in 0.10741996765136719 seconds
continue again ...
id: 2 248549
id: 3 245366
id: 7 242868
id: 8 252247
id: 0 250905
id: 4 249909
id: 6 247525
id: 9 246204
id: 1 253908
id: 5 249081
all .concurrentPerform done in 0.05399894714355469 seconds
如果可用,我更喜欢使用 .concurrentPerform,但这实际上取决于...没有 API 可以更改 .concurrentPerform 上的任何内容,但很可能它将是您最好的执行者。
从技术上讲,
concurrentPerform
确实限制了设备上处理器数量的并行度。 concurrentPerform
的问题是每次迭代都是一个单独的调度。如果每次迭代只做很少的工作,那么大量调度的适度开销会很快减少(甚至完全抵消)并行性带来的好处。
在您引用的该文档中,改进循环代码部分提出了一种通过“跨步”增加
concurrentPerform
(在 C 和 Objective-C 中称为 dispatch_apply
)每次迭代期间执行的工作的模式。因此,假设您有一百万次迭代,而不是进行一百万次调度,每次处理一个数据点,而是考虑进行 100 次迭代,每次处理 10,000 个数据点。将调度数量从 100 万减少到 100,可能会对性能产生巨大影响。
考虑这种方法,它将迭代分成单独的、连续的“块”(借用 OpenMP 中使用的术语):
extension DispatchQueue {
/// Chunked concurrentPerform
///
/// - Parameters:
///
/// - iterations: How many total iterations.
///
/// - chunks: How many chunks into which these iterations will be divided. This is optional and will default to
/// `activeProcessorCount`. If the work is largely uniform, you can safely omit this parameter and the
/// work will evenly distributed amongst the CPU cores.
///
/// If different chunks are likely to take significantly different amounts of time,
/// you may want to increase this value above the processor count to avoid blocking the whole process
/// for slowest chunk and afford the opportunity for threads processing faster chunks to handle more than one.
///
/// But, be careful to not increase this value too high, as each dispatched chunk entails a modest amount of overhead.
/// You may want to empirically test different chunk sizes (vs the default value) for your particular use-case.
///
/// - chunk: Closure to be called for each chunk.
static func chunkedConcurrentPerform(iterations: Int, chunks: Int? = nil, chunk: @Sendable (Range<Int>) -> Void) {
let chunks = min(iterations, chunks ?? ProcessInfo.processInfo.activeProcessorCount)
let (baseChunkSize, remainder) = iterations.quotientAndRemainder(dividingBy: chunks)
concurrentPerform(iterations: chunks) { chunkIndex in
let start = chunkIndex * baseChunkSize + min(chunkIndex, remainder)
let end = start + baseChunkSize + (chunkIndex < remainder ? 1 : 0)
chunk(start ..< end)
}
}
}
所以,而不是:
DispatchQueue.concurrentPerform(iterations: iterations) { i in
…
}
你会:
DispatchQueue.chunkedConcurrentPerform(iterations: iterations) { range in
for i in range {
…
}
}
因此,当
iterations
等于 1,000,000 时,前者将导致 1m 次调度,而在 8 处理器设备上,后者只会执行 8 次调度,每次调度处理索引的子集:
0..<125000
125000..<250000
250000..<375000
375000..<500000
500000..<625000
625000..<750000
750000..<875000
875000..<1000000
如果
iterations
很大,并且为每个值 i
执行的工作是适度的,这可能会导致性能显着提高。当您增加 concurrentPerform
每次迭代的工作量时,通常会获得最佳性能。
无论如何,如果您想进一步限制并行度,您可以提供
chunks
参数:
DispatchQueue.chunkedConcurrentPerform(iterations: iterations, chunks: 4) { range in
for i in range {
…
}
}
得出这些范围:
0..<250000
250000..<500000
500000..<750000
750000..<1000000
最终效果是将并行度降低到 4(即使该设备有 8 个处理器)。话虽如此,必须进一步减少处理器数量以下的“块”是相当不寻常的。通常,省略此参数,即在处理器之间均匀地分解工作,每个处理器一次调度,就可以完成我们所需要的。
注意,所有标准多线程/基准测试警告仍然适用:
虽然这种分块方法可以比单独的
concurrentPerform
提供显着的改进,但仍然存在一些用例,简单的串行计算仍然可以更有效。 (一种常见的用例是需要同步的情况。)只是有一些用例不适合并行性。或者有时,某些例程可能会受益于使用不同的方法(例如,GPU 与 CPU)。
避免使用游乐场进行基准测试,因为它引入了各种可以极大改变性能结果的诊断。不仅仅是它速度较慢,而且速度不均匀,而且它的性能远不能代表您在真实应用程序中体验到的情况。
请注意,您应该只对应用程序的“发布”版本进行基准测试,而不是“调试”版本。同样,所有调试开销都会扭曲结果,导致得出错误的结论。
您可能还想专注于物理设备上的基准测试,而不是依赖于模拟器结果。
编写多线程算法时,请注意任何共享的可变状态,确保同步对该状态的访问,并编写最小化执行同步次数的例程。