我正在观看 Rob Pike 的精彩演讲,名为“并发不是并行”,我试图用并发查询来扩展这个示例,但似乎我并不完全理解 goroutine 是如何工作的。特别是,我不明白如何在执行期间取消 goroutine。
这是一个可以执行查询的虚拟连接(假设它是不可更改的,就像外部 API 一样):
type Conn struct {
id int
delay time.Duration
}
type Result int
func (c *Conn) DoQuery(query string) Result {
defer fmt.Printf("DoQuery call for connection %d finished\n", c.id)
time.Sleep(c.delay)
return Result(len(query))
}
下面是向每个连接发送查询的 Query
函数。当它获得第一个结果时,它应该取消所有其他调用(作为拆卸),但事实并非如此。当 goroutine 开始查询时,它失去了观看
ctx.Done()
通道的能力,因为
conn.DoQuery(...)
调用处于阻塞状态。
func Query(conns []Conn, query string) Result {
results := make(chan Result, len(conns))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, conn := range conns {
go func() {
defer fmt.Printf("goroutine for connection %d finished\n", conn.id)
select {
case <-ctx.Done():
return
default:
// While the goroutine computes the result of this call
// it does not watch the ctx.Done() channel, so the call
// cannot be canceled mid-execution.
results <- conn.DoQuery(query)
}
}()
}
return <-results
}
这也是将所有内容连接在一起的 main
函数(如果您想运行该示例):
func main() {
conns := []Conn{
{id: 1, delay: 1 * time.Second},
{id: 2, delay: 3 * time.Second},
{id: 3, delay: 5 * time.Second},
{id: 4, delay: 4 * time.Second},
}
start := time.Now()
result := Query(conns, "select count(*) from users;")
duration := time.Since(start)
fmt.Println(result, duration)
for {}
}
所以我的问题是:实现 Query
函数以实现所需行为的正确方法是什么?
当其中一个 goroutine 完成处理时关闭其他 go 例程的一种方法是维护一个连接大小的布尔通道。
我们将频道命名为
quit
谁先完成处理,谁就将 true 推入布尔通道 所有 go 例程都可以访问此通道。在 goroutine 中,你可以通过
来处理这个问题
select {
case <- quit:
return
default:
// do your stuff here
}
当其中一个 go 例程完成处理后,您可以让它将 n-1
true 值推送到通道中,所有其他 goroutine 将从通道中获取值并返回,这会自动结束 goroutine。