我在多个非缓冲通道上使用select时发现了
select {
case <- chana:
case <- chanb:
}
即使两个通道都有数据,但处理此选择时,如果chana和case chanb不兼容,则调用。
package main
import (
"fmt"
_ "net/http/pprof"
"sync"
"time"
)
func main() {
chana := make(chan int)
chanb := make(chan int)
go func() {
for i := 0; i < 1000; i++ {
chana <- 100 * i
}
}()
go func() {
for i := 0; i < 1000; i++ {
chanb <- i
}
}()
time.Sleep(time.Microsecond * 300)
acount := 0
bcount := 0
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case <-chana:
acount++
case <-chanb:
bcount++
}
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
wg.Done()
}()
wg.Wait()
}
运行此演示,当其中一个chana,chanb完成读/写,另一个可能仍然保留999-1。
有没有办法确保平衡?
Go select
声明不偏向任何(准备好的)案件。引用规范:
如果一个或多个通信可以继续,则可以通过统一的伪随机选择来选择可以继续的单个通信。否则,如果存在默认情况,则选择该情况。如果没有默认情况,则“select”语句将阻塞,直到至少一个通信可以继续。
如果可以进行多次通信,则随机选择一个。这不是一个完美的随机分布,并且规范不保证,但它是随机的。
你经历的是Go Playground有GOMAXPROCS=1
(which you can verify here)和goroutine调度程序没有先发制人的结果。这意味着默认情况下goroutines不是并行执行的。如果遇到阻塞操作(例如,从网络读取,或尝试从阻塞的通道接收或发送),则将goroutine置于驻留状态,并且另一个准备继续运行。
由于您的代码中没有阻塞操作,因此goroutine可能不会被置于停放状态,并且可能只有一个“生产者”goroutine将运行,而另一个可能无法安排(永远)。
在我的本地计算机上运行你的代码GOMAXPROCS=4
,我有非常“现实”的结果。运行几次,输出:
finish one acount, bcount 1000 901
finish one acount, bcount 1000 335
finish one acount, bcount 1000 872
finish one acount, bcount 427 1000
如果您需要优先考虑单个案例,请查看以下答案:Force priority of go select statement
select
的默认行为不保证同等优先级,但平均而言它将接近它。如果你需要保证相同的优先级,那么你不应该使用select
,但你可以从2个通道做一个2序列的非阻塞接收,这看起来像这样:
for {
select {
case <-chana:
acount++
default:
}
select {
case <-chanb:
bcount++
default:
}
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
如果两个电源值相同,则上述2个非阻塞接收将以相同的速度(具有相同的优先级)消耗2个通道,如果没有,则另一个通道不断地接收而不会被延迟或阻塞。
有一点需要注意的是,如果没有任何通道提供任何接收值,这将基本上是一个“忙”循环,因此消耗计算能力。为了避免这种情况,我们可以检测到没有任何通道准备就绪,然后对两个接收使用select
语句,然后阻塞,直到其中一个接收完成,而不是浪费任何CPU资源:
for {
received := 0
select {
case <-chana:
acount++
received++
default:
}
select {
case <-chanb:
bcount++
received++
default:
}
if received == 0 {
select {
case <-chana:
acount++
case <-chanb:
bcount++
}
}
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
有关goroutine调度的更多详细信息,请参阅以下问题:
Number of threads used by Go runtime
Goroutines 8kb and windows OS thread 1 mb
Why does it not create many threads when many goroutines are blocked in writing file in golang?
正如评论中所提到的,如果你想确保平衡,你可以放弃在阅读goroutine中完全使用select
并依赖无缓冲通道提供的同步:
go func() {
for {
<-chana
acount++
<-chanb
bcount++
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
wg.Done()
}()
编辑:您也可以从供应方面进行平衡,但@ icza的答案对我来说似乎比这更好,并且还解释了导致这种情况发生的调度。令人惊讶的是,即使在我的(虚拟)机器上它也是片面的。
这里可以平衡供应方面的两个例程(某种程度上似乎不适用于Playground)。
package main
import (
"fmt"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"
)
func main() {
chana := make(chan int)
chanb := make(chan int)
var balanceSwitch int32
go func() {
for i := 0; i < 1000; i++ {
for atomic.LoadInt32(&balanceSwitch) != 0 {
fmt.Println("Holding R1")
time.Sleep(time.Nanosecond * 1)
}
chana <- 100 * i
fmt.Println("R1: Sent i", i)
atomic.StoreInt32(&balanceSwitch, 1)
}
}()
go func() {
for i := 0; i < 1000; i++ {
for atomic.LoadInt32(&balanceSwitch) != 1 {
fmt.Println("Holding R2")
time.Sleep(time.Nanosecond * 1)
}
chanb <- i
fmt.Println("R2: Sent i", i)
atomic.StoreInt32(&balanceSwitch, 0)
}
}()
time.Sleep(time.Microsecond * 300)
acount := 0
bcount := 0
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case <-chana:
acount++
case <-chanb:
bcount++
}
fmt.Println("Acount Bcount", acount, bcount)
if acount == 1000 || bcount == 1000 {
fmt.Println("finish one acount, bcount", acount, bcount)
break
}
}
wg.Done()
}()
wg.Wait()
}
通过更改atomic.LoadInt32(&balanceSwitch) != XX
和atomic.StoreInt32(&balanceSwitch, X)
或其他机制,您可以将其映射到任意数量的例程。这可能不是最好的事情,但如果这是一个要求,那么你可能不得不考虑这些选择。希望这可以帮助。